Hadoop in Practice

http://apache.fayea.com/hadoop/common/
https://blog.csdn.net/yongge1981/article/details/80504935 Hadoop 3 changes
/usr/local/hadoop-3.1.1
http://hadoop.apache.org/docs/r3.1.1/
netstat -nltp| grep 19888 //0.0.0.0
jps  //list the running Hadoop Java processes
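Hadoop is a system architecture for distributed processing of massive amounts of data.
The core of the Hadoop framework is HDFS and MapReduce (the map function and the reduce function).
The configuration files used when installing Hadoop include core-site.xml, hdfs-site.xml, mapred-site.xml and others.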

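MapReduce: the JobTracker receives the jobs users submit and dispatches tasks to the nodes, where each node's TaskTracker carries them out. (This is the classic MRv1 architecture; since Hadoop 2, including the 3.1.1 release used here, YARN replaces it with a ResourceManager, a NodeManager on each node and a per-job ApplicationMaster.)
  1. Client: writes the MapReduce program, configures the job and submits it; this is the programmer's part of the work;
  2. JobTracker: initializes and schedules jobs, communicates with the TaskTrackers and coordinates the execution of the whole job;
  3. TaskTracker: stays in contact with the JobTracker and runs Map or Reduce tasks on the data splits assigned to it. An important difference from the JobTracker is that many TaskTrackers can run tasks at the same time, while there is only one JobTracker (a single JobTracker is a single point of failure, just like the NameNode in HDFS; I will come back to this in the later MapReduce topics);
  4. HDFS: stores the job's data, configuration and so on; the final results are saved on HDFS as well.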
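The master/worker split described above can be sketched in plain Java with a thread pool. This is only an analogy under stated assumptions: the class `MasterWorkerSketch` and the method `runJob` are made up, the "workers" are threads rather than TaskTrackers on other machines, and the job simply counts words. It is not how Hadoop schedules tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names (MasterWorkerSketch, runJob): an analogy, not Hadoop code.
class MasterWorkerSketch {

    // The "master" cuts the input into splits, submits one task per split to a
    // pool of worker threads (stand-ins for TaskTrackers) and sums the results.
    static long runJob(List<String> lines, int workers, int splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Long>> partials = new ArrayList<>();
            for (int start = 0; start < lines.size(); start += splitSize) {
                List<String> split = lines.subList(start, Math.min(start + splitSize, lines.size()));
                // Each task counts the words in its own split only.
                partials.add(pool.submit(() -> {
                    long count = 0;
                    for (String line : split) {
                        String trimmed = line.trim();
                        if (!trimmed.isEmpty()) {
                            count += trimmed.split("\\s+").length;
                        }
                    }
                    return count;
                }));
            }
            long total = 0;
            for (Future<Long> partial : partials) {
                total += partial.get(); // wait for every worker, then aggregate
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("hello hadoop", "hello world", "map reduce", "");
        System.out.println(runJob(input, 2, 2)); // prints 6
    }
}
```

The single coordinating thread in `runJob` is also where the single-point-of-failure remark applies: if it dies, the partial results of the workers are lost.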

//
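Suppose we need to count how many cards of each of the four suits there are in a pile of playing cards (the number of cards is unknown). We only need a few people: give each of them part of the pile, have each one count the four suits, and then hand the totals to one more person. For example, two people each take a pile and count the hearts, spades, clubs and diamonds in it; then four people each take care of just one suit and report the final figure to one person. This is the typical map-reduce model.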
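The card-counting analogy can be played out in a single JVM. The sketch below is a hypothetical illustration (the class `SuitCount` and its methods are invented for these notes): each pile goes through a map step that counts suits, the partial counts are grouped by suit (the shuffle), and a reduce step adds them up. Nothing here uses Hadoop; it only shows the data flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class; a single-process simulation of the card-counting analogy.
class SuitCount {

    // Map phase: one "person" counts the suits in their own pile.
    static Map<String, Integer> mapPile(List<String> pile) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String suit : pile) {
            counts.merge(suit, 1, Integer::sum);
        }
        return counts;
    }

    // Shuffle: group the partial counts by suit. Reduce: sum each group.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            totals.put(e.getKey(), sum);
        }
        return totals;
    }

    static Map<String, Integer> countSuits(List<List<String>> piles) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> pile : piles) {
            partials.add(mapPile(pile));
        }
        return reduce(partials);
    }

    public static void main(String[] args) {
        List<List<String>> piles = List.of(
            List.of("hearts", "spades", "hearts", "clubs"),
            List.of("diamonds", "hearts", "clubs"));
        System.out.println(countSuits(piles));
        // prints {clubs=2, diamonds=1, hearts=3, spades=1}
    }
}
```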
//
The shuffle phase can be further divided into the Map-side shuffle and the Reduce-side shuffle.
https://blog.csdn.net/afandaafandaafanda/article/details/52828870

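The Map-side shuffle processes the input data and produces intermediate results, which are written to the local disk rather than to HDFS. Each map's output is first written to an in-memory buffer; when the buffered data reaches a configured threshold, a background thread writes the buffer contents to disk. This process is called a spill.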
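A rough model of the map-side buffer and spill, under stated assumptions: the buffer size is counted in records rather than bytes, spills are kept in an in-memory list instead of local files, and spilling happens synchronously instead of in a background thread. In real Hadoop the buffer size is `mapreduce.task.io.sort.mb` (100 MB by default) and the threshold is `mapreduce.map.sort.spill.percent` (0.80 by default); each spill is sorted by partition and then by key, which the sketch mimics. `SpillBuffer` is a made-up name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of the map-side sort buffer; not Hadoop's MapOutputBuffer.
class SpillBuffer {

    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    private final int capacity;         // buffer size, counted in records (Hadoop counts bytes)
    private final double spillPercent;  // plays the role of mapreduce.map.sort.spill.percent
    private final int numPartitions;    // one partition per reduce task
    private final List<Rec> buffer = new ArrayList<>();
    final List<List<Rec>> spills = new ArrayList<>(); // stands in for spill files on local disk

    SpillBuffer(int capacity, double spillPercent, int numPartitions) {
        this.capacity = capacity;
        this.spillPercent = spillPercent;
        this.numPartitions = numPartitions;
    }

    // Same formula as Hadoop's default HashPartitioner, applied to String.hashCode().
    int partitionFor(String key) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Called for every map output record.
    void collect(String key, int value) {
        buffer.add(new Rec(partitionFor(key), key, value));
        if (buffer.size() >= capacity * spillPercent) {
            spill();
        }
    }

    // Sort the buffered records by (partition, key) and write them out as one run.
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Rec> run = new ArrayList<>(buffer);
        run.sort(Comparator.comparingInt((Rec r) -> r.partition).thenComparing(r -> r.key));
        spills.add(run);
        buffer.clear();
    }

    // When the map task ends, whatever is still buffered becomes the last spill.
    void flush() {
        spill();
    }

    public static void main(String[] args) {
        SpillBuffer buf = new SpillBuffer(4, 0.8, 2);
        for (String word : "the quick brown fox jumps over the lazy dog".split(" ")) {
            buf.collect(word, 1);
        }
        buf.flush();
        buf.spills.forEach(System.out::println); // one sorted run per line
    }
}
```

Hadoop later merges these spill files into a single sorted, partitioned output file per map task; the sketch stops at the individual spills.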
The Reduce-side shuffle consists of three stages: copy, sort (merge) and reduce.
Finally comes the Reduce step itself, which produces the final output and writes it to HDFS.
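On the reduce side, each copied map output is already sorted, so the sort (merge) stage is essentially a k-way merge, after which the reduce stage sees all values of one key together. The hypothetical `ReduceSideMerge` below shows that idea with a `PriorityQueue` over in-memory runs and a summing reducer. It assumes the copy stage has already happened; the real framework also merges on disk and writes the result to HDFS, which this sketch does not attempt.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical simulation; the class and method names are made up for illustration.
class ReduceSideMerge {

    // Read position inside one sorted map output that was already copied over.
    private static final class Cursor {
        final List<Map.Entry<String, Integer>> run;
        int pos = 0;

        Cursor(List<Map.Entry<String, Integer>> run) {
            this.run = run;
        }

        Map.Entry<String, Integer> head() {
            return run.get(pos);
        }
    }

    // sort/merge: k-way merge of runs that are each sorted by key;
    // reduce: the values of each key are summed and emitted when the key changes.
    static List<String> mergeAndReduce(List<List<Map.Entry<String, Integer>>> runs) {
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> a.head().getKey().compareTo(b.head().getKey()));
        for (List<Map.Entry<String, Integer>> run : runs) {
            if (!run.isEmpty()) {
                heap.add(new Cursor(run));
            }
        }
        List<String> output = new ArrayList<>();
        String currentKey = null;
        int sum = 0;
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            Map.Entry<String, Integer> e = c.head();
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                output.add(currentKey + "\t" + sum); // previous key is complete
                sum = 0;
            }
            currentKey = e.getKey();
            sum += e.getValue();
            c.pos++;
            if (c.pos < c.run.size()) {
                heap.add(c); // re-insert so its next record competes in the merge
            }
        }
        if (currentKey != null) {
            output.add(currentKey + "\t" + sum);
        }
        return output;
    }

    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> fromMap1 = List.of(kv("apple", 1), kv("hadoop", 2));
        List<Map.Entry<String, Integer>> fromMap2 = List.of(kv("apple", 3), kv("zoo", 1));
        mergeAndReduce(List.of(fromMap1, fromMap2)).forEach(System.out::println);
        // apple 4, hadoop 2, zoo 1 (tab-separated, one per line)
    }
}
```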

Passwordless SSH login to the local machine
#RSAAuthentication yes
#PubkeyAuthentication yes
#PermitRootLogin no //disallow login as root
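1.3 Generate a key pair
Run ssh-keygen -t rsa and press Enter at every prompt
1.4 Copy the public key into the authorized keys file
cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
1.5 Log in again; no password is needed any more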
[root@master ~]# ssh localhost

https://www.2cto.com/net/201706/651203.html
maven + host + test
hdfs namenode -format  //format the file system (first run only)
sbin/start-dfs.sh  //start the NameNode and DataNode
//Run
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep input output 'dfs[a-z.]+'
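Problem 1: but there is no HDFS_NAMENODE_USER defined. Aborting launch
This happens when the HDFS start scripts are run as root. Defining the users in etc/hadoop/hadoop-env.sh fixes it, e.g. export HDFS_NAMENODE_USER=root, HDFS_DATANODE_USER=root and HDFS_SECONDARYNAMENODE_USER=root (and YARN_RESOURCEMANAGER_USER=root, YARN_NODEMANAGER_USER=root for start-yarn.sh).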
https://blog.csdn.net/mxfeng/article/details/72770432?locationNum=5&fps=1
# vi etc/hadoop/core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://172.16.10.150:9000</value>
    </property>
</configuration>
# vi etc/hadoop/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
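Both files use the same layout: a `<configuration>` element holding `<property>` entries with a `<name>` and a `<value>`. Hadoop itself loads them through `org.apache.hadoop.conf.Configuration` (e.g. `conf.get("fs.defaultFS")`). For a quick sanity check of a hand-edited file without Hadoop on the classpath, a sketch like the hypothetical `SiteXmlReader` below lists the name/value pairs using only the JDK's DOM parser; it ignores other elements such as `<final>` and does not resolve variable substitution.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper, not part of Hadoop: lists the <name>/<value> pairs of a
// *-site.xml document using only the JDK's DOM parser.
class SiteXmlReader {

    static Map<String, String> readProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                Node name = property.getElementsByTagName("name").item(0);
                Node value = property.getElementsByTagName("value").item(0);
                if (name != null && value != null) { // skip incomplete entries
                    props.put(name.getTextContent().trim(), value.getTextContent().trim());
                }
            }
            return props;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("not a readable *-site.xml document", e);
        }
    }

    public static void main(String[] args) {
        String coreSite = "<configuration><property><name>fs.defaultFS</name>"
            + "<value>hdfs://172.16.10.150:9000</value></property></configuration>";
        System.out.println(readProperties(coreSite)); // prints {fs.defaultFS=hdfs://172.16.10.150:9000}
    }
}
```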

hadoop fs -ls /user/test/input/   //check that the files were copied to HDFS

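Disable the firewall
systemctl disable firewalld
systemctl stop firewalld
Edit /etc/selinux/config and change SELINUX=enforcing to SELINUX=disabled (takes effect after a reboot; setenforce 0 switches to permissive mode immediately)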

http://172.16.10.150:9870  //NameNode web UI
http://172.16.10.150:9864  //DataNode web UI


#Run
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep /user/test/input output 'dfs[a-z.]+'
hdfs dfs -cat output/*
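What the `grep` example computes: it extracts every match of the regular expression from the input, counts each distinct match, and then sorts the matches by count in descending order, so `hdfs dfs -cat output/*` prints lines of the form count<TAB>match. The hypothetical `GrepSketch` below reproduces that result locally for a list of lines; it is not the code in hadoop-mapreduce-examples, and its tie order (alphabetical) is an assumption Hadoop does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical local re-implementation for illustration only.
class GrepSketch {

    static List<String> grep(List<String> lines, String regex) {
        Pattern pattern = Pattern.compile(regex);
        // Step 1 (like the search job): count every match of the regex.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = pattern.matcher(line);
            while (m.find()) {
                counts.merge(m.group(), 1L, Long::sum);
            }
        }
        // Step 2 (like the sort job): highest count first; the stable sort keeps ties alphabetical.
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : entries) {
            out.add(e.getValue() + "\t" + e.getKey()); // count<TAB>match
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("dfs.replication dfs.name", "dfs.replication");
        grep(input, "dfs[a-z.]+").forEach(System.out::println);
    }
}
```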
sbin/start-yarn.sh
http://172.16.10.150:8088/cluster ResourceManager+NodeManager
https://blog.csdn.net/xiaoduan_/article/details/79689882 configure and start the JobHistory server
#yarn-site.xml (inside <configuration>)
<!-- enable log aggregation -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
#mapred-site.xml (inside <configuration>)
<!-- JobHistory server address; without it the history link is not available -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>172.16.10.150:10020</value>
</property>
<!-- web UI port -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>172.16.10.150:19888</value>
</property>
<!-- HDFS path for the logs of jobs that are still running -->
<property>
    <name>mapreduce.jobhistory.intermediate-done-dir</name>
    <value>/history/done_intermediate</value>
</property>
<!-- HDFS path for the logs of finished jobs -->
<property>
    <name>mapreduce.jobhistory.done-dir</name>
    <value>/history/done</value>
</property>

#Start
sbin/mr-jobhistory-daemon.sh start historyserver #jobhistory (deprecated in Hadoop 3; bin/mapred --daemon start historyserver is the newer form)
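The `netstat -nltp| grep 19888` check from the top of these notes only works on the server itself. From the development machine, a hypothetical helper like `PortCheck` below can test whether the web UI ports listed in these notes (9870, 9864, 8088, 19888) accept TCP connections. It only shows that something is listening on the port, not that it is the expected Hadoop daemon; the 2-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: a client-side counterpart to "netstat -nltp | grep <port>".
class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        String host = "172.16.10.150"; // the cluster address used throughout these notes
        int[] ports = {9870, 9864, 8088, 19888}; // NameNode, DataNode, ResourceManager, JobHistory web UIs
        for (int port : ports) {
            System.out.println(host + ":" + port + " -> " + (isListening(host, port, 2000) ? "open" : "closed"));
        }
    }
}
```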
=============================================
https://www.cnblogs.com/zh-yp/p/7884084.html
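Calling a remote Hadoop cluster from IntelliJ IDEA
1. Download hadoop.dll and winutils.exe from https://github.com/steveloughran/winutils
2. Set the HADOOP_HOME environment variable and the classpath. [Put winutils.exe into the bin folder of a newly created hadoop3.1.1 directory, i.e. hadoop3.1.1/bin]
3. Run the code. pom.xml: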
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jason</groupId>
    <artifactId>testWord</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.1.1</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;

/**
 *
 * A client that operates on HDFS always acts under some user identity.
 * By default, the HDFS client API takes that identity from a JVM parameter: -DHADOOP_USER_NAME=hadoop
 *
 * The user can also be passed in as an argument when the client fs object is constructed.
 * @author
 *
 */
public class HdfsClientDemo {
    FileSystem fs = null;
    Configuration conf = null;
    static String ip = "172.16.10.150";

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://"+ip+":9000");
        //conf.set("hadoop.home.dir", "D:/Developer/hadoop-2.8.4");
        //get a client instance for file system operations
        /*fs = FileSystem.get(conf);*/
        //the URI and the user identity can also be passed in directly
        fs = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, "root"); //the last argument is the user name
    }

    @After
    public void tearDown() throws Exception {
        //close the client after every test
        if (fs != null) {
            fs.close();
        }
    }

    @Test
    public void testUpload() throws Exception {
        Thread.sleep(2000);
        fs.copyFromLocalFile(new Path("F:\\工作目录\\users.txt"), new Path("/input/users.txt"));
    }

    @Test
    public void listTest() throws Exception {
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            System.err.println(fileStatus.getPath() + "=================" + fileStatus.toString());
        }
        //finds all files recursively
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/output/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus next = listFiles.next();
            String name = next.getPath().getName();
            Path path = next.getPath();
            System.out.println(name + "---" + path.toString());
        }
    }

    @Test
    public void testDownload() throws Exception {
        fs.copyToLocalFile(new Path("/user/test/input/users.txt"), new Path("d:/users.txt"));
    }

    @Test
    public void testConf() {
        Iterator<Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            Entry<String, String> entry = iterator.next();
            System.out.println(entry.getKey() + "--" + entry.getValue()); //what conf has loaded
        }
    }

    /**
     * Create a directory
     */
    @Test
    public void mkdirTest() throws Exception {
        boolean mkdirs = fs.mkdirs(new Path("/aaa/bbb"));
        System.out.println(mkdirs);
    }

    /**
     * Delete a directory
     */
    public static void deleteDir(FileSystem fs, String dir) throws Exception {
        boolean delete = fs.delete(new Path("/" + dir), true); //true: delete recursively
        System.out.println(delete);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        //get a client instance for file system operations
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("D:/unLockPass.txt"), new Path("/input/unLockPass.txt"));
        fs.close();
    }
}
===================================WordCount================================================
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    static String ip = "172.16.10.150";

    //map: split each line into words and emit (word, 1)
    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    //reduce: sum the 1s collected for each word
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    //print every file under /dir (recursively) to stdout
    public static void printDSFile(FileSystem fs, String dir) {
        try {
            System.out.println("uri:" + fs.getUri());
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/" + dir + "/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus next = listFiles.next();
                String name = next.getPath().getName();
                Path path = next.getPath();
                //close each file's stream after printing it
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
                    StringBuilder out = new StringBuilder();
                    String line;
                    while ((line = in.readLine()) != null) {
                        out.append(line).append("\r\n");
                    }
                    System.out.println(name + ":\r\n" + out);
                }
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, "root");
        //the output directory must not exist, otherwise the job fails
        HdfsClientDemo.deleteDir(fs, "output");

        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(fs.getUri() + "/input/"));
        FileOutputFormat.setOutputPath(job, new Path(fs.getUri() + "/output/"));
        boolean ok = job.waitForCompletion(true);

        if (ok) {
            printDSFile(fs, "output");
        }
        fs.close();
        /*
        fs.getHomeDirectory() - xxxx/user/root
        hadoop fs -rm -r /output   (-rmdir only removes empty directories)
        hadoop fs -cat /output/*
        */
    }
}
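To check the map/reduce logic of `WordCount` without a cluster, the same steps can be mirrored in plain Java. `LocalWordCount` is a hypothetical class, not part of Hadoop: it tokenizes with the same default `StringTokenizer` (whitespace only, so punctuation stays attached to words), groups with a `TreeMap` (for ASCII words this matches the key order Hadoop's `Text` comparator gives the reducer), and formats lines as key<TAB>value, which is `TextOutputFormat`'s default separator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical plain-Java mirror of WordCountMap + WordCountReduce, for checking
// the logic without a cluster. Not a replacement for running the real job.
class LocalWordCount {

    static List<String> run(List<String> lines) {
        // "map" + "shuffle" + "reduce": tokenize each line, group equal words
        // in sorted order, and sum one 1 per occurrence.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                counts.merge(token.nextToken(), 1, Integer::sum);
            }
        }
        // Same layout as the job's output files: word<TAB>count
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.add(e.getKey() + "\t" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        run(List.of("hello hadoop", "hello world")).forEach(System.out::println);
        // hadoop 1, hello 2, world 1 (tab-separated, one per line)
    }
}
```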