快乐学习 一个网站喵查铺子(catpuzi.com)全搞定~

MapReduce实现网站用户流量统计 自定义分组 MapReduce实现用户浏览网站统计

大数据 数据帝 2020-02-27 扫描二维码
文章目录[隐藏]

MapReduce实现网站用户流量统计

网站的文件都会统计注册用户的流量信息,下面实现一个统计用户的上行流量,下行流量和总共使用流量。基本的思路还是通过map(获取日志中的一行数据,切分各个字段,抽取出我们需要的字段:手机号,上行流量,下行流量,然后封装成kv发送出去),reduce(遍历values,然后进行累加求和再输出)和run类来处理,但是map中传给reduce的value有上,下行流量两个值,因此,一次携带多个数据,可以采取新建一个JavaBean(类)实现。

public class Flow Sum Mapper extendsMapper<LongWritable,Text,Text,FlowBean>

这里需要提到一个序列化的概念,在内存里面很多数据块,如果经过网络传输,需要采用流的形式。当传输完成后,可以通过反序列化的方式,将之前的对象进行还原。

FlowBean我们实现的这个类,就是为了实现序列化和反序列化的方法,同时,里面也是为了实现上行流量和下行流量的封装。在反序列的实现中,使用了反射的机制,通过空参的构造函数完成反序列化。
在map,reduce和run也实现后,我们将该包打包成jar文件上传到hadoop服务器上执行,就得到如下的结果,用户的号码,上行总流量,下行总流量和总的用户流量。

MapReduce实现网站用户流量统计 自定义分组  MapReduce实现用户浏览网站统计

 

MapReduce自定义分组

 

 

当分析数据量很大的时候,有时候我们希望通过某些特点来查询用户或者号码,比如:广东省的所有用户号码可以分组在一起为一个文件,湖北省的所有用户可以分组在一起输出为一个文件,这样在后续分析或统计数据的时候可以方便的查找。之前实现的用户流量统计是逐条读取用户数据,然后全部输出到一个文件中。其实,反编译mapreduce的源码(如下所示),里面有个类HashPartitioner。

MapReduce实现网站用户流量统计 自定义分组  MapReduce实现用户浏览网站统计

默认里面的numReduceTask任务为1的时候,传进来的K-V值,返回的组均为1,因此也就会把所有的结果生成到一个文件。所以,我们可以改写这个源码中的类,实现分组。
文件的分割与读取还是与之前的mapreduce的基本逻辑还是一致,只不过需要添加一个分组的类。将需要分组的类,先缓存在内存中HashMap中,这样从数据库中读取表的数据结果全部缓存在了HashMap里面。

private static HashMap<String,Integer> areaMap = new HashMap<>();
static{
loadTableToAreaMap(areamap);
};
最后,我们将代码打成jar包上传至服务器运行。其结果如下,所示,可以看到,输出的文件结果不再是1个,变成我们改造后的6个组。

MapReduce实现网站用户流量统计 自定义分组  MapReduce实现用户浏览网站统计
MapReduce实现网站用户流量统计 自定义分组  MapReduce实现用户浏览网站统计

MapReduce实现用户浏览网站统计

在各大网站里面,很多情况下可以通过捕获用户浏览的网页和内容,从而判断用户对哪些网页感兴趣,达到分析用户行为的目的。下面通过MapReduce实现一个日志文件的分析,可以看出用户访问哪个网站最多,也就是流量最多。
首先,我们还是在新建3个类map,reduce和run.Map中还是继承Mapper类,通过读取文件的url和网站的流量。在Reduce里面,为了对传入的map结果进行排序,使用到了treemap这个类,与hashmap不同的是,treemap是可以对缓存的结果进行排序的,并且treemap里面是根据key来排序,是不能根据value来排序的。因此,我们在传递的K-V对的时候,K应该是流量汇总的结果,V是URL站点。

private TreeMap<FlowBean,Text> treeMap = new TreeMap<>();

为了能判断reduce方法已经全部工作完成,也就是说treemap里面已经有了全部的集合数据,重写了cleanup的这个方法。

protected void cleanup(Context context)
throws IOException, InterruptedException {
Set<Entry<FlowBean,Text>> entrySet = treeMap.entrySet();
double tempCount = 0 ;
for(Entry<FlowBean,Text> ent : entrySet)
if(tempCount / globalCount < 0.8){
context.write(ent.getValue(), new LongWritable(ent.getKey().getS_flow()));
tempCount += ent.getKey().getS_flow();

喜欢 (1)
关于作者: