Finding the Maximum Temperature of a Year with Hadoop MapReduce


Environment

  • CentOS 6.8, 64-bit, 1 core, 2 GB RAM

  • JDK 1.7.0_75, 64-bit

  • Hadoop 1.1.2

Start Hadoop

  • Enter the /app/hadoop-1.1.2/bin directory

$ cd /app/hadoop-1.1.2/bin
  • Start all of the daemons

$ ./start-all.sh
starting namenode, logging to /app/hadoop-1.1.2/libexec/../logs/hadoop-yohann-namenode-VM-2-14-centos.out
hadoop: starting datanode, logging to /app/hadoop-1.1.2/libexec/../logs/hadoop-yohann-datanode-VM-2-14-centos.out
hadoop: starting secondarynamenode, logging to /app/hadoop-1.1.2/libexec/../logs/hadoop-yohann-secondarynamenode-VM-2-14-centos.out
starting jobtracker, logging to /app/hadoop-1.1.2/libexec/../logs/hadoop-yohann-jobtracker-VM-2-14-centos.out
hadoop: starting tasktracker, logging to /app/hadoop-1.1.2/libexec/../logs/hadoop-yohann-tasktracker-VM-2-14-centos.out
  • Check that the processes are running

$ jps
26352 DataNode
26560 JobTracker
26235 NameNode
26668 TaskTracker
26470 SecondaryNameNode
18989 Jps

Make sure all of the processes above have started.

Create the Code

  • Create the /app/hadoop-1.1.2/myclass directory and enter it

$ cd /app/hadoop-1.1.2
$ mkdir myclass
$ cd myclass
  • Create MaxTemperature.java with the following code
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        // Configure the job: jar, name, input/output paths,
        // mapper/reducer classes, and output key/value types
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  • Create MaxTemperatureMapper.java with the following code
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        // The year occupies a fixed-width field of the NCDC record
        String year = line.substring(15, 19);

        // The air temperature is stored in tenths of a degree Celsius;
        // skip an explicit '+' sign so Integer.parseInt accepts the field
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        // Emit (year, temperature) only for readings that are present
        // and pass the quality check
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
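The fixed-width parsing the mapper relies on can be tried out in plain Java with no Hadoop dependency. The sketch below is not part of the original tutorial: the class name is hypothetical and the sample record is synthetic, filling only the offsets the mapper reads (year at 15-19, signed temperature at 87-92, quality flag at 92) and padding the rest with zeros.

```java
// Hypothetical demo class: exercises the same substring offsets as the mapper
public class NcdcParseDemo {

    // Build a 93-character synthetic record (real NCDC lines carry many more fields)
    public static String sampleLine() {
        StringBuilder sb = new StringBuilder("0".repeat(15));
        sb.append("1971");                        // year at offsets 15-18
        sb.append("0".repeat(87 - sb.length()));  // pad up to offset 87
        sb.append("+0078");                       // +7.8 °C stored as tenths
        sb.append("1");                           // quality flag 1 = passed checks
        return sb.toString();
    }

    public static String extractYear(String line) {
        return line.substring(15, 19);
    }

    public static int extractTemperature(String line) {
        // Skip a leading '+' so Integer.parseInt does not reject the field
        return (line.charAt(87) == '+')
                ? Integer.parseInt(line.substring(88, 92))
                : Integer.parseInt(line.substring(87, 92));
    }

    public static boolean passesQuality(String line) {
        return line.substring(92, 93).matches("[01459]");
    }

    public static void main(String[] args) {
        String line = sampleLine();
        System.out.println(extractYear(line) + " -> " + extractTemperature(line)); // 1971 -> 78
    }
}
```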
  • Create MaxTemperatureReducer.java with the following code
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // Scan all temperatures grouped under this year and keep the maximum
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
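Stripped of the Hadoop types, the per-key work of the reducer is just a running maximum over the values the shuffle grouped together. A minimal plain-Java sketch (the class name is hypothetical):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: the reducer's per-key logic with plain ints
public class MaxReduceDemo {

    public static int maxOf(List<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int v : values) {
            maxValue = Math.max(maxValue, v);
        }
        return maxValue;
    }

    public static void main(String[] args) {
        // e.g. temperatures (in tenths of a degree) grouped under one year
        System.out.println(maxOf(Arrays.asList(-50, 122, 400, 311))); // 400
    }
}
```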

Compile the Code

  • Compile

$ javac -classpath ../hadoop-core-1.1.2.jar MaxTemperature*.java

The command above compiles the Java source files just created; to make sure compilation succeeds, hadoop-core-1.1.2.jar is added to the classpath.

  • Package the compiled class files into a jar

$ jar cvf ./MaxTemperature.jar ./MaxTemperature*.class
  • Move the jar to the parent directory

$ mv MaxTemperature.jar ..
  • Remove the compiled class files

$ rm MaxTemperature*.class

Test

  • Prepare the weather data

The weather data can be downloaded from ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/. This test uses the data for 1971-1973.

  • Merge the weather data

$ zcat *.gz > temperature.txt

The command above decompresses the downloaded data files and merges them into a single temperature.txt file.

  • Create the /max/in directory in HDFS

$ hadoop fs -mkdir -p /max/in
  • Upload temperature.txt to the /max/in directory

$ hadoop fs -copyFromLocal temperature.txt /max/in
  • List the /max/in directory in HDFS

$ hadoop fs -ls /max/in
Found 1 items
-rw-r--r-- 1 yohann supergroup 46337829 2021-05-15 14:54 /max/in/temperature.txt
  • Run the job from the /app/hadoop-1.1.2 directory

$ hadoop jar MaxTemperature.jar MaxTemperature /max/in/temperature.txt /max/out
21/05/15 14:55:51 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
21/05/15 14:55:51 INFO input.FileInputFormat: Total input paths to process : 1
21/05/15 14:55:51 INFO util.NativeCodeLoader: Loaded the native-hadoop library
21/05/15 14:55:51 WARN snappy.LoadSnappy: Snappy native library not loaded
21/05/15 14:55:52 INFO mapred.JobClient: Running job: job_202105151050_0001
21/05/15 14:55:53 INFO mapred.JobClient: map 0% reduce 0%
21/05/15 14:56:01 INFO mapred.JobClient: map 100% reduce 0%
21/05/15 14:56:08 INFO mapred.JobClient: map 100% reduce 33%
21/05/15 14:56:11 INFO mapred.JobClient: map 100% reduce 100%
21/05/15 14:56:11 INFO mapred.JobClient: Job complete: job_202105151050_0001
21/05/15 14:56:11 INFO mapred.JobClient: Counters: 29
21/05/15 14:56:11 INFO mapred.JobClient: Job Counters
21/05/15 14:56:11 INFO mapred.JobClient: Launched reduce tasks=1
21/05/15 14:56:11 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=7522
21/05/15 14:56:11 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
21/05/15 14:56:11 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
21/05/15 14:56:11 INFO mapred.JobClient: Launched map tasks=1
21/05/15 14:56:11 INFO mapred.JobClient: Data-local map tasks=1
21/05/15 14:56:11 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=9851
21/05/15 14:56:11 INFO mapred.JobClient: File Output Format Counters
21/05/15 14:56:11 INFO mapred.JobClient: Bytes Written=27
21/05/15 14:56:11 INFO mapred.JobClient: FileSystemCounters
21/05/15 14:56:11 INFO mapred.JobClient: FILE_BYTES_READ=2297180
21/05/15 14:56:11 INFO mapred.JobClient: HDFS_BYTES_READ=46337935
21/05/15 14:56:11 INFO mapred.JobClient: FILE_BYTES_WRITTEN=4698774
21/05/15 14:56:11 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=27
21/05/15 14:56:11 INFO mapred.JobClient: File Input Format Counters
21/05/15 14:56:11 INFO mapred.JobClient: Bytes Read=46337829
21/05/15 14:56:11 INFO mapred.JobClient: Map-Reduce Framework
21/05/15 14:56:11 INFO mapred.JobClient: Map output materialized bytes=2297180
21/05/15 14:56:11 INFO mapred.JobClient: Map input records=211054
21/05/15 14:56:11 INFO mapred.JobClient: Reduce shuffle bytes=2297180
21/05/15 14:56:11 INFO mapred.JobClient: Spilled Records=417668
21/05/15 14:56:11 INFO mapred.JobClient: Map output bytes=1879506
21/05/15 14:56:11 INFO mapred.JobClient: Total committed heap usage (bytes)=159387648
21/05/15 14:56:11 INFO mapred.JobClient: CPU time spent (ms)=3370
21/05/15 14:56:11 INFO mapred.JobClient: Combine input records=0
21/05/15 14:56:11 INFO mapred.JobClient: SPLIT_RAW_BYTES=106
21/05/15 14:56:11 INFO mapred.JobClient: Reduce input records=208834
21/05/15 14:56:11 INFO mapred.JobClient: Reduce input groups=3
21/05/15 14:56:11 INFO mapred.JobClient: Combine output records=0
21/05/15 14:56:11 INFO mapred.JobClient: Physical memory (bytes) snapshot=262066176
21/05/15 14:56:11 INFO mapred.JobClient: Reduce output records=3
21/05/15 14:56:11 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1548926976
21/05/15 14:56:11 INFO mapred.JobClient: Map output records=208834
  • View the results; note that the temperatures must be divided by 10

$ hadoop fs -cat /max/out/part-r-00000
1971 400
1972 411
1973 430
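Because the mapper emits temperatures in tenths of a degree Celsius, the values above (400, 411, 430) correspond to 40.0, 41.1, and 43.0 °C. A tiny sketch of the conversion (the class name is hypothetical, not part of the tutorial):

```java
// Hypothetical demo class: convert NCDC tenths-of-a-degree values to °C
public class TenthsToCelsius {

    public static double toCelsius(int tenths) {
        return tenths / 10.0;
    }

    public static void main(String[] args) {
        System.out.println("1971 max: " + toCelsius(400) + " °C"); // 1971 max: 40.0 °C
    }
}
```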

You can also visit http://<current IP>:50030/jobtracker.jsp to view the result through the JobTracker web UI.
