Sharing two articles here; read together they make the topic clearer.
Background
Suppose we have a table of students' scores in each course, and we want to use Hive to pull the top 100 students' scores per subject.
This is the classic group Top-N requirement.
Approach
To get the top 100 scores per subject, sort the score table with ORDER BY on subject and score, then run the custom function row_number() over the sorted rows. The function must take one or more column arguments, as in ROW_NUMBER(col1, ...), and generates a row sequence grouped by the given columns: with ROW_NUMBER(a, b), if two consecutive records have identical values in columns a and b, the sequence increments by 1; otherwise counting restarts from 1.
Keeping only the records whose row_number() value is at most 100 then yields the top 100 students for each subject.
Solution
Score table schema:
create table score (subject string, student string, score int) partitioned by (dt string);
To query the top 100 scores per subject for 2012, the SQL is as follows:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';

select subject, score, student
from (
  select subject, score, student
  from score
  where dt = '2012'
  order by subject, score desc
) order_score
where row_number(subject) <= 100;
com.blue.hive.udf.RowNumber is the custom function: it generates a row sequence grouped by the specified columns. Here it numbers all the scores within each subject, with the sequence starting at 1 and incrementing.
Suppose the score table holds the following records:
physics  80   Zhang San
math     100  Li Yi
physics  90   Zhang Er
math     90   Li Er
physics  100  Zhang Yi
math     80   Li San
.....
After the global ORDER BY sort, the records look like:
physics  100  Zhang Yi
physics  90   Zhang Er
physics  80   Zhang San
.....
math     100  Li Yi
math     90   Li Er
math     80   Li San
....
Then the row_number function runs, returning:
subject  score  student    row_number
physics  100    Zhang Yi   1
physics  90     Zhang Er   2
physics  80     Zhang San  3
.....
math     100    Li Yi      1
math     90     Li Er      2
math     80     Li San     3
....
Because Hive is built on MapReduce, we must make sure row_number executes in the reducer. The statement above guarantees that the score records are globally sorted by subject and score, and that row_number then runs on the reduce side. If row_number ran on the map side, the result would be wrong.
To check whether the row_number function executes on the map side or the reduce side, inspect Hive's execution plan:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';

explain select subject, score, student
from (
  select subject, score, student
  from score
  where dt = '2012'
  order by subject, score desc
) order_score
where row_number(subject) <= 100;
EXPLAIN does not run any MapReduce computation; it only prints the execution plan.
As long as row_number executes on the reduce side, a global ORDER BY is not the only option: DISTRIBUTE BY plus SORT BY also works. DISTRIBUTE BY routes all records with the same subject to the same reducer, and SORT BY sorts the records within each reducer.
A global ORDER BY uses a single reducer and leaves resources idle; by comparison, DISTRIBUTE BY + SORT BY has the performance edge here, since sorting and the subsequent row_number computation run across multiple reducers.
The SQL is as follows:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';

select subject, score, student
from (
  select subject, score, student
  from score
  where dt = '2012'
  distribute by subject
  sort by subject asc, score desc
) order_score
where row_number(subject) <= 100;
If the score table also has a college column and we want the top 100 single-subject scores within each college, the solution is:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';

select college, subject, score, student
from (
  select college, subject, score, student
  from score
  where dt = '2012'
  order by college asc, subject asc, score desc
) order_score
where row_number(college, subject) <= 100;
If instead we want the top 100 students by total score within each college:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';

select college, totalscore, student
from (
  select college, student, sum(score) as totalscore
  from score
  where dt = '2012'
  group by college, student
  order by college asc, totalscore desc
) order_score
where row_number(college) <= 100;
Source code of row_number
As noted above, row_number() must take one or more column arguments, as in ROW_NUMBER(col1, ...), and generates a row sequence grouped by the given columns: with ROW_NUMBER(a, b), if two consecutive records share the same a and b values the sequence increments; otherwise counting restarts.
package com.blue.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class RowNumber extends UDF {
    private static int MAX_VALUE = 50;
    private static String comparedColumn[] = new String[MAX_VALUE];
    private static int rowNum = 1;

    public int evaluate(Object... args) {
        String columnValue[] = new String[args.length];
        for (int i = 0; i < args.length; i++) {
            columnValue[i] = args[i].toString();
        }
        // On the very first row, remember the grouping column values.
        if (rowNum == 1) {
            for (int i = 0; i < columnValue.length; i++)
                comparedColumn[i] = columnValue[i];
        }
        // If any grouping column changed, this row starts a new group:
        // remember the new values and restart the sequence at 1.
        for (int i = 0; i < columnValue.length; i++) {
            if (!comparedColumn[i].equals(columnValue[i])) {
                for (int j = 0; j < columnValue.length; j++) {
                    comparedColumn[j] = columnValue[j];
                }
                rowNum = 1;
                return rowNum++;
            }
        }
        return rowNum++;
    }
}
Compile it and package it into a jar, e.g. /usr/local/hive/udf/blueudf.jar.
Then use it from the Hive shell:
add jar /usr/local/hive/udf/blueudf.jar;
create temporary function row_number as 'com.blue.hive.udf.RowNumber';

select subject, score, student
from (
  select subject, score, student
  from score
  where dt = '2012'
  order by subject, score desc
) order_score
where row_number(subject) <= 100;
This approach works on Hive releases before 0.12; from 0.12 on it no longer works, and the built-in window functions must be used instead.
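For reference, recent Hive versions ship row_number() as a built-in window function, which replaces the custom UDF entirely. A sketch against the same score table as above:

select subject, score, student
from (
  select subject, score, student,
         row_number() over (partition by subject order by score desc) as rn
  from score
  where dt = '2012'
) t
where rn <= 100;

The PARTITION BY clause plays the role of the UDF's column arguments, and the engine handles distribution and sorting itself.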
----------------------------------------- divider -----------------------------------------
Problem:
Given the following data file city.txt (id, city, value):
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900

We need to group by city and, within each group, take the two records with the largest value.

1. This is the group Top-K problem that comes up constantly in real business scenarios. Let's first see how Pig solves it:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b { c1 = order a by value desc; c2 = limit c1 2; generate group, c2.value; };
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:
(bj,600,300)
(sh,900,400)
(wh,500,200)
Incidentally, these few lines also reproduce the functionality of MySQL's group_concat function:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b { c1 = order a by value desc; generate group, c1.value; };
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)
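As an aside, Hive can approximate group_concat as well. A minimal sketch, assuming the city table created in the Hive section below and a Hive version with collect_list (0.13+); note that, unlike the sorted Pig version above, collect_list does not guarantee element order:

select cname, concat_ws(',', collect_list(cast(value as string))) as values_csv
from city
group by cname;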
2. Now let's look at how Hive handles the group Top-K problem.
HQL is essentially quite similar to SQL, but it still has many gaps; it is at least not as powerful as native SQL, and in some respects it also trails Pig. How would we solve this kind of group Top-K problem in SQL?
select *
from city a
where 2 > (select count(1) from city where cname = a.cname and value > a.value)
distribute by a.cname
sort by a.cname, a.value desc;
But this query throws a syntax error outright in HQL, so we have to fall back on a Hive UDF approach:
Sort by city and value, count rows within each city, then use WHERE to filter out the rows whose per-city counter reaches k.
On to the code.
(1) Define the UDF:
package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF {
    private int counter;
    private String last_key;

    public int evaluate(final String key) {
        // Reset the counter whenever the grouping key changes.
        if (!key.equalsIgnoreCase(this.last_key)) {
            this.counter = 0;
            this.last_key = key;
        }
        return this.counter++;
    }
}
(2) Register the jar, create the table, load the data, and query:
add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';

create table city (id int, cname string, value int)
row format delimited fields terminated by ' ';

LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;

select cname, value
from (
  select cname, rank(cname) csum, value
  from (
    select id, cname, value
    from city
    distribute by cname
    sort by cname, value desc
  ) a
) b
where csum < 2;
(3) Result:
bj 600
bj 300
sh 900
sh 400
wh 500
wh 200
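On recent Hive versions the whole UDF setup above collapses into one query with the built-in row_number() window function. A sketch against the same city table:

select cname, value
from (
  select cname, value,
         row_number() over (partition by cname order by value desc) as rn
  from city
) t
where rn <= 2;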
REF: hive中分组取前N个值的实现 (implementing per-group Top-N in Hive)
3. Finally, let's look at raw MapReduce:
import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GroupTopK {
    // This MR job takes the 3 largest ids within each age group.
    // Test data generated by the script at http://my.oschina.net/leejun2005/blog/76631

    public static class GroupTopKMapper extends
            Mapper<LongWritable, Text, IntWritable, LongWritable> {
        IntWritable outKey = new IntWritable();
        LongWritable outValue = new LongWritable();
        String[] valArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valArr = value.toString().split("\t");
            outKey.set(Integer.parseInt(valArr[2])); // age int
            outValue.set(Long.parseLong(valArr[0])); // id long
            context.write(outKey, outValue);
        }
    }

    public static class GroupTopKReducer extends
            Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
        LongWritable outValue = new LongWritable();

        public void reduce(IntWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // Keep only the 3 largest ids seen so far; the TreeSet stays sorted,
            // so evicting first() drops the smallest element.
            TreeSet<Long> idTreeSet = new TreeSet<Long>();
            for (LongWritable val : values) {
                idTreeSet.add(val.get());
                if (idTreeSet.size() > 3) {
                    idTreeSet.remove(idTreeSet.first());
                }
            }
            for (Long id : idTreeSet) {
                outValue.set(id);
                context.write(key, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        System.out.println(otherArgs.length);
        System.out.println(otherArgs[0]);
        System.out.println(otherArgs[1]);

        if (otherArgs.length != 3) {
            System.err.println("Usage: GroupTopK <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "GroupTopK");
        job.setJarByClass(GroupTopK.class);
        job.setMapperClass(GroupTopKMapper.class);
        job.setReducerClass(GroupTopKReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
Result:
hadoop fs -cat /tmp/1/part-r-00000
0	12869695
0	12869971
0	12869976
1	12869813
1	12869870
1	12869951
......
Verify against the raw data:
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695

As you can see, the results check out.
Note: the test data was generated by the script linked in the code comment above.
PS:
If Hive is analogous to SQL, then Pig is more like PL/SQL stored procedures: programs can be written more freely, and the logic they can express is more powerful.
Pig can even call static methods of Java classes directly via reflection; see the earlier Pig posts for details.
Finally, a few Hive UDAF links, for anyone interested:
Hive UDAF和UDTF实现group by后获取top值 (getting top values after GROUP BY with a UDAF/UDTF)
hive中自定义函数(UDAF)实现多行字符串拼接为一行 (a custom UDAF that concatenates multiple rows into one string)
编写Hive UDAF (writing a Hive UDAF)
Hive UDAF开发 (Hive UDAF development)