博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hive中分组取前N个值
阅读量:4700 次
发布时间:2019-06-09

本文共 9937 字,大约阅读时间需要 33 分钟。

分享两篇文章,结合看更清楚一点。

背景

假设有一个学生各门课的成绩的表单,应用hive取出每科成绩前100名的学生成绩。

这个就是典型在分组取Top N的需求。

 

解决思路

对于取出每科成绩前100名的学生成绩,针对学生成绩表,根据学科,成绩做order by排序,然后对排序后的成绩,执行自定义函数row_number(),必须带一个或者多个列参数,如ROW_NUMBER(col1, ....),它的作用是按指定的列进行分组生成行序列。在ROW_NUMBER(a,b) 时,若两条记录的a,b列相同,则行序列+1,否则重新计数。

只要返回row_number()返回值小于100的的成绩记录,就可以返回每个单科成绩前一百的学生。

 

解决过程

成绩表结构

create table score_table (  subject        string,  student       string,  score           int)partitioned by (date string)

 

如果要查询2012年每科成绩前100的学生成绩,sql如下

create temporary function row_number as 'com.blue.hive.udf.RowNumber';select subject,score,student from    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_scorewhere row_number(subject) <= 100;

com.blue.hive.udf.RowNumber是自定义函数,函数的作用是按指定的列进行分组生成行序列。这里根据每个科目的所有成绩,生成序列,序列值从1开始自增。

假设成绩表的记录如下:

物理  80 张三数学  100 李一物理  90  张二数学  90  李二物理  100 张一数学  80  李三 .....

经过order by全局排序后,记录如下

物理  100 张一物理  90  张二物理  80 张三 .....数学  100 李一数学  90  李二数学  80  李三 ....

接着执行row_number函数,返回值如下

科目  成绩 学生   row_number物理  100 张一      1物理  90  张二      2物理  80  张三      3.....数学  100 李一      1数学  90  李二      2数学  80  李三      3....

因为hive是基于MAPREADUCE的,必须保证row_number执行是在reducer中执行。上述的语句保证了成绩表的记录,按照科目和成绩做了全局排序,然后在reducer端执行row_number函数,如果在map端执行了row_number,那么结果将是错误的。

要查看row_number函数在map端还是reducer端执行,可以查看hive的执行计划:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';explain select subject,score,student from    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_scorewhere row_number(subject) <= 100;

explain不会执行mapreduce计算,只会显示执行计划。

 

只要row_number函数在reducer端执行,除了使用order by全局排序配合,也可以使用distribute by + sort by。distribute by可以让相同科目的成绩记录发送到同一个reducer,而sort by可以在reducer端对记录做排序。

而使用order by全局排序,只有一个reducer,未能充分利用资源,相比之下,distribute by + sort by在这里更有性能优势,可以在多个reducer做排序,再做row_number的计算。

sql如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';select subject,score,student from    (select subject,score,student from score where dt='2012'  distribute by subject sort by subject asc, socre desc) order_scorewhere row_number(subject) <= 100;

 

如果成绩有学院字段college,要找出学院里,单科成绩前一百的学生,解决方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';explain select college,subject,score,student from    (select college,subject,score,student from score where dt='2012'  order by college asc,subject asc,socre desc) order_scorewhere row_number(college,subject) <= 100;

 

如果成绩有学院字段college,要找出学院里,总成绩前一百的学生,解决方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';explain select college,totalscore,student from    (select college,student,sum(score) as totalscore from score where dt='2012'  group by college,student  order by college asc,totalscore desc) order_scorewhere row_number(college) <= 100;

 

row_number的源码

函数row_number(),必须带一个或者多个列参数,如ROW_NUMBER(col1, ....),它的作用是按指定的列进行分组生成行序列。在ROW_NUMBER(a,b) 时,若两条记录的a,b列相同,则行序列+1,否则重新计数。

package com.blue.hive.udf;import org.apache.hadoop.hive.ql.exec.UDF;public class RowNumber extends UDF {    private static int MAX_VALUE = 50;    private static String comparedColumn[] = new String[MAX_VALUE];    private static int rowNum = 1;    public int evaluate(Object... args) {        String columnValue[] = new String[args.length];        for (int i = 0; i < args.length; i++) 『            columnValue[i] = args[i].toString();        }        if (rowNum == 1) {            for (int i = 0; i < columnValue.length; i++)                comparedColumn[i] = columnValue[i];        }        for (int i = 0; i < columnValue.length; i++) {            if (!comparedColumn[i].equals(columnValue[i])) {                for (int j = 0; j < columnValue.length; j++) {                    comparedColumn[j] = columnValue[j];                }                rowNum = 1;                return rowNum++;            }        }        return rowNum++;    }}

编译后,打包成一个jar包,如/usr/local/hive/udf/blueudf.jar

然后在hive shell下使用,如下:

add jar /usr/local/hive/udf/blueudf.jar;create temporary function row_number as 'com.blue.hive.udf.RowNumber';select subject,score,student from    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_scorewhere row_number(subject) <= 100;

 

hive 0.12之前可用,0.12之后不可用,只能用窗口函数替代。

参考  

-----------------------------------------分割线-----------------------------------------------------

问题:

有如下数据文件 city.txt (id, city, value)

cat city.txt 

1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city 分组聚合,然后从每组数据中取出前两条value最大的记录。

1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:

1
2
3
4
5
a =
load
'/data/city.txt' 
using PigStorage(
' '
)
as
(id:chararray, city:chararray, value:
int
);
b =
group
a
by
city;
c = foreach b {c1=
order
a
by
value
desc
; c2=limit c1 2; generate
group
,c2.value;};
d = stream c through `sed
's/[(){}]//g'
`;
dump d;

结果:

1
2
3
(bj,600,300)
(sh,900,400)
(wh,500,200)

这几行代码其实也实现了mysql中的 group_concat 函数的功能:

1
2
3
4
5
a =
load
'/data/city.txt' 
using PigStorage(
' '
)
as
(id:chararray, city:chararray, value:
int
);
b =
group
a
by
city;
c = foreach b {c1=
order
a
by
value
desc
;  generate
group
,c1.value;};
d = stream c through `sed
's/[(){}]//g'
`;
dump d;

结果:

1
2
3
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)

2、下面我们再来看看hive如何处理group topk的问题:

本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,

比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?

1
2
3
select
*
from
city a
where
2>(
select
count
(1)
from
city
where
cname=a.cname
and
value>a.value)
distribute
by
a.cname sort
by
a.cname,a.value
desc
;

但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:

排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。

好了,上代码:

(1)定义UDF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.
exec
.UDF;
      
public
final class Rank extends UDF{
    
private
int 
counter;
    
private String last_key;
    
public
int
evaluate(final String
key
){
      
if ( !
key
.equalsIgnoreCase(this.last_key) ) {
         
this.counter = 0;
         
this.last_key =
key
;
      
}
      
return
this.counter++;
    
}
}

(2)注册jar、建表、导数据,查询:

1
2
3
4
5
6
7
8
9
add
jar Rank.jar;
create
temporary
function
rank
as
'com.example.hive.udf.Rank'
;
create
table
city(id
int
,cname string,value
int
) row format delimited fields terminated
by
' '
;
LOAD
DATA
LOCAL
INPATH
'city.txt'
OVERWRITE
INTO
TABLE
city;
select
cname, value
from
(
    
select
cname,rank(cname) csum,value
from
(
        
select
id, cname, value
from
city distribute
by
cname sort
by
cname,value
desc
    
)a
)b
where
csum < 2;

(3)结果:

 

1
2
3
4
5
6
bj  600
bj  300
sh  900
sh  400
wh  500
wh  200
可以看到,hive相比pig来说,处理起来稍微复杂了点,但随着hive的日渐完善,以后比pig更简洁也说不定。

REF:hive中分组取前N个值的实现

 

3、最后我们来看一下原生态的MR:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import
java.io.IOException;
import
java.util.TreeSet;
 
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.util.GenericOptionsParser;
 
public
class
GroupTopK {
    
// 这个 MR 将会取得每组年龄中 id 最大的前 3 个
    
// 测试数据由脚本生成:http://my.oschina.net/leejun2005/blog/76631
    
public
static
class
GroupTopKMapper
extends
            
Mapper<LongWritable, Text, IntWritable, LongWritable> {
        
IntWritable outKey =
new
IntWritable();
        
LongWritable outValue =
new
LongWritable();
        
String[] valArr =
null
;
 
        
public
void
map(LongWritable key, Text value, Context context)
                
throws
IOException, InterruptedException {
            
valArr = value.toString().split(
"\t"
);
            
outKey.set(Integer.parseInt(valArr[
2
]));
// age int
            
outValue.set(Long.parseLong(valArr[
0
]));
// id long
            
context.write(outKey, outValue);
        
}
    
}
 
    
public
static
class
GroupTopKReducer
extends
            
Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
 
        
LongWritable outValue =
new
LongWritable();
 
        
public
void
reduce(IntWritable key, Iterable<LongWritable> values,
                
Context context)
throws
IOException, InterruptedException {
            
TreeSet<Long> idTreeSet =
new
TreeSet<Long>();
            
for
(LongWritable val : values) {
                
idTreeSet.add(val.get());
                
if
(idTreeSet.size() >
3
) {
                    
idTreeSet.remove(idTreeSet.first());
                
}
            
}
            
for
(Long id : idTreeSet) {
                
outValue.set(id);
                
context.write(key, outValue);
            
}
        
}
    
}
 
    
public
static
void
main(String[] args)
throws
Exception {
        
Configuration conf =
new
Configuration();
        
String[] otherArgs =
new
GenericOptionsParser(conf, args)
                
.getRemainingArgs();
 
        
System.out.println(otherArgs.length);
        
System.out.println(otherArgs[
0
]);
        
System.out.println(otherArgs[
1
]);
 
        
if
(otherArgs.length !=
3
) {
            
System.err.println(
"Usage: GroupTopK <in> <out>"
);
            
System.exit(
2
);
        
}
        
Job job =
new
Job(conf,
"GroupTopK"
);
        
job.setJarByClass(GroupTopK.
class
);
        
job.setMapperClass(GroupTopKMapper.
class
);
        
job.setReducerClass(GroupTopKReducer.
class
);
        
job.setNumReduceTasks(
1
);
        
job.setOutputKeyClass(IntWritable.
class
);
        
job.setOutputValueClass(LongWritable.
class
);
        
FileInputFormat.addInputPath(job,
new
Path(otherArgs[
1
]));
        
FileOutputFormat.setOutputPath(job,
new
Path(otherArgs[
2
]));
        
System.exit(job.waitForCompletion(
true
) ?
0
:
1
);
    
}
}

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

结果:

 

hadoop fs -cat /tmp/1/part-r-00000

0       12869695
0       12869971
0       12869976
1       12869813
1       12869870
1       12869951

......

数据验证:

awk '$3==0{print $1}' record_new.txt|sort -nr|head -3

12869976
12869971
12869695

可以看到结果没有问题。 

注:测试数据由以下脚本生成:

 

PS:

如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。

pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。

附几个HIVE UDAF链接,有兴趣的同学自己看下:

Hive UDAF和UDTF实现group by后获取top值 

hive中自定义函数(UDAF)实现多行字符串拼接为一行 
编写Hive UDAF 
Hive UDAF开发 

转载于:https://www.cnblogs.com/LeeZz/p/4725868.html

你可能感兴趣的文章
MVC中用Jquery、JS和Ajax 实现分页 存储过程是用mysql写的。
查看>>
APIO 2014 回文串(Manacher+后缀自动机+倍增)
查看>>
类的实例
查看>>
Git 更安全的强制推送,--force-with-lease
查看>>
Vue.js入门
查看>>
php中NULL、false、0、" "有何区别?
查看>>
从键盘读取数据,回车才能显示的问题
查看>>
[补档]Cube
查看>>
UESTC 914 方老师的分身I Dijkstra
查看>>
NumPy基础操作(1)
查看>>
如何解决cellIndex在IE下兼容性问题
查看>>
ASP.NET:关于.net中的runat
查看>>
Flash 环境 全屏效果
查看>>
HTML超文本标记语言(八)——表单<form>
查看>>
iOS开发----UI部分----iPhone各类屏幕的分辨率
查看>>
PHP面向对象(OOP)----分页类
查看>>
监听SD卡状态
查看>>
使用transform实现手风琴布局
查看>>
vs2017 EFCore 迁移数据库命令
查看>>
serialVersionUID的作用
查看>>