/usr/local/hadoop-3.1.1
netstat -nltp | grep 19888   // should show the port listening on 0.0.0.0
jps   // list the running Hadoop daemon JVMs

Hadoop is a system architecture for distributed processing of massive amounts of data.
The core of the Hadoop framework is HDFS and MapReduce (the map and reduce functions).
The configuration made when installing Hadoop lives in files such as core-site.xml, hdfs-site.xml and mapred-site.xml.
In MapReduce, the JobTracker receives the job submitted by the user and dispatches tasks to the individual nodes, where the TaskTracker on each node is responsible for actually running them (this is the classic MRv1 picture; in Hadoop 2/3 YARN's ResourceManager and NodeManagers play these roles).
- Client: writes the MapReduce program, configures the job and submits it; this is the programmer's part of the work.
- JobTracker: initializes the job, assigns it, communicates with the TaskTrackers and coordinates execution of the whole job.
- TaskTracker: stays in contact with the JobTracker and runs the Map or Reduce tasks on the data split assigned to it. One important difference between the two is that there can be many TaskTrackers while there is only one JobTracker (which, like the NameNode in HDFS, makes it a single point of failure; I will come back to this in the later MapReduce notes).
- HDFS: stores the job's data, configuration and so on; the final results are also written to HDFS.


//
To count how many cards of each suit there are in a deck of unknown size, you only need a few people: give each of them a pile, have them count the four suits in their pile, and hand the totals to one more person to add up. For example, two people each take a pile and tally hearts, spades, clubs and diamonds; then four people each take responsibility for one suit and report their total to a single person. This is the classic map-reduce model.
//
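A minimal sketch of the analogy in Hadoop's Mapper/Reducer API (illustrative only: the SuitCount class and the "one card per line, suit first" input format are made up for this example; a full runnable WordCount job appears later in this post). Each mapper emits (suit, 1) for every card in its pile, and during the shuffle all pairs for the same suit end up at one reducer, which simply adds them up:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SuitCount {
    // map: each worker looks at one card and emits (suit, 1)
    public static class SuitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text suit = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            suit.set(value.toString().trim().split("\\s+")[0]); // first token = suit, e.g. "Hearts 7"
            context.write(suit, ONE);
        }
    }
    // reduce: one call per suit, summing all the 1s that arrived via the shuffle
    public static class SuitReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum)); // e.g. (Hearts, 13)
        }
    }
}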
The shuffle stage can be split into a map-side shuffle and a reduce-side shuffle.

The map-side shuffle processes the input and produces intermediate results, which are written to local disk rather than to HDFS. Each map's output first goes into an in-memory buffer; once the buffered data reaches a configured threshold, a background thread writes the buffer out to disk. This write is called a spill.
The reduce-side shuffle consists of three phases: copy, sort (merge) and reduce.
Finally comes the reduce step itself, which produces the final output and writes it to HDFS.
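For reference, the in-memory buffer size and spill threshold mentioned above are controlled by two stock properties in mapred-site.xml; the values below are Hadoop's defaults and only need to be set when you actually want to tune them:

<!-- mapred-site.xml -->
<property>
  <name>mapreduce.task.io.sort.mb</name>            <!-- size of the map output buffer, in MB -->
  <value>100</value>
</property>
<property>
  <name>mapreduce.map.sort.spill.percent</name>     <!-- spill to disk once the buffer is 80% full -->
  <value>0.80</value>
</property>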
Passwordless SSH login on the local machine (the options below live in /etc/ssh/sshd_config)
#RSAAuthentication yes
#PubkeyAuthentication yes
#PermitRootLogin no   // forbids root login
1.3 Generate the key pair
Run ssh-keygen -t rsa and just press Enter at every prompt.
1.4 Copy the public key into authorized_keys
cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
1.5 Log in again; no password should be required
[root@master ~]# ssh localhost
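If ssh localhost still prompts for a password, the usual culprit is file permissions, since sshd refuses keys that are too widely readable:
chmod 700 /root/.ssh
chmod 600 /root/.ssh/authorized_keys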
maven + host + test
// Run the bundled example job
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep input output 'dfs[a-z.]+'
hdfs namenode -format   // format the file system
Issue 1: but there is no HDFS_NAMENODE_USER defined. Aborting launch
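This error typically appears when the start scripts are run as root. A common fix is to declare which user each daemon runs as, for example by appending the following to etc/hadoop/hadoop-env.sh (root is assumed here; substitute the account you actually use):
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root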
# vi etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://172.16.10.150:9000</value>
  </property>
</configuration>

# vi etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
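The /user/test/input directory checked below is assumed to have been created and populated beforehand, for example:
hdfs dfs -mkdir -p /user/test/input
hdfs dfs -put etc/hadoop/*.xml /user/test/input/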

hadoop fs -ls /user/test/input/   // check that the files were copied to HDFS
Disable the firewall
systemctl disable firewalld
systemctl stop firewalld
Edit /etc/selinux/config and change SELINUX=enforcing to SELINUX=disabled
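The SELinux config change only takes effect after a reboot; to switch it off immediately for the current session:
setenforce 0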
http://172.16.10.150:9870   // NameNode web UI
http://172.16.10.150:9864   // DataNode web UI

# Run the job against the HDFS input directory
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep /user/test/input output 'dfs[a-z.]+'
hdfs dfs -cat output/*
sbin/start-yarn.sh
http://172.16.10.150:8088/cluster   // ResourceManager + NodeManager web UI
https://blog.csdn.net/xiaoduan_/article/details/79689882   // configure and start the JobHistory server
# yarn-site.xml
<!-- enable log aggregation -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>

# mapred-site.xml
<!-- JobHistory server address; without it the history entry point is unavailable -->
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>172.16.10.150:10020</value>
</property>
<!-- web UI port -->
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>172.16.10.150:19888</value>
</property>
<!-- HDFS path for logs of jobs that are still running -->
<property>
  <name>mapreduce.jobhistory.intermediate-done-dir</name>
  <value>/history/done_intermediate</value>
</property>
<!-- HDFS path for logs of completed jobs -->
<property>
  <name>mapreduce.jobhistory.done-dir</name>
  <value>/history/done</value>
</property>
# Start
sbin/mr-jobhistory-daemon.sh start historyserver #jobhistory
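In Hadoop 3.x the per-daemon scripts print a deprecation warning; the equivalent non-deprecated form is:
mapred --daemon start historyserver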

=============================================
Calling Hadoop remotely from IDEA
1. Download hadoop.dll and winutils.exe from https://github.com/steveloughran/winutils
2. Set the HADOOP_HOME environment variable and the classpath. [Put winutils.exe into a newly created folder hadoop3.1.1/bin]
3. Run the code; POM.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jason</groupId>
    <artifactId>testWord</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.1.1</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;

/**
 * The HDFS client always acts under some user identity.
 * By default the client API reads it from a JVM argument: -DHADOOP_USER_NAME=hadoop
 * It can also be passed in explicitly when constructing the FileSystem object.
 */
public class HdfsClientDemo {

    FileSystem fs = null;
    Configuration conf = null;
    static String ip = "172.16.10.150";

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        //conf.set("hadoop.home.dir", "D:/Developer/hadoop-2.8.4");
        // Obtain a client instance for the file system.
        /*fs = FileSystem.get(conf);*/
        // Alternatively pass the URI and user identity directly; the last argument is the user name.
        fs = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, "root");
    }

    @Test
    public void testUpload() throws Exception {
        Thread.sleep(2000);
        fs.copyFromLocalFile(new Path("F:\\工作目录\\users.txt"), new Path("/input/users.txt"));
        fs.close();
    }

    @Test
    public void listTest() throws Exception {
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            System.err.println(fileStatus.getPath() + "=================" + fileStatus.toString());
        }
        // Recursively lists all files under the directory.
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/output/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus next = listFiles.next();
            String name = next.getPath().getName();
            Path path = next.getPath();
            System.out.println(name + "---" + path.toString());
        }
    }

    @Test
    public void testDownload() throws Exception {
        fs.copyToLocalFile(new Path("/user/test/input/users.txt"), new Path("d:/users.txt"));
        fs.close();
    }

    @Test
    public void testConf() {
        Iterator<Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            Entry<String, String> entry = iterator.next();
            System.out.println(entry.getKey() + "--" + entry.getValue()); // entries loaded into conf
        }
    }

    /**
     * Create a directory.
     */
    @Test
    public void makdirTest() throws Exception {
        boolean mkdirs = fs.mkdirs(new Path("/aaa/bbb"));
        System.out.println(mkdirs);
    }

    /**
     * Delete a directory.
     */
    public static void deleteDir(FileSystem fs, String dir) throws Exception {
        boolean delete = fs.delete(new Path("/" + dir), true); // true = delete recursively
        System.out.println(delete);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        // Obtain a client instance for the file system.
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("D:/unLockPass.txt"), new Path("/input/unLockPass.txt"));
        fs.close();
    }
}
===================================WordCount================================================

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    static String ip = "172.16.10.150";

    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Print every file under the given HDFS directory to stdout.
    public static void printDSFile(FileSystem fs, String dir) {
        BufferedReader in = null;
        String line;
        try {
            System.out.println("uri:" + fs.getUri());
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/" + dir + "/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus next = listFiles.next();
                String name = next.getPath().getName();
                Path path = next.getPath();
                FSDataInputStream result = fs.open(path);
                in = new BufferedReader(new InputStreamReader(result, "UTF-8"));
                String out = "";
                while ((line = in.readLine()) != null) {
                    out += line + "\r\n";
                }
                System.out.println(name + ":\r\n" + out);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, "root");
        // Remove any previous output directory, otherwise the job fails.
        HdfsClientDemo.deleteDir(fs, "output");

        Job job = Job.getInstance(conf); // Job.getInstance replaces the deprecated new Job(conf)
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(fs.getUri() + "/input/"));
        FileOutputFormat.setOutputPath(job, new Path(fs.getUri() + "/output/"));

        job.waitForCompletion(true);
        printDSFile(fs, "output");

        /*
         fs.getHomeDirectory() - xxxx/user/root
         hadoop fs -rmdir /output
         hadoop fs -cat /output/*
        */
    }
}
