1.写完程序后打包成jar
(jar中只需包含编译后的class文件即可)
2.上传到node1上
3.hadoop jar weather.jar com.hadoop.mr.weather.WeatherSystem
hdfs dfs -ls /data/weather/output
hdfs dfs -cat /data/weather/output/part-r-00000
也可以把输出内容复制到本地的当前目录
hdfs dfs -get /data/weather/output/* ./
public class WeatherSystem {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration(true);
Job job = Job.getInstance(configuration);
job.setJarByClass(WeatherSystem.class);
job.setJobName("weather");
//map start
job.setMapperClass(WeatherMapper.class);
job.setMapOutputKeyClass(WeatherData.class);
job.setOutputValueClass(IntWritable.class);
job.setPartitionerClass(WeatherPartition.class);
job.setSortComparatorClass(WeatherComparator.class);
// job.setCombinerClass(cls);
//map end
//reduce start
job.setGroupingComparatorClass(WeatherGroupComparator.class);
job.setReducerClass(WeatherReducer.class);
job.setNumReduceTasks(2);
//reduce end
Path input = new Path("/data/weather/input/weather.txt");
FileInputFormat.addInputPath(job, input );
Path output = new Path("/data/weather/output");
//测试专用为防止出问题
if(output.getFileSystem(configuration).exists(output)){
output.getFileSystem(configuration).delete(output, true);
}
FileOutputFormat.setOutputPath(job, output);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
}
}
public class WeatherMapper extends Mapper<LongWritable, Text, WeatherData, IntWritable>{
private WeatherData wdkey = new WeatherData();
private final static IntWritable ivalue = new IntWritable(1);
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str[] = StringUtils.split(value.toString(),'\t');
try{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = sdf.parse(str[0]);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
wdkey.setYear(calendar.get(calendar.YEAR));
wdkey.setMonth(calendar.get(calendar.MONTH)+1);
wdkey.setDay(calendar.get(calendar.DAY_OF_MONTH));
int temperature = Integer.valueOf(str[1].substring(0, str[1].length()-1));
wdkey.setTemperature(temperature);
ivalue.set(temperature);
context.write(wdkey, ivalue);
} catch (Exception e) {
// TODO: handle exception
}
}
}
/**
 * Emits at most two records per reduce group: the first record of the group and
 * the first subsequent record whose day differs from it.
 *
 * <p>Records with the same key form one group, for example:
 * <pre>
 * 1999 01 01 38 (key)  38 (value)
 * 1999 01 11 32 (key)  32 (value)
 * 1999 01 12 38 (key)  38 (value)
 * 1999 01 11 28 (key)  28 (value)
 * </pre>
 */
public class WeatherReducer extends Reducer<WeatherData, IntWritable, Text, IntWritable> {
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    /**
     * Walks the values of one group and writes the first record plus the first
     * record that falls on a different day.
     *
     * @param data     the group's key; its fields are read again on every iteration
     * @param iterable the values of the group (only iterated, not read)
     * @param context  task context used to emit output
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(WeatherData data, Iterable<IntWritable> iterable, Context context)
            throws IOException, InterruptedException {
        boolean firstEmitted = false;
        int firstDay = 0;

        // The loop variable is unused on purpose: iterating is what moves the
        // group forward, and the record's fields are read from `data`.
        for (IntWritable ignored : iterable) {
            if (!firstEmitted) {
                // e.g. "1999-1-1 38"
                emit(data, context);
                firstDay = data.getDay();
                firstEmitted = true;
                continue;
            }
            if (data.getDay() != firstDay) {
                emit(data, context);
                break;
            }
        }
    }

    /** Writes "year-month-day temperature" as the key and the temperature as the value. */
    private void emit(WeatherData data, Context context) throws IOException, InterruptedException {
        StringBuilder line = new StringBuilder();
        line.append(data.getYear()).append("-")
            .append(data.getMonth()).append("-")
            .append(data.getDay()).append(" ")
            .append(data.getTemperature());
        outKey.set(line.toString());
        outValue.set(data.getTemperature());
        context.write(outKey, outValue);
    }
}
/**
 * Routes every record of the same year and month to the same reduce partition.
 *
 * <p>The partition is derived from the key's year and month only, so it is
 * deterministic across JVMs and does not depend on {@code WeatherData.hashCode()}.
 */
public class WeatherPartition extends Partitioner<WeatherData, IntWritable> {
    /**
     * Computes the partition for a record.
     *
     * @param key           the record key; only its year and month are used
     * @param value         the record value (unused)
     * @param numPartitions the number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(WeatherData key, IntWritable value, int numPartitions) {
        // Keep this cheap: it runs once per map output record.
        int monthHash = 31 * key.getYear() + key.getMonth();
        // Clear the sign bit so the result can never be negative.
        return (monthHash & Integer.MAX_VALUE) % numPartitions;
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for {@link WeatherData} keys: ascending by year, then ascending
 * by month, then descending by temperature.
 */
public class WeatherComparator extends WritableComparator {
    /** Registers {@link WeatherData} as the key class and asks the parent to create key instances. */
    public WeatherComparator() {
        super(WeatherData.class, true);
    }

    /**
     * Compares two {@link WeatherData} keys.
     *
     * @param a the first key
     * @param b the second key
     * @return a negative, zero or positive value according to the ordering described on the class
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        WeatherData left = (WeatherData) a;
        WeatherData right = (WeatherData) b;

        if (left.getYear() != right.getYear()) {
            return Integer.compare(left.getYear(), right.getYear());
        }
        if (left.getMonth() != right.getMonth()) {
            return Integer.compare(left.getMonth(), right.getMonth());
        }
        // Operands swapped: higher temperatures sort first.
        return Integer.compare(right.getTemperature(), left.getTemperature());
    }
}
package com.hadoop.mr.weather;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite MapReduce key holding a date (year, month, day) and a temperature.
 *
 * <p>Natural ordering ({@link #compareTo}) is ascending by year, month, then day.
 * {@link #equals} and {@link #hashCode} use the same three fields so they stay
 * consistent with {@code compareTo}; the temperature does not take part in
 * identity. {@code hashCode} depends only on field values, so it yields the same
 * result in every JVM.
 *
 * <p>Instances are mutable; do not change the date fields while an instance is
 * stored in a hash-based collection.
 */
public class WeatherData implements WritableComparable<WeatherData> {
    private int year;
    private int month;
    private int day;
    private int temperature;

    /**
     * Serializes the four fields as ints in the order year, month, day, temperature.
     *
     * @param out the sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(temperature);
    }

    /**
     * Restores the four fields in the same order {@link #write} emits them.
     *
     * @param in the source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
        day = in.readInt();
        temperature = in.readInt();
    }

    /**
     * Orders by date ascending: year, then month, then day.
     *
     * @param that the key to compare with
     * @return a negative, zero or positive value as this date is before, equal to or after {@code that}
     */
    @Override
    public int compareTo(WeatherData that) {
        int yearCompare = Integer.compare(this.year, that.year);
        if (yearCompare != 0) {
            return yearCompare;
        }
        int monthCompare = Integer.compare(this.month, that.month);
        if (monthCompare != 0) {
            return monthCompare;
        }
        return Integer.compare(this.day, that.day);
    }

    /**
     * Two keys are equal when they carry the same year, month and day.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a {@code WeatherData} with the same date
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof WeatherData)) {
            return false;
        }
        WeatherData other = (WeatherData) obj;
        return year == other.year && month == other.month && day == other.day;
    }

    /**
     * Hash of the date fields; stable across JVM instances.
     *
     * @return a hash code derived from year, month and day
     */
    @Override
    public int hashCode() {
        int hash = 17;
        hash = 31 * hash + year;
        hash = 31 * hash + month;
        hash = 31 * hash + day;
        return hash;
    }

    /**
     * @return a readable form, {@code year-month-day temperature}, for logs and debugging
     */
    @Override
    public String toString() {
        return year + "-" + month + "-" + day + " " + temperature;
    }

    /**
     * @return the year
     */
    public int getYear() {
        return year;
    }

    /**
     * @param year the year to set
     */
    public void setYear(int year) {
        this.year = year;
    }

    /**
     * @return the month
     */
    public int getMonth() {
        return month;
    }

    /**
     * @param month the month to set
     */
    public void setMonth(int month) {
        this.month = month;
    }

    /**
     * @return the day
     */
    public int getDay() {
        return day;
    }

    /**
     * @param day the day to set
     */
    public void setDay(int day) {
        this.day = day;
    }

    /**
     * @return the temperature
     */
    public int getTemperature() {
        return temperature;
    }

    /**
     * @param temperature the temperature to set
     */
    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator for {@link WeatherData} keys: two keys compare as equal when
 * they share the same year and month, regardless of day or temperature.
 */
public class WeatherGroupComparator extends WritableComparator {
    /** Registers {@link WeatherData} as the key class and asks the parent to create key instances. */
    public WeatherGroupComparator() {
        super(WeatherData.class, true);
    }

    /**
     * Compares two keys by year, then by month.
     *
     * @param a the first key
     * @param b the second key
     * @return zero when both keys have the same year and month; otherwise the sign of
     *         the year comparison, or of the month comparison when the years match
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        WeatherData left = (WeatherData) a;
        WeatherData right = (WeatherData) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        return byYear != 0
                ? byYear
                : Integer.compare(left.getMonth(), right.getMonth());
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/140821.html