Hadoop天气系统

勤奋不是嘴上说说而已,而是实际的行动,在勤奋的苦度中持之以恒,永不退却。业精于勤,荒于嬉;行成于思,毁于随。在人生的仕途上,我们毫不迟疑地选择勤奋,她是几乎于世界上一切成就的催产婆。只要我们拥着勤奋去思考,拥着勤奋的手去耕耘,用抱勤奋的心去对待工作,浪迹红尘而坚韧不拔,那么,我们的生命就会绽放火花,让人生的时光更加的闪亮而精彩。

导读:本篇文章讲解 Hadoop天气系统,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

1.写完计数程序打包成jar
只要class文件即可
2.上传到node1上
3.hadoop jar weather.jar com.hadoop.mr.weather.WeatherSystem

hdfs dfs -ls /data/weather/output
hdfs dfs -cat /data/weather/output/part-r-00000
也可以把内容copy到当前的目录
hdfs dfs -get /data/weather/output/* ./

public class WeatherSystem {

	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration(true);
		Job job = Job.getInstance(configuration);
		
		job.setJarByClass(WeatherSystem.class);
		job.setJobName("weather");
		//map start

		
		job.setMapperClass(WeatherMapper.class);
		job.setMapOutputKeyClass(WeatherData.class);
		job.setOutputValueClass(IntWritable.class);
		job.setPartitionerClass(WeatherPartition.class);
		job.setSortComparatorClass(WeatherComparator.class);
//		job.setCombinerClass(cls);
		//map end
		//reduce start
		job.setGroupingComparatorClass(WeatherGroupComparator.class);
    job.setReducerClass(WeatherReducer.class);
		job.setNumReduceTasks(2);
		//reduce end

    
		Path input = new Path("/data/weather/input/weather.txt");
		FileInputFormat.addInputPath(job, input );
		Path output = new Path("/data/weather/output");
		//测试专用为防止出问题
		if(output.getFileSystem(configuration).exists(output)){
			output.getFileSystem(configuration).delete(output, true);
		}
		FileOutputFormat.setOutputPath(job, output);	
		
    // Submit the job, then poll for progress until the job is complete
    job.waitForCompletion(true);

	}
	
}

public class WeatherMapper extends Mapper<LongWritable, Text, WeatherData, IntWritable>{


   private WeatherData wdkey = new WeatherData();
	 private final static IntWritable ivalue = new IntWritable(1);

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  	 String str[] = StringUtils.split(value.toString(),'\t');
  	 
  	 try{
	  	 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
	  	 Date date = sdf.parse(str[0]);
	  	 Calendar calendar = Calendar.getInstance();
	  	 calendar.setTime(date);
	  	 wdkey.setYear(calendar.get(calendar.YEAR));
	  	 wdkey.setMonth(calendar.get(calendar.MONTH)+1);
	  	 wdkey.setDay(calendar.get(calendar.DAY_OF_MONTH));
	  	 
	  	 int temperature = Integer.valueOf(str[1].substring(0, str[1].length()-1));
	  	 wdkey.setTemperature(temperature);
	 	 
	  	 ivalue.set(temperature);
	  	 
	  	 context.write(wdkey, ivalue);
  	 } catch (Exception e) {
			// TODO: handle exception
		}

   }

	
}

public class WeatherReducer extends Reducer<WeatherData, IntWritable, Text, IntWritable>{
	//相同的key为一组
	//1999 01 01 38(key)  38(value)
	//1999 01 11 32(key)  32(value)
	//1999 01 12 38(key)  38(value)
	//1999 01 11 28(key)  28(value)
	private Text key = new Text();
	private IntWritable result = new IntWritable();
	
	
	@Override
	protected void reduce(WeatherData data, Iterable<IntWritable> iterable,
	    Reducer<WeatherData, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		
		int flag = 0;
		int day = 0;
		for(IntWritable writable : iterable){
			
			if(flag == 0){
				//1999-01-01 38
				key.set(data.getYear()+"-"+data.getMonth()+"-"+data.getDay()+" " + data.getTemperature());
				result.set(data.getTemperature());
				flag ++;
				day = data.getDay();
				context.write(key, result);
			}
			if(flag != 0 && day != data.getDay()){
				key.set(data.getYear()+"-"+data.getMonth()+"-"+data.getDay()+" " + data.getTemperature());
				result.set(data.getTemperature());
				context.write(key, result);
				break;
			}
		}
		
	}

	
	
	
}

public class WeatherPartition extends Partitioner<WeatherData, IntWritable>{

	@Override
	public int getPartition(WeatherData key, IntWritable value, int numPartitions) {
		// 这里对key做分组,这个方法应该简单,尽量让reduce能并行处理完
		//针对weather没有效果
		return key.hashCode() % numPartitions;
	}

}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class WeatherComparator extends WritableComparator{
	
	public WeatherComparator(){
		super(WeatherData.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// TODO Auto-generated method stub
		WeatherData data1 = (WeatherData)a;
		WeatherData data2 = (WeatherData)b;
		int yearCompare = Integer.compare(data1.getYear(), data2.getYear());
		if(yearCompare == 0){
			int monthCompare = Integer.compare(data1.getMonth(), data2.getMonth());
			if(monthCompare == 0){
				int temperatureCompare = Integer.compare(data1.getTemperature(), data2.getTemperature());
				return -temperatureCompare;
			}
			return monthCompare;
		}
		return yearCompare;
	}

	
}

package com.hadoop.mr.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class WeatherData implements WritableComparable<WeatherData>{

	private int year;
	private int month;
	private int day;
	private int temperature;
	
	
	
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeInt(year);
		out.writeInt(month);
		out.writeInt(day);
		out.writeInt(temperature);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		year = in.readInt();
		month = in.readInt();
		day = in.readInt();
		temperature = in.readInt();
	}

	@Override
	public int compareTo(WeatherData that) {
		// TODO Auto-generated method stub
		
		//日期正序
		int yearCompare = Integer.compare(this.year, that.year);
		if(yearCompare == 0){
			int monthCompare = Integer.compare(this.month, that.month);
			if(monthCompare == 0 ){
				return Integer.compare(this.day, that.day);
			}
			return monthCompare;
		}
		
		return yearCompare;
	}

	/**
	 * @return the year
	 */
	public int getYear() {
		return year;
	}

	/**
	 * @param year the year to set
	 */
	public void setYear(int year) {
		this.year = year;
	}

	/**
	 * @return the month
	 */
	public int getMonth() {
		return month;
	}

	/**
	 * @param month the month to set
	 */
	public void setMonth(int month) {
		this.month = month;
	}

	/**
	 * @return the day
	 */
	public int getDay() {
		return day;
	}

	/**
	 * @param day the day to set
	 */
	public void setDay(int day) {
		this.day = day;
	}

	/**
	 * @return the temperature
	 */
	public int getTemperature() {
		return temperature;
	}

	/**
	 * @param temperature the temperature to set
	 */
	public void setTemperature(int temperature) {
		this.temperature = temperature;
	}

	
	
}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class WeatherGroupComparator extends WritableComparator{

	public WeatherGroupComparator(){
		super(WeatherData.class, true);
	}
	
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// TODO Auto-generated method stub
		WeatherData data1 = (WeatherData)a;
		WeatherData data2 = (WeatherData)b;
		int yearCompare = Integer.compare(data1.getYear(), data2.getYear());
		if(yearCompare == 0){
			return Integer.compare(data1.getMonth(), data2.getMonth());
		}
		return yearCompare;
	}
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/140821.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!