随着直播行业的崛起,大型互联网直播公司每日都会产生海量的直播数据,为了更好地服务主播与用户,提高直播质量与用户粘性,往往会对大量的数据进行分析与统计,从中挖掘商业价值,我们将通过一个实战案例,来使用Hadoop技术来实现对直播数据的统计与分析。
我这里是简化过后的数据格式,大致如下:
{"id":"1580089010000","uid":"12001002543","nickname":"jack2543","gold":561,"watchnumpv":1697,"follower":1509,"gifter":2920,"watchnumuv":5410,"length":3542,"exp":183}
{"id":"1580089010001","uid":"12001001853","nickname":"jack1853","gold":660,"watchnumpv":8160,"follower":1781,"gifter":551,"watchnumuv":4798,"length":189,"exp":89}
{"id":"1580089010002","uid":"12001003786","nickname":"jack3786","gold":14,"watchnumpv":577,"follower":1759,"gifter":2643,"watchnumuv":8910,"length":1203,"exp":54}
数据格式为json格式,大家可以自己写程序批量随机生成相关数据,当然,这里我也为大家提供模拟数据文件的下载,解压后的文件名为video_20200504.log:
[dltable file=”直播模拟数据” size=”206KB”]免费下载[/dltable]

程序员导航
优网导航旗下整合全网优质开发资源,一站式IT编程学习与工具大全网站
运营部门门需要针对主播每天的开播数据进行分析,统计出来每天受欢迎程度比较高的一些主播,进而对这些主播分发更多流量,挖掘最大价值,我们这里主要做两个具体需求:
1、对数据中的金币数量,总观看pv,粉丝关注数量,视频总开播时长等指标进行统计
2、统计每天开播时长最长的前10名主播及对应的开播时长
具体实现流程我们分为4个大步骤,分别如下:
[list]实现数据清洗
实现指标统计
实现TOP N统计
实现定时任务脚本[/list]
1、实现数据清洗
由于原始数据是通过日志方式进行记录的,在使用日志采集工具采集到HDFS之后,还需要对数据进行清洗过滤,丢弃缺失字段的数据,针对异常字段值进行标准化处理。
[v_blue]数据清洗作业:
需求:
1.从原始数据(JSON格式)中过滤出来需要的字段
主播id(uid)、金币数量(gold)、总观看PV(watchnumpv)、粉丝关注数量(follower)、视频开播总时长(length)
2.针对核心字段进行异常值判断
金币数量、总观看PV、粉丝关注数量、视频总开播时长
以上四个字段正常情况下都不应该是负值,也不应该缺失,如果这些字段值为负值,则认为是异常数据,直接丢弃,如果这些字段个别缺失,则认为该字段的值为0
分析:
1:由于原始数据是json格式的,所以可以使用fastison对原始数据进行解析,获取指定字段的内容
2:然后对获取到的数据进行判断,只保留满足条件的数据即可
3:由于不需要聚合过程, 只是一个简单的过滤操作,所以只需要map阶段即可,reduce阶段就不需要了
4:其中map阶段的k1, v1的数据类型是固定的:
k2, v2的数据类型为:
1)创建Maven项目,pom.xml除了之前的配置,还需要新增fastjson依赖
com.alibaba fastjson 1.2.68
2)创建清洗Mapper

AI 工具导航
优网导航旗下AI工具导航,精选全球千款优质 AI 工具集
public class DataCleanMapper extends Mapper{ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //1.将v1转为json对象 JSONObject json = JSON.parseObject(v1.toString()); //2.提取v1中的核心字段-使用getIntValue而不使用getIntger,因为getIntValue获取不到会返回0 String uid = json.getString("uid");//主播id(uid) int gold = json.getIntValue("gold");//金币数量(gold) int watchnumpv = json.getIntValue("watchnumpv");//总观看PV(watchnumpv) int follower = json.getIntValue("follower");//粉丝关注数量(follower) int length = json.getIntValue("length");//视频开播总时长(length) //过滤异常数据 if(uid!= null && !"".equals(uid) && gold>=0 && watchnumpv>=0 && follower>=0 && length>=0) { //组装k2,v2 NullWritable k2 = NullWritable.get(); Text v2 = new Text(uid+"\t"+gold+"\t"+watchnumpv+"\t"+follower+"\t"+length); //将k2,v2写出去 context.write(k2, v2); } } }
3)创建清洗JOB
public class DataCleanJob {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCleanJob.class);
job.setMapperClass(DataCleanMapper.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//设置reduce数量为0
job.setNumReduceTasks(0);
//设置Mapper输出类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
//提交job
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4)打包提交运行测试
注意上传的jar包是带有依赖的jar,因为我们要使用fastjson,video log日志文件上传到/data/videoinfo/日志日期目录递归创建/data/videoinfo/日志日期目录语句:
hdfs dfs -mkdir -p /data/videoinfo/20200504
运行
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob /data/videoinfo/20200504/video_20200504.log /res0504
查看清洗后的总纪录数:
hdfs dfs -cat /res0504/* | wc -l
2、指标统计实现
对数据中的金币数量,总观看pv,粉丝关注数量,视频总开播时长等指标进行统计。
1)自定义bean,封装金币数量,总观看pv,粉丝关注数量,视频总开播时长等统计指标

免费在线工具导航
优网导航旗下整合全网优质免费、免注册的在线工具导航大全
public class VideoInfoWritable implements Writable {
private long gold;
private long watchnumpv;
private long follower;
private long length;
public long getGold() {
return gold;
}
public void setGold(long gold) {
this.gold = gold;
}
public long getWatchnumpv() {
return watchnumpv;
}
public void setWatchnumpv(long watchnumpv) {
this.watchnumpv = watchnumpv;
}
public long getFollower() {
return follower;
}
public void setFollower(long follower) {
this.follower = follower;
}
public long getLength() {
return length;
}
public void setLength(long length) {
this.length = length;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(gold);
out.writeLong(watchnumpv);
out.writeLong(follower);
out.writeLong(length);
}
@Override
public void readFields(DataInput in) throws IOException {
gold = in.readLong();
watchnumpv = in.readLong();
follower = in.readLong();
length = in.readLong();
}
@Override
public String toString() {
return gold+"\t"+watchnumpv+"\t"+follower+"\t"+length;
}
}
2)自定义Mapper
/** * 统计金币数量,总观看pv,粉丝关注数量,视频总开播时长等指标 * k1,v1来自清洗后的文件 */ public class VideoInfoMapper extends Mapper{ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String[] line = v1.toString().split("\t"); //获取指标 String uid = line[0]; long gold = Long.parseLong(line[1]); long watchnumpv = Long.parseLong(line[2]); long follower = Long.parseLong(line[3]); long length = Long.parseLong(line[4]); //组装k2,v2 Text k2 = new Text(uid); VideoInfoWritable v2 = new VideoInfoWritable(); v2.setGold(gold); v2.setWatchnumpv(watchnumpv); v2.setFollower(follower); v2.setLength(length); //将k2,v2写出去 context.write(k2, v2); } }
3)自定义Reducer
public class VideoInfoReducer extends Reducer{ @Override protected void reduce(Text k2, Iterable v2s,Context context) throws IOException, InterruptedException { //遍历v2s,对对应的指标求和 long goldsum = 0; long watchnumpvsum = 0; long followersum = 0; long lengthsum = 0; for(VideoInfoWritable v2:v2s) { goldsum += v2.getGold(); watchnumpvsum += v2.getWatchnumpv(); followersum += v2.getFollower(); lengthsum += v2.getLength(); } //组装k3,v3 Text k3 = k2; VideoInfoWritable v3 = new VideoInfoWritable(); v3.setGold(goldsum); v3.setWatchnumpv(watchnumpvsum); v3.setFollower(followersum); v3.setLength(lengthsum); //将k3,v3写出去 context.write(k3, v3); } }
4)自定义JOB
public class VideoInfoJob {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(0);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(VideoInfoJob.class);
job.setMapperClass(VideoInfoMapper.class);
job.setReducerClass(VideoInfoReducer.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//设置Mapper输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(VideoInfoWritable.class);
//设置Reducer输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VideoInfoWritable.class);
//提交job
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、实现TOP N统计
统计每天开播时长最 长的前10名主播及对应的开播时长
[v_blue]分析:基于清洗后的数据进行统计TOP10。
1:为了统计每天开播最长的前10名主播信息,需要在map阶段获取数据中每个主播的id和直播时长
2:所以map阶段的<k2,v2> 为<Text, LongWritable>
3:在Reduce端对相同主播的直播时长进行累加求和,把这些数据存储到一 个临时的map集合中
4:在Reduce 端的cleanup函数中对map集合中的数据根据直播时长进行排序
注意:cleanup只执行一次,会在所有的reduce执行完,可以将最终数据进行筛选。
5:最后在cleanup函数中把直播时长最长的前10名主播的信息写出到hdfs.文件中[/v_blue]
[v_warn]注意:要求根据输入路径参数获取日期,将日期以yyyy-MM-dd的格式放到k3中一起写出来,方便导入到mysql数据库能看到数据对应的日期。[/v_warn]
1)自定义Mapper
public class VideoInfoTop10Mapper extends Mapper{ @Override protected void map(LongWritable k1, Text v1,Context context) throws IOException, InterruptedException { String[] line = v1.toString().split("\t"); String uid = line[0]; long length = Long.parseLong(line[4]); //组装k2,v2 Text k2 = new Text(uid); LongWritable v2 = new LongWritable(length); //将k2,v2写出去 context.write(k2, v2); } }
2)自定义Reducer
public class VideoInfoTop10Reducer extends Reducer{ //存储最终所有数据 private Map map = new HashMap (); @Override protected void reduce(Text k2, Iterable v2s,Context context) throws IOException, InterruptedException { long lengthsum = 0; for(LongWritable v2:v2s) { lengthsum += v2.get(); } map.put(k2.toString(), lengthsum); } //任务初始化的时候执行一次,仅执行一次,一般在里面做一些初始化资源链接的动作 @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } //任务结束的时候执行次,仅执行一次,做一些关闭资源的操作 @Override protected void cleanup(Context context) throws IOException, InterruptedException { //获取日期 Configuration conf = context.getConfiguration(); String dt = conf.get("dt"); //对map按照length降序排序 Map sortedMap = MapUtils.sortByValueDesc(map); int count = 1; Set > set = sortedMap.entrySet(); Iterator > it = set.iterator(); while(count<=10 && it.hasNext()) { Entry entry = it.next(); //封装k3,v3 Text k3 = new Text(dt+"\t"+entry.getKey()); LongWritable v3 = new LongWritable(entry.getValue()); //将k3,v3写出去 context.write(k3, v3); count++; } } }
3)MapUtils
package com.pzy.utils;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.units.qual.K;
public class MapUtils {
//降序排序
public static > Map sortByValueDesc(Map map)
{
List> list = new LinkedList>(map.entrySet());
Collections.sort(list, new Comparator>()
{
@Override
public int compare(Map.Entry o1, Map.Entry o2)
{
int compare = (o1.getValue()).compareTo(o2.getValue());
return -compare;
}
});
Map result = new LinkedHashMap();
for (Map.Entry entry : list) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
//升序排序
public static > Map sortByValueAsc(Map map)
{
List> list = new LinkedList>(map.entrySet());
Collections.sort(list, new Comparator>()
{
@Override
public int compare(Map.Entry o1, Map.Entry o2)
{
int compare = (o1.getValue()).compareTo(o2.getValue());
return compare;
}
});
Map result = new LinkedHashMap();
for (Map.Entry entry : list) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
}
4)自定义job
public class VideoInfoTop10Job {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
//从输入参数中获取日期并转为yyyy-MM-dd格式,方便导入mysql数据库
String[] fields = args[0].split("/");
String tempDt = fields[fields.length-1];
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd");
String dt = sdf2.format(sdf.parse(tempDt));
Configuration conf = new Configuration();
//为了方便Map和Reduce使用dt参数,可以将dt存入conf中
conf.set("dt", dt);
Job job = Job.getInstance(conf);
job.setJarByClass(VideoInfoTop10Job.class);
job.setMapperClass(VideoInfoTop10Mapper.class);
job.setReducerClass(VideoInfoTop10Reducer.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//设置Mapper输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置Reducer输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//提交job
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4、实现定时任务脚本
1)把任务提交命令进行封装,方便使用,便于定时任务调度
2)脚本开发
步骤:
a.在video项目下新建bin目录,里面新建名为video_handler.sh的的脚本,创建后eclipse提示商店有支持shell的编辑器,我们可以去安装


b.脚本
在eclipse中创建名为video_handler.sh的脚本文件,代码如下:
[v_warn]注意:if [ “X$1” = “X” ] 中5个空格一个不能少。 if(空格1)[(空格2)”X$1″(空格3)=(空格4)”X”(空格5)][/v_warn]
#!/bin/bash
# 获取环境变量
source /etc/profile
# 判断用户是否输入日期参数,如果输入则处理对应日期数据,否则默认处理昨天数据
if [ "X$1" = "X" ]
then
yes_time=`date +%Y%m%d --date="1 days ago"`
else
yes_time=$1
fi
# 定义clean job的输入输出路径
cleanjob_input=hdfs://master:9820/data/videoinfo/${yes_time}
cleanjob_output=hdfs://master:9820/data/videoinfo_clean/${yes_time}
# 定义统计指标任务1输入输出路径
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://master:9820/res/videoinfojob/${yes_time}
# 定义统计指标任务2输入输出路径
videoinfotop10job_input=${cleanjob_output}
videoinfotop10job_output=hdfs://master:9820/res/videoinfotop10job/${yes_time}
# 定义jar包统一路径
jobs_home=/usr/hadoop/jobs
# 删除输出目录,为了兼容脚本重跑情况
hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfotop10job_output}
# 执行数据清洗任务 \表示换行,该指令还未结束
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob \
${cleanjob_input} ${cleanjob_output}
# 判断数据清洗任务是否执行成功-查看输出目录的_SUCCESS文件
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
# $?指令的返回值如果是0表示上一条指令执行成功,非0则失败
if [ "$?" = "0" ]
then
echo "cleanjob execute success....."
# 执行统计指标任务1
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.videoinfo.VideoInfoJob \
${videoinfojob_input} ${videoinfojob_output}
#执行统计指标任务2
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.top10.VideoInfoTop10Job \
${videoinfotop10job_input} ${videoinfotop10job_output}
else
echo "cleanjob execute failed.....date is ${yes_time}"
fi
c.将jar传到jobs目录
将脚本新建到jobs目录下:
指令:
sudo vi video_handler.sh
[v_warn]注意:
1)脚本复制内容到linux系统创建(如果上传运行会报错,因为windows下换行符和linux下不一样)。
2)粘贴式要注意可能会有部分代码丢失,你需要去查看,如果丢失就补齐。
3)给 video_handler.sh赋予权限,否则hadoop用户运行时会无权限执行[/v_warn]
赋予权限指令:
sudo chmod +x video_handler.sh
d.手工执行脚本
-x 代表调试模式
sh -x video_handler.sh 参数(可选)
e.在hadoop目录下新建joblogs目录,指令:
mkdir joblogs
[v_warn]注意:不能用sudo去创建,否则hadoop执行脚本时会提示无操作权限[/v_warn]
f.定时执行脚本,时间每天0点30分,执行日志重定向到/usr/hadoop/joblogs/video_handler.log下
需要把下面这行代码:
30 00 * * * /usr/hadoop/jobs/video_handler.sh >> /usr/hadoop/joblogs/video_handler.log
添加到当前用户的crontab中(添加到/etc/crontab中也可以,不过需要在shell脚本路径前指定执行用户为hadoop),指令:
crontab -e
ps:定时任务执行脚本出现问题,可以去应的mail下去查看:
sudo cat /var/spool/mail/hadoop





