While I was presenting my program, the cloud computing teacher ripped on my English pronunciation. Awkward.
Here is the courseware I made about MapReduce; it took real effort, and I think it turned out decent. Click: MapReduce介绍PPT (MapReduce intro slides)
– – – – – – – – – – – – – – – divider – – – – – – – – – – – – – – – – – –
I. Problem Description:
Given text files in which each line holds a student's name and course score, write a MapReduce program that computes the class average, minimum, and maximum scores, and that lists the students falling into each score band (90-100, 80-90, 70-80, 60-70, below 60).
II. Experiment Code:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ClassScore {

    // Separator between the name and the score inside the intermediate value (a tab).
    private static String SPACE = "\t";

    public static class Map extends
            Mapper<LongWritable, Text, Text, Text> {
        // Each record is emitted twice, once under each of these keys, so one
        // reducer can compute statistics while the others bucket score bands.
        private Text word1 = new Text("score");
        private Text word2 = new Text("distribution");

        // Each input line is expected to hold "name<whitespace>score".
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizerArticle = new StringTokenizer(line);
            while (tokenizerArticle.hasMoreElements()) {
                String strName = tokenizerArticle.nextToken();  // name part
                String strScore = tokenizerArticle.nextToken(); // score part
                String namescore = strName + SPACE + strScore;
                context.write(word1, new Text(namescore));
                context.write(word2, new Text(namescore));
            }
        }
    }
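    // Example (hypothetical record): for the input line "lisi 89", the map
    // function emits ("score", "lisi\t89") and ("distribution", "lisi\t89").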
    public static class GenderPartitioner extends Partitioner<Text, Text> {
        public int getPartition(Text key, Text value, int numReduceTasks) {
            String[] namescore = value.toString().split(SPACE);
            int score = Integer.parseInt(namescore[1]);
            String str = key.toString();
            // "distribution" records are routed to partitions 1-5 by score
            // band; everything else (the "score" key) goes to partition 0.
            if ("distribution".equals(str)) {
                if (score >= 90 && score <= 100) {
                    return 1 % numReduceTasks;
                } else if (score >= 80 && score < 90) {
                    return 2 % numReduceTasks;
                } else if (score >= 70 && score < 80) {
                    return 3 % numReduceTasks;
                } else if (score >= 60 && score < 70) {
                    return 4 % numReduceTasks;
                } else {
                    return 5 % numReduceTasks;
                }
            } else {
                return 0;
            }
        }
    }
    public static class Reduce extends
            Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            String ss = key.toString();
            if ("score".equals(ss)) {
                // Statistics branch: average, minimum, and maximum score.
                int sum = 0;
                int count = 0;
                int min = 150;
                int max = 0;
                int score = 0;
                String name1 = " ";
                String name2 = " ";
                // Cache the values: the iterable can only be traversed once.
                List<String> cache = new ArrayList<String>();
                for (Text val : values) {
                    cache.add(val.toString());
                    String[] valTokens = val.toString().split(SPACE);
                    score = Integer.parseInt(valTokens[1]);
                    if (score > max) {
                        max = score;
                    }
                    if (score < min) {
                        min = score;
                    }
                    sum += score;
                    count++;
                }
                // Average, rounded half up: round up exactly when twice the
                // remainder reaches the divisor.
                int average = sum / count;
                if (sum % count * 2 >= count) {
                    average += 1;
                }
                context.write(new Text("The average is"), new IntWritable(average));
                context.write(new Text("The min score is"), new IntWritable(min));
                for (String val : cache) {
                    String[] valTokens = val.split(SPACE);
                    score = Integer.parseInt(valTokens[1]);
                    if (score == min) {
                        name2 = valTokens[0];
                        context.write(new Text(name2), new IntWritable(min));
                    }
                }
                context.write(new Text("The max score is"), new IntWritable(max));
                for (String val : cache) {
                    String[] valTokens = val.split(SPACE);
                    score = Integer.parseInt(valTokens[1]);
                    if (score == max) {
                        name1 = valTokens[0];
                        context.write(new Text(name1), new IntWritable(max));
                    }
                }
            } else {
                // Distribution branch: emit every name/score pair; the
                // partitioner has already grouped them by score band.
                String nname = " ";
                int score = 0;
                for (Text val : values) {
                    String[] valTokens = val.toString().split(SPACE);
                    nname = valTokens[0];
                    score = Integer.parseInt(valTokens[1]);
                    context.write(new Text(nname), new IntWritable(score));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) { // expect exactly an input and an output path
            System.err.println("Usage: ClassScore <in> <out>");
            System.exit(2);
        }
        // Set the MapReduce job name.
        Job job = Job.getInstance(conf, "ClassScore");
        job.setJarByClass(ClassScore.class);
        // Set the Mapper, Partitioner, and Reducer classes.
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setPartitionerClass(GenderPartitioner.class);
        // Six reducers: one for the statistics plus five score bands.
        job.setNumReduceTasks(6);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
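To run the job, package the class into a jar and launch it the usual Hadoop way, for example (the jar name here is an assumption): hadoop jar ClassScore.jar org.apache.hadoop.examples.ClassScore input output. Since the job sets setNumReduceTasks(6), the output directory will contain six files, part-r-00000 through part-r-00005: partition 0 holds the statistics, and partitions 1 through 5 hold the five score bands.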
III. Code Walkthrough:
Only the core code is described here:
(1) The map function reads each record from the input files and maps it into two key-value pairs. The two pairs carry different keys ("score" and "distribution") but the same value, "name + '\t' + score".
(2) The partitioner splits the work by key. Pairs keyed "score", used to compute the average, maximum, and minimum, all go to a single reduce task, while pairs keyed "distribution", used to list the students in each score band, are spread over five reduce tasks according to the score (see the sketch after this list).
(3) The reduce function branches on the key: for "score" it computes the average, maximum, and minimum; for "distribution" it writes out the name and score of every student in the band.
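To make point (2) concrete, here is a minimal standalone sketch, not part of the submitted code, that mirrors the partitioner's bucketing logic for six reduce tasks (the sample scores are hypothetical):

public class PartitionSketch {
    // Mirrors GenderPartitioner: "distribution" records are banded into
    // partitions 1-5; any other key goes to partition 0.
    static int partitionFor(String key, int score, int numReduceTasks) {
        if (!"distribution".equals(key)) {
            return 0;
        }
        if (score >= 90 && score <= 100) return 1 % numReduceTasks;
        if (score >= 80 && score < 90)   return 2 % numReduceTasks;
        if (score >= 70 && score < 80)   return 3 % numReduceTasks;
        if (score >= 60 && score < 70)   return 4 % numReduceTasks;
        return 5 % numReduceTasks;
    }

    public static void main(String[] args) {
        int[] samples = {95, 83, 71, 64, 42}; // hypothetical scores
        for (int s : samples) {
            // Prints partitions 1, 2, 3, 4, 5 respectively.
            System.out.println("score " + s + " -> partition "
                    + partitionFor("distribution", s, 6));
        }
    }
}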
Detailed code analysis: http://pan.baidu.com/s/1o7XNXV0
IV. Experiment Result Screenshots:
Experiment output and results:
Experiment input:
class1.txt
class2.txt
class3.txt
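Each input file is plain text with one student per line, the name and the score separated by whitespace. A hypothetical class1.txt might look like:

zhangsan 78
lisi 89
wangwu 96
zhaoliu 67
sunqi 53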
Experiment output:
The 90-100 score band (partition 1):
There are likewise files for the 80-90 band, the 70-80 band, and so on, not repeated here.
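Going by the reducer's distribution branch, each band file holds one "name<TAB>score" line per student in that band; with the hypothetical input above, part-r-00001 (the 90-100 band) would contain the single line:

wangwu	96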
Console output: