Hadoop经典综合性案例—温度排序示例

IT 文章5年前 (2020)发布 小编
0 0 0

温度排序示例是一个综合性比较强的Hadoop经典案例,除了基础的MapReduce,还有自定义序列化对象、分区、分组、自定义排序等相关知识,对于刚入门的同学来说,理解起来可能会稍有困难。

假设有多年气温数据,如下:

1949-10-01 14:21:02	34
1949-10-02 15:01:01	36
1949-10-03 15:01:01	39
1949-10-04 17:01:01	38
1949-10-05 18:01:01	42
1950-01-01 11:21:02	32
1950-01-02 12:21:02	37
1950-10-03 12:21:02	27
1950-10-01 12:21:02	37
1950-10-02 12:21:02	41
1951-07-01 12:21:02	45
1951-07-02 13:21:02	41
1951-12-01 12:21:02	20
1951-12-02 13:21:02	27
1951-07-03 11:21:03	47
1951-07-04 14:21:03	39
1951-07-05 14:21:03	43
1951-12-03 12:21:02	23

注意:模拟数据存放在weather.txt中,时间和温度之间不是空格,而是tab键隔开的。

ad

程序员导航

优网导航旗下整合全网优质开发资源,一站式IT编程学习与工具大全网站

现在要求找到每年每月的3个最高温度时刻并按照温度进行降序排序,同时由于数据数量可能会很大,为了提高效率,将每一年的数据分别交给不同的reduce执行,产生不同的文件。

1、将weather.txt上传至HDFS的/input目录下

2、自定义时间温度的封装类MyKey.java

/**
 * 自定义时间温度的封装类
 */
public class MyKey implements WritableComparable {

	private int year;//年
	private int month;//月
	private int temp;//温度

	public int getYear() {
		return year;
	}

	public void setYear(int year) {
		this.year = year;
	}

	public int getMonth() {
		return month;
	}

	public void setMonth(int month) {
		this.month = month;
	}

	public int getTemp() {
		return temp;
	}

	public void setTemp(int temp) {
		this.temp = temp;
	}

	//序列化:通过write方法写入序列化数据流
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(year);
		out.writeInt(month);
		out.writeInt(temp);
	}

	//反序列化:通过readFields方法从序列化的数据流中读出进行赋值
	@Override
	public void readFields(DataInput in) throws IOException {
		//注意读取顺序要和序列化顺序保持一致
		year = in.readInt();
		month = in.readInt();
		temp = in.readInt();
	}

	//按照字典顺序进行比较,返回值是一个int型
	public int compareTo(MyKey mykey) {
		return this==mykey?0:-1;
	}
}

3、自定义Mapper

ad

AI 工具导航

优网导航旗下AI工具导航,精选全球千款优质 AI 工具集

/**
 * 自定义Mapper
 * 对k1,v1进行处理,提取年、月和温度,封装成MyKey
 */
public class MyMapper extends Mapper {
	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		//按照tab键切割v1
		String[] datas = v1.toString().split("\t");
		//获取日期
		String date = datas[0];
		//获取温度
		int temp = Integer.parseInt(datas[1]);
		//切割日期,获取年和月
		String[] dates = date.split("-");
		int year = Integer.parseInt(dates[0]);
		int month = Integer.parseInt(dates[1]);
		//组装
		MyKey k2 = new MyKey();
		k2.setYear(year);
		k2.setMonth(month);
		k2.setTemp(temp);
		Text v2 = v1;
		//写出
		context.write(k2, v2);
	}
}

4、自定义分区

/**
 * 自定义分区:溢写之前对进行分区
 * 要求相同年份的数据在同一个分区,我们这针对1949年、1950年和1951年分成对应
 * 3个分区,对应3个reduce task进行处理
 */
public class MyPartitioner extends Partitioner {

	@Override
	public int getPartition(MyKey k2, Text v2, int numPartitions) {
		/*
		 * 假如1949年、1950年和1951年这3年:
		 * 1949-1949=0;得到第1个分区
		 * 1950-1949=1;得到第2个分区
		 * 1952-1949=2;得到第3个分区
		 */
		return (k2.getYear()-1949)%numPartitions;
	}

}

5、自定义Sort

/**
 * 自定义排序比较器:在溢写之前,分区之后对分区内按照k2或者k2的部分进行排序
 * 这里我们按照年、月、温度进行排序,先以年做比较,如果年相等就比较月,年不相等
 * 就返回比较结果,如果月相等就以温度倒序排序,月不相等就返回比较结果
 * 需要继承WritableComparator,重新compare(WritableComparable a, WritableComparable b)方法
 */
public class MySort extends WritableComparator {
	//使用super()调用序列化构造函数
	public MySort() {
		super(MyKey.class,true);
	}
	//重写compare
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		//将WritableComparable强转为MyKey
		MyKey mykey1 = (MyKey)a;
		MyKey mykey2 = (MyKey)b;
		//先比较年
		int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear());
		//如果年相等,就比较月
		if(r1==0) {
			int r2 = Integer.compare(mykey1.getMonth(), mykey2.getMonth());
			//如果月相等,就对温度进行倒序排序,加-,表示倒序排序
			if(r2==0) {
				return -Integer.compare(mykey1.getTemp(), mykey2.getTemp());
			}
			return r2;
		}
		return r1;
	}
}

6、自定义Group

/**
 * 自定义分组比较器:在溢写之前,分区排序之后,对同一分区内排序好的按照k2或k2的部分进行分组
 * 相同key的数据对应的value制作成一个iterable
 * 这里我们按照年月进行分组,同年同月分一组,先以年做比较,如果年不相等返回比较结果,
 * 如果年相等,返回月份的比较值
 */
public class MyGroup extends WritableComparator {
	//使用super()调用序列化构造函数
	public MyGroup() {
		super(MyKey.class,true);
	}
	//重写compare
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		//将WritableComparable强转为MyKey
		MyKey mykey1 = (MyKey)a;
		MyKey mykey2 = (MyKey)b;
		//先以年做比较
		int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear());
		//如果年相等,返回月份的比较值
		if(r1==0) {
			return Integer.compare(mykey1.getMonth(), mykey2.getMonth());
		}
		return r1;
	}

}

7、自定义Reducer

public class MyReducer extends Reducer {

	@Override
	protected void reduce(MyKey k2, Iterable v2s, Context context)
			throws IOException, InterruptedException {
		System.out.println(k2.getYear()+"-"+k2.getMonth()+"-"+k2.getTemp());
		//遍历v2s,取前3个纪录
		int num = 0;//计数器
		for(Text v2:v2s) {
			if(++num>3) {//超过3条,结束循环
				break;
			}else {
				//k3为空
				Text v3 = v2;
				//写出
				context.write(NullWritable.get(), v3);
			}
		}
	}
}

8、自定义Job

/**
 * 自定义温度排序处理任务
 */
public class MyRunJob {
	public static void main(String[] args) {
		try {
			if(args.length!=2) {
				System.exit(1);
			}
			Configuration conf = new Configuration();
			Job job = Job.getInstance(conf);
			job.setJarByClass(MyRunJob.class);
			job.setMapperClass(MyMapper.class);
			job.setReducerClass(MyReducer.class);
			FileInputFormat.setInputPaths(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			//设置自定义分区
			job.setPartitionerClass(MyPartitioner.class);
			//设置自定义排序
			job.setSortComparatorClass(MySort.class);
			//设置自定义分组
			job.setGroupingComparatorClass(MyGroup.class);
			//设置Reducer数量
			job.setNumReduceTasks(3);
			//设置Mapper输出类型
			job.setMapOutputKeyClass(MyKey.class);
			job.setMapOutputValueClass(Text.class);
			//设置Reducer输出类型
			job.setOutputKeyClass(NullWritable.class);
			job.setOutputValueClass(Text.class);
			//提交job
			job.waitForCompletion(true);
		} catch (IOException | ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		}

	}
}

9、生成jar上传运行

ad

免费在线工具导航

优网导航旗下整合全网优质免费、免注册的在线工具导航大全

hadoop jar weather-0.0.1-SNAPSHOT.jar com.pzy.weather.MyRunJob /input/weather.txt /outputweather

10、查看结果
Hadoop经典综合性案例—温度排序示例

© 版权声明

相关文章

暂无评论

暂无评论...