之前在MapReduce原理里面提到的例子,今天用代码将其实现,并且剖行其过程。按照之前对那个统计的例子分析,我们将代码主要分为三块
map的过程;reduce的过程;统管调度的过程。
因此,我们可以分别创建3个类WCMapper,WCReduce和WCJob.
首先,在Map的类里面,我们首先继承hadoop的父类mapper
public class WCMapper extends Mapper<LongWritable,Text,Text,LongWritable>
{
…
}
可以看到,该Mapper类定义的是4个泛型,定义成泛型的原因,是为了保证从文本中读取数据时候两边的数据类型一致,不会存在由于数据转换而抛出的错误。前面的两个参数是拿数据的数据类型,而后面两个是输出的数据类型(输出给Reduce保持一致)。另外,map和reduce的输入和输出都是以key-value的形式封装的。
而其中的参数LongWritable和Text,其实是java里面Long和String的原型,但是JDK里面Long和String这种数据对象在网络中传输需要序列化,实现过程中有冗余,因此,为了提升性能,采用LongWritable和Text这两个hadoop的封装类型使传递数据的效率更高,更精简。
接下来,我们可以重写map方法对我们的数据先进行切分并且遍历结果,然后将切分后的单词发送出去。
然后,再reduce里面,我们拿到map的数据后,将所有的key-value都缓存起来,进行分组。
最后,在编写一个job类来通知集群调用哪些map和reduce,还有数据的输入和输出路径,从而进行统一的作业调度管理。