MapReduce Java API实例-统计平均成绩

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 MapReduce Java API实例-统计平均成绩,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

场景

MapReduce Java API实例-统计单词出现频率:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119410169

在上面对单个txt文件进行统计的基础上,Mapreduce也是支持文件夹下多个文件处理的。

统计学生各科平均成绩,每科成绩为一个文件。

在Map阶段和上面统计单次频率差不多,然后在Reduce阶段求出总和后,除以科目数,

并将输出value的数据类型设置为FloatWritable即可。

新建三个数据集,chinese.txt、math.txt、english.txt

分别代表三科成绩,每科成绩的格式如下

MapReduce Java API实例-统计平均成绩

 

每科成绩左边是姓名,右边是成绩,并且姓名和成绩之间是用空格分开。

注意这里是一个空格,因为下面处理的规则就是按照中间一个空格来处理的。

这点要尤为注意,并且如果这个文件是在Windows上新建并添加的空格,一定要注意排查上传到Centos以及HDFS集群中是否格式变化。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
关注公众号
霸道的程序猿
获取编程相关电子书、教程推送与免费下载。

实现

1、Map实现代码

package com.badao.averagegrade;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class AverageGradeMapper extends Mapper<Object,Text,Text,IntWritable> {
    //1、编写map函数,通过继承Mapper类实现里面的map函数
    //   Mapper类当中的第一个函数是Object,也可以写成Long
    //   第一个参数对应的值是行偏移量

    //2、第二个参数类型通常是Text类型,Text是Hadoop实现的String 类型的可写类型
    //   第二个参数对应的值是每行字符串

    //3、第三个参数表示的是输出key的数据类型

    //4、第四个参数表示的是输出value的数据类型,IntWriable 是Hadoop实现的int类型的可写数据类型

    public final static IntWritable one = new IntWritable(1);
    public Text word = new Text();

    //key 是行偏移量
    //value是每行字符串
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] str = value.toString().split(" ");
        context.write(new Text(str[0]),new IntWritable(Integer.parseInt(str[1])));
    }
}

2、Reduce代码

package com.badao.averagegrade;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


//第一个参数类型是输入值key的数据类型,map中间输出key的数据类型
//第二个参数类型是输入值value的数据类型,map中间输出value的数据类型
//第三个参数类型是输出值key的数据类型,他的数据类型要跟job.setOutputKeyClass(Text.class) 保持一致
//第四个参数类型是输出值value的数据类型,它的数据类型要跟job.setOutputValueClass(IntWriable.class) 保持一致

public class AverageGradeReducer extends Reducer<Text, IntWritable,Text, FloatWritable> {

    public FloatWritable result = new FloatWritable();


    //key就是单词  values是单词出现频率列表
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable val:values)
        {
            //get就是取出IntWriable的值
            sum += val.get();
        }
        //3表示科目数
        result.set((float)sum/3);
        context.write(key,result);
    }
}

3、Job代码

package com.badao.averagegrade;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class AverageGradeJob {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        wordCountLocal();
    }

    public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.148.128:9000");
        //conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        System.setProperty("HADOOP_USER_NAME","root");
        //实例化一个作业,word count是作业的名字
        Job job = Job.getInstance(conf, "averagegrade");
        //指定通过哪个类找到对应的jar包
        job.setJarByClass(AverageGradeJob.class);
        //为job设置Mapper类
        job.setMapperClass(AverageGradeMapper.class);
        //为job设置reduce类
        job.setReducerClass(AverageGradeReducer.class);
        job.setMapOutputValueClass(IntWritable.class);
        //为job的输出数据设置key类
        job.setOutputKeyClass(Text.class);
        //为job输出设置value类
        job.setOutputValueClass(FloatWritable.class);
        //为job设置输入路径,输入路径是存在的文件夹/文件
        FileInputFormat.addInputPath(job,new Path("/grade"));
        //为job设置输出路径
        FileOutputFormat.setOutputPath(job,new Path("/averageGrade3"));
        job.waitForCompletion(true);
    }

}

然后将上面的三个成绩的txt上传到集群HDFS中,运行job

MapReduce Java API实例-统计平均成绩

 

可以在集群HDFS中看到生成统计好的文件,查看paat-r-00000的内容

MapReduce Java API实例-统计平均成绩

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/136228.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!