在这里贴一个pig源码的分析,做pig很长时间没做笔记,不包含任何细节,以后有机会再说吧
http://blackproof.iteye.com/blog/1769219
hadoop pig入门总结
- pig简介
- pig数据类型
- pig latin语法
- pig udf自定义
- pig derived衍生
- 推荐书籍 programming pig
- 推荐网站 http://pig.apache.org/docs/r0.10.0/basic.html
pig简介
pig是hadoop上层的衍生架构,与hive类似。对比hive(hive类似sql,是一种声明式的语言),pig是一种过程语言,类似于存储过程一步一步得进行数据转化。
pig数据类型
- double > float > long > int > bytearray
- tuple|bag|map|chararray > bytearray
double float long int chararray bytearray都相当于pig的基本类型
tuple相当于数组 ,但是可以类型不一,举例('dirkzhang','dallas',41)
Bag相当于tuple的一个集合,举例{('dirk',41),('kedde',2),('terre',31)},在group的时候会生成bag
Map相当于哈希表,key为chararray,value为任意类型,例如['name'#dirk,'age'#36,'num'#41
nulls 表示的不只是数据不存在,他更表示数据是unkown
pig latin语法
1:load
LOAD 'data' [USING function] [AS schema];
例如:
load = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}' USING com.xxxx.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);
Table = load ‘url’ as (id,name…..); //table和load之间除了等号外 还必须有个空格 不然会出错,url一定要带引号,且只能是单引号。
2:filter
alias = FILTER alias BY expression;
Table = filter Table1 by + A; //A可以是 id > 10;not name matches ‘’,is not null 等,可以用and 和or连接各条件
例如:
filter = filter load20 by ( MONTH_ID == '1210' and DAY_ID == '18' and PROV_ID == '010' );
3:group
alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];
pig的分组,不仅是数据上的分组,在数据的schema形式上也进行分组为groupcolumn:bag
Table3 = group Table2 by id;也可以Table3 = group Table2 by (id,name);括号必须加
可以使用ALL实现对所有字段的分组
4:foreach
alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];
alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]
GENERATE expression [AS schema] [expression [AS schema]….]
};
一般跟generate一块使用
Table = foreach Table generate (id,name);括号可加可不加。
avg = foreach Table generate group, AVG(age); MAX ,MIN..
在进行数据过滤时,建议尽早使用foreach generate将多余的数据过滤掉,减少数据交换
5:join
Inner join Syntax
alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n]; |
Outer join Syntax
alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; |
join/left join / right join
daily = load 'A' as (id,name, sex);
divs = load 'B' as (id,name, sex);
join
jnd = join daily by (id, name), divs by (id, name);
left join
jnd = join daily by (id, name) left outer, divs by (id, name);
也可以同时多个变量,但只用于inner join
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
6: union
alias = UNION [ONSCHEMA] alias, alias [, alias …];
union 相当与sql中的union,但与sql不通的是pig中的union可以针对两个不同模式的变量:如果两个变量模式相同,那么union后的变量模式与 变量的模式一样;如果一个变量的模式可以由另一各变量的模式强制类型转换,那么union后的变量模式与转换后的变量模式相同;否则,union后的变量 没有模式。
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int,y: float}
A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
注意:在pig 1.0中 执行不了最后一种union。
如果需要对两个具有不通列名的变量union的话,可以使用onschema关键字
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
join和union之后alias的别名会变
7:Dump
dump alias
用于在屏幕上显示数据。
8:Order by
alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];
A = order Table by id desc;
9:distinct
A = distinct alias;
10:limit
A = limit alias 10;
11:sample
SAMPLE alias size;
随机抽取指定比例(0到1)的数据。
some = sample divs 0.1;
13:cross
alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];
将多个数据集中的数据按照字段名进行同值组合,形成笛卡尔积。
--cross.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;
15:split
Syntax
SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];
A = LOAD 'data' AS (f1:int,f2:int,f3:int);
DUMP A;
(1,2,3)
(4,5,6)
(7,8,9)
SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);
DUMP X;
(1,2,3)
(4,5,6)
DUMP Y;
(4,5,6)
DUMP Z;
(1,2,3)
(7,8,9)
16:store
Store … into … Using…
pig在别名维护上:
1、join
如e = join d by name,b by name;
g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as three:chararray,$3 asfour:int;
他生成的schemal:
e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}
g: {one: chararray,two: int,three: chararray,four: int}
2、group
B = GROUP A BY age;
---------------------------------------------------------------------- | B | group: int | A: bag({name: chararray,age: int,gpa: float}) | ---------------------------------------------------------------------- | | 18 | {(John, 18, 4.0), (Joe, 18, 3.8)} | | | 20 | {(Bill, 20, 3.9)} | ----------------------------------------------------------------------
(18,{(John,18,4.0F),(Joe,18,3.8F)})
pig udf自定义
pig支持嵌入user defined function,一个简单的udf 继承于evalFunc,通常用在filter,foreach中
- public class MyUDF extends EvalFunc<String> {
- @Override
- public String exec(Tuple input) throws IOException {
- if(input == null || input.size() ==0)
- return null;
- try {
- String val = (String) input.get(0);
- return new StringBuffer(val).append(" pig").toString();
- } catch (Exception e) {
- throw new IOException(e.getMessage());
- }
- }
- }
pig支持udf in loader and store
udf loader 需要继承于LoadFunc
udf storer 需要继承于StoreFunc
这类似于hadoop中写inputformat和outputformat
其中vertica就是写了一个DB版本的
这里贴一个简单的loader的例子:
- public class MyLoader extends LoadFunc{
- protected RecordReader recordReader = null;
- private PreparedStatement ps;
- private Connection conn;
- private final String jdbcURL;
- private final String user;
- private final String pwd;
- private final String querySql;
- private ResultSet rs;
- public MyLoader(String driver,String jdbcURL,String user,String pwd,String querySql){
- try {
- Class.forName(driver);
- } catch (Exception e) {
- // TODO: handle exception
- }
- this.jdbcURL = jdbcURL;
- this.user = user;
- this.pwd = pwd;
- this.querySql = querySql;
- }
- @Override
- public InputFormat getInputFormat() throws IOException {
- return new PigTextInputFormat();
- }
- @Override
- public Tuple getNext() throws IOException {
- // TODO 重要的读取过程
- Text val = null;
- boolean next = false;
- try {
- next = rs.next();
- } catch (Exception e) {
- // TODO: handle exception
- }
- if(!next)
- return null;
- ResultSetMetaData rsmd;
- try {
- // rsmd = result
- } catch (Exception e) {
- // TODO: handle exception
- }
- return null;
- }
- @Override
- public void prepareToRead(RecordReader arg0, PigSplit arg1)
- throws IOException {
- this.recordReader = arg0;
- }
- @Override
- public void setLocation(String arg0, Job arg1) throws IOException {
- //no idea
- }
- public ResourceSchema getSchema(String location,Job job) throws IOException{
- Configuration conf = job.getConfiguration();
- Schema schema = new Schema();
- try {
- //TODO:reader from database table
- // Connection conn = DriverManager.getConnection(this.jdbcURL, this.user, this.pwd);
- FieldSchema fieldName = new FieldSchema("name", DataType.CHARARRAY);
- FieldSchema fieldPosition = new FieldSchema("position", DataType.INTEGER);
- schema.add(fieldName);
- schema.add(fieldPosition);
- } catch (Exception e) {
- //TODO log exception
- }
- return null;
- }
- public void prepareToRead(){
- }
- }
其中getNext方法就是如何处理reader读取出的数据
getSchema可以固定读取数据的schema
setLocation可以处理输入的数据源
prepareToRead是读取数据之前,可以在此做标识,等等
pig 衍生
1.penny:
1. Penny的描述
Penny是pig的贡献项目,是pig的调试和监控工具,而且支持根据API自定义penny的监视器和协作器,已实现不同的功能;
2. Penny的总架构
Penny将监视器插入到pig的工作操作中,主要用于监视pig数据流的变化,监视器可以调用协作器,完成各种功能。
3. Penny的总类图关系
ParsePigScript负责根据用户监视器生成新计划newPlan,在ToolsPigServer中根据以前的脚本执行新计划。在执行新计划时,当监视器监视对象数据发生变化,出发监视器,运行自定义的业务,也可以将数据流变化传回协作器里处理,总类图如下:
4. Penny的使用
Penny的使用需要自定义两个类,一个类继承于监视器基类MonitorAgent,另一个继承于协作器基类Coordinator。然后根据上边类图,就可以使用PennyServer和ParsePigScript进行监控和调试
5.在pig中就可以找到penny这个贡献的源码
Vertica:
vertica是pig loader和storer的udf
附件里是vertica,来自github,和vertica的介绍使用文档
贴一篇将vertica的帖子 http://blackproof.iteye.com/blog/1791995
推荐书籍
programming pig
推荐网址
http://pig.apache.org/docs/r0.10.0/basic.html 官网
pig pen开发工具,这个我现在玩得还不熟,就不介绍了,有兴趣的可以去搜搜玩玩
我在工作中pig的使用,主要是数据的ETL,所以比较适合。在选择pig hive还是其他非hadoop架构,如redis,这还是一个需要继续尝试探索的问题。
相关推荐
hadoop 第三版-权威指南-从入门到精通-中文pdf版本。介绍hadoop分布式文件系统,MapReduce的工作原理,并手把手教你如何构建hadoop集群,同时附带介绍了pig,hive,hbase,zookeeper,sqoop等hadoop家族的开源软件。
一、 HBase技术介绍 ...此外,Pig和Hive还为HBase提供了高层语言支持,使得在HBase上进行数据统计处理变的非常简单。 Sqoop则为HBase提供了方便的RDBMS数据导入功能,使得传统数据库数据向HBase中迁移变的非常方便。
Hadoop 安装 学习 入门教程 Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, ...
Hadoop背景及基础核心技术简介,包括mapreduce,HDFS,pig。。。
7.Hadoop入门进阶课程_第7周_Pig介绍、安装与应用案例.pdf
1、对hadoop、zookeeper、hbase、hive、sqoop、flume、pig的理论体系有个系统掌握,对核心知识能够掌握;阅读市面上的各种图书和各种文章不再有困难; 2、能够在linux环境搭建hadoop、hbase等的伪分布和分布模式,...
mortar-recsys, 一种基于砂浆数据的Hadoop和 Pig 定制推荐 Mortar推荐引擎一个定制推荐引擎的Hadoop和 Pig,通过迫击炮数据。入门/教程这个项目包含了几个完整的。可以运行的示例数据示例数据,以及一个模板项目。...
大数据与云计算教程课件 优质大数据课程 01.Hadoop简介与安装入门(共29页).pptx 大数据与云计算教程课件 优质大数据课程 02.MapReduce(共23页).pptx 大数据与云计算教程课件 优质大数据课程 03.Hadoop YARN(共...
从入门到实战,从多角度做了全面的修订和补充。不仅详细讲解了新一代的Hadoop技术,而且全面介绍了Hive、HBase、Mahout、Pig、ZooKeeper、Avro、Chukwa等重要技术,是系统学习Hadoop技术的首选之作
hadoop入门经典必读 Ready to unlock the power of your data? With this comprehensive guide, you’ll learn how to build and maintain reliable, scalable, distributed systems with Apache Hadoop. This book...
大数据与云计算教程课件 优质大数据课程 01.Hadoop简介与安装入门(共29页).pptx 大数据与云计算教程课件 优质大数据课程 02.MapReduce(共23页).pptx 大数据与云计算教程课件 优质大数据课程 03.Hadoop YARN(共...
大数据基础知识入门 社会保障事业部 张火磊 主要内容 大数据价值 03 大数据概念、特性、由来 01 大数据应用举例 04 02 Hadoop技术介绍 大数据概念、特性、由来 什么叫大数据? 麦肯锡全球研究所给出的定义是:一 种...
使用 Hadoop Yarn 的大数据分析/大数据科学的简化入门项目。 与 Spring Boot 和 Spring for Hadoop 集成,提供统一的配置模型和易于使用的 API 的访问,以便使用内置本地部署的 HDFS、MapReduce、Pig 和 Hive。 是的...
Apache Crunch 是基于 FlumeJava 实现的,...与 Pig 和 Hive 一样,Crunch 是为了降低 MapReduce 的入门成本。它们的区别是:Pig 是一个基于管道的框架,而 Crunch 则是一个 Java 库,它提供比 Pig 更高级别的灵活性。
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
pig的专业书籍,从部署到开发,让你轻松入门。
大数据与云计算教程课件 优质大数据课程 01.Hadoop简介与安装入门(共29页).pptx 大数据与云计算教程课件 优质大数据课程 02.MapReduce(共23页).pptx 大数据与云计算教程课件 优质大数据课程 03.Hadoop YARN(共...