MapReduce 是一种编程模型,"Map(映射)"和"Reduce(归约)",是它们的主要思想,我们通过 Map 函数来分布式处理输入数据,然后通过 Reduce 汇总结果并输出。我们编写一个 MapReduce 程序的一般步骤是:
- 编写 map 程序。
- 编写 reduce 程序。
- 编写程序驱动。
本章节的目标
本节中我们将使用 MapReduce 框架来编写一个简单的例子,这个例子是用来统计 HDFS 指定目录下的文件中每个字符出现的次数并将统计结果输出到 HDFS 的指定目录中。点击此处获取本章节源代码。
Map 程序
我们继承 Mapper 类并重写了其 map 方法。Map 阶段输入的数据是从 hdfs 中拿到的原数据,输入的 key 为某一行起始位置相对于文件起始位置的偏移量,value 为该行的文本。输出的内容同样也为键-值对,这个时候输出数据的键值对的类型可以自己指定,在本例中 key 是 Text 类型的,value 是 LongWritable 类型的。输出的结果将会被发送到 reduce 函数进一步处理。
- public class CharCountMapper extends Mapper< LongWritable, Text, Text, LongWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- // 将这一行文本转为字符数组
- char[] chars = value.toString().toCharArray();
- for (char c : chars) {
- // 某个字符出现一次,便输出其出现 1 次。
- context.write(new Text(c + ""), new LongWritable(1));
- }
- }
- }
Reduce 程序
我们继承 Reducer 类并重写了其 reduce 方法。在本例中 Reduce 阶段的输入是 Map 阶段的输出,输出的结果可以作为最终的输出结果。相信你也注意到了,reduce 方法的第二个参数是一个 Iterable,MapReduce 会将 map 阶段中相同字符的输出汇总到一起作为 reduce 的输入。
- public class CharCountReducer extends Reducer< Text, LongWritable, Text, LongWritable> {
- @Override
- protected void reduce(Text key, Iterable< LongWritable> values, Context context)
- throws IOException, InterruptedException {
- long count = 0;
- for (LongWritable value : values) {
- count += value.get();
- }
- context.write(key, new LongWritable(count));
- }
- }
驱动程序
到目前为止,我们已经有了 map 程序和 reduce 程序,我们还需要一个驱动程序来运行整个作业。可以看到我们在这里初始化了一个 Job 对象。Job 对象指定整个 MapReduce 作业的执行规范。我们用它来控制整个作业的运作,在这里我们指定了 jar 包位置还有我们的 Map 程序、Reduce 程序、Map 程序的输出类型、整个作业的输出类型还有输入输出文件的地址。
- public class CharCountDriver {
- public static void main(String[] args) throws Exception {
- Configuration configuration = new Configuration();
- Job job = Job.getInstance(configuration);
- // Hadoop 会自动根据驱动程序的类路径来扫描该作业的 Jar 包。
- job.setJarByClass(cn.itweknow.mr.CharCountDriver.class);
- // 指定 mapper
- job.setMapperClass(CharCountMapper.class);
- // 指定 reducer
- job.setReducerClass(CharCountReducer.class);
- // map 程序的输出键-值对类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- // 输出键-值对类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 输入文件的路径
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- // 输入文件路径
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- boolean res = job.waitForCompletion(true);
- System.exit(res?0:1);
- }
- }
执行 MapReduce 作业 (编辑:威海站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|