- 浏览: 1380660 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (346)
- linux (10)
- hbase (50)
- hadoop (23)
- java (52)
- java multi-thread (13)
- Oracle小记 (41)
- 机器学习 (12)
- 数据结构 (10)
- hadoop hive (16)
- java io (4)
- jms (1)
- web css (1)
- kafka (19)
- xml (2)
- j2ee (1)
- spring (6)
- ibatis (2)
- mysql (3)
- ext (3)
- lucene (3)
- hadoop pig (3)
- java nio (3)
- twemproxy (1)
- antlr (2)
- maven (6)
- mina (1)
- 列数据库 (1)
- oozie (2)
- mongodb (0)
- 报错 (0)
- jetty (1)
- neo4j (1)
- zookeeper (2)
- 数据挖掘 (3)
- jvm (1)
- 数据仓库 (4)
- shell (3)
- mahout (1)
- python (9)
- yarn (3)
- storm (6)
- scala (2)
- spark (5)
- tachyon (1)
最新评论
-
guokaiwhu:
赞啊!今晚遇到相同的问题,正追根溯源,就找到了博主!
hbase 报错gc wal.FSHLog: Error while AsyncSyncer sync, request close of hlog YouAr -
喁喁不止:
很清楚,有帮助。
hive常用函数 -
dsxwjhf:
Good job !!
kafka获得最新partition offset -
Locker.Xai:
参考了
freemaker教程 -
maoweiwer:
为啥EPHEMERAL_SEQUENTIAL类型的节点并没有自 ...
zookeeper 入门讲解实例 转
hadoop的join实现,实现符合关键字,多对多连接
key:
public class MultiKey implements WritableComparable<MultiKey> { private Text departId = new Text(); private Text departNo = new Text(); public Text getDepartId() { return departId; } public void setDepartId(String departId) { this.departId = new Text(departId); } public Text getDepartNo() { return departNo; } public void setDepartNo(String departNo) { this.departNo = new Text(departNo); } @Override public void write(DataOutput out) throws IOException { departId.write(out); departNo.write(out); } @Override public void readFields(DataInput in) throws IOException { this.departId.readFields(in); this.departNo.readFields(in); } @Override public int compareTo(MultiKey o) { return (this.departId.compareTo(o.departId) !=0)? this.departId.compareTo(o.departId) : this.departNo.compareTo(o.departNo); } @Override public String toString(){ return this.departId.toString()+" : "+this.departNo.toString(); } @Override public int hashCode(){ return 0; } }
value:
public class Employee implements WritableComparable<Employee> { private String empName=""; private String departId=""; private String departNo=""; private String departName=""; private int flag; public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } public String getEmpName() { return empName; } public void setEmpName(String empName) { this.empName = empName; } public String getDepartId() { return departId; } public void setDepartId(String departId) { this.departId = departId; } public String getDepartNo() { return departNo; } public void setDepartNo(String departNo) { this.departNo = departNo; } public String getDepartName() { return departName; } public void setDepartName(String departName) { this.departName = departName; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.empName); out.writeUTF(this.departId); out.writeUTF(this.departNo); out.writeUTF(this.departName); out.writeInt(this.flag); } @Override public void readFields(DataInput in) throws IOException { this.empName = in.readUTF(); this.departId = in.readUTF(); this.departNo = in.readUTF(); this.departName = in.readUTF(); this.flag = in.readInt(); } public static void writeAllProperties(DataOutput out,Class<? extends WritableComparable<?>> type,Object obj) throws IllegalArgumentException, IllegalAccessException{ Field[] fields = type.getDeclaredFields(); for (Field field : fields) { System.out.println(field.get(obj)); } } @Override public int compareTo(Employee o) { return 0; } @Override public String toString(){ return this.empName+" "+this.departName; } }
maper:
public class MyJoinMapper extends Mapper<LongWritable, Text, MultiKey, Employee>{ @Override public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] array = line.split(","); visit(array,context); } private void visit(String[] array,Context context) throws IOException,InterruptedException{ int i = Integer.valueOf(array[0]); MultiKey key = new MultiKey(); Employee e = new Employee(); switch (i) { case 1://name e.setEmpName(array[1]); e.setFlag(1); break; default://depart e.setDepartName(array[1]); e.setFlag(2); break; } e.setDepartId(array[2]); e.setDepartNo(array[3]); key.setDepartId(e.getDepartId()); key.setDepartNo(e.getDepartNo()); context.write(key, e); } }
reducer:
public class MyJoinReducer extends Reducer<MultiKey, Employee, IntWritable, Text>{ List<emp> empList = new LinkedList<emp>(); List<depart> departList = new LinkedList<MyJoinReducer.depart>(); @Override public void reduce(MultiKey key,Iterable<Employee> values,Context context) throws IOException,InterruptedException{ for (Employee employee : values) { visite(employee); } System.out.println("----------"); System.out.println(key); for (emp em : empList) { for (depart de : departList) { Employee e = new Employee(); e.setDepartId(em.departId); e.setDepartName(de.departName); e.setDepartNo(em.departNo); e.setEmpName(em.empName); context.write(new IntWritable(1), new Text(e.toString())); } } empList = new LinkedList<emp>(); departList = new LinkedList<MyJoinReducer.depart>(); } private void visite(Employee e){ switch (e.getFlag()) { case 1: emp em = new emp(); em.departId = e.getDepartId(); em.departNo = e.getDepartName(); em.empName = e.getEmpName(); empList.add(em); break; default: depart de = new depart(); de.departName = e.getDepartName(); departList.add(de); break; } } private class emp{ public String empName; public String departId; public String departNo; } private class depart{ public String departName; } }
comparator
public class MyJoinComparator extends WritableComparator{ protected MyJoinComparator() { super(MultiKey.class,true); } }
groupcomparator:
public class MyJoinGroupComparator implements RawComparator<MultiKey> { private DataInputBuffer buffer = new DataInputBuffer(); @Override public int compare(MultiKey key1, MultiKey key2) { return key1.compareTo(key2); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return new MyJoinComparator().compare(b1, s1, l1, b2, s2, l2); } }
今天iteye的编辑器好坑爹啊,不断的崩溃
补个测试类
public class MyJoinTest { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { upload("dirk1.txt", "dirk.txt"); upload("dirk2.txt","dirk2.txt"); delete(); Configuration conf = new Configuration(); Job job = new Job(conf, "joinJob"); job.setMapperClass(MyJoinMapper.class); job.setReducerClass(MyJoinReducer.class); job.setMapOutputKeyClass(MultiKey.class); job.setMapOutputValueClass(Employee.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setGroupingComparatorClass(MyJoinGroupComparator.class); FileInputFormat.addInputPath(job, new Path("/user/dirk3/input")); FileOutputFormat.setOutputPath(job, new Path("/user/dirk3/output")); job.waitForCompletion(true); } public static void upload(String local,String remote) throws IOException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); String l = MyJoinTest.class.getResource("").getPath()+"/"+local; fs.copyFromLocalFile(false, true, new Path(l), new Path("/user/dirk3/input/"+remote)); } public static void delete() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); fs.delete(new Path("/user/dirk3/output"), true); } public static void run(){ JobConf jobConf = new JobConf(); jobConf.setOutputKeyComparatorClass(MyJoinComparator.class); jobConf.setOutputValueGroupingComparator(MyJoinComparator.class); } }
join的主要实现在reducer中
关于comparator,在通过maper向context中添加key value后,通过combine,partition之后,进入reducer阶段,进行groupComparator,决定哪些key同时进入一个reducer
发表评论
-
hadoop报错
2015-05-25 13:36 01.hadoop hdfs启动: Initializ ... -
hadoop 常用配置备忘
2015-04-30 16:04 0job名称 mapred.job.name job队列 ... -
protocal buffers入门实例
2014-09-22 21:08 1599hadoop yarn中新的系列化protocol buf ... -
hadoop MultipleOutputs规定多文件名
2014-09-18 20:58 1323在map或reduce中 1.初始化在configure或 ... -
基于hadoop的推荐算法-mahout版
2014-08-29 17:25 9447基于hadoop的推荐算法,讲其中mahout实现的基于项 ... -
Maven搭建hadoop环境报Missing artifact jdk.tools:jdk.tools:jar:1.6
2014-08-20 16:31 11086转http://blog.csdn.net/honglei9 ... -
hadoop hdfs读写
2014-07-20 14:04 986hadoop hdfs读写 hdfs读取文件 1 ... -
hadoop namenode报错
2014-06-06 19:40 957hadoop启动报错 2014-06-06 19:37:1 ... -
hadoop配置文件笔记
2014-05-15 23:13 1021mapred-site.xml n ... -
hadoop join
2014-03-09 23:09 1072转一个牛人的hadoop join博客 转 http:// ... -
hadoop 二次排序
2014-03-09 23:06 1662hadoop的工作流程: http://black ... -
hadoop 工作流程 图
2014-03-09 22:59 3897hadoop工作流程,用两张简单的map, redu ... -
hadoop secondnamenode配置
2014-02-28 20:26 2144一、secondnamenode是做什么的 ... -
hadoop map reduce参数
2014-01-21 21:06 0一个job会使用tasktracker的map任务槽数 ... -
hadoop 报错 org.apache.hadoop.mapred.TaskTracker: Process Thread Dump: lost task
2013-10-13 16:38 2330项目最近报错,形如: org.apache.hadoop. ... -
hadoop 调度器 capacityTaskScheduler
2013-09-27 20:24 1414贴两个不错的链接: http://blog.csdn ... -
hadoop 报错 org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException
2013-09-26 23:26 8753报错: org.apache.hadoop.hdfs.DF ... -
hadoop oozie 报错
2013-09-26 17:38 11441.oozie报异常泄露预警 关闭oozie,需要将tom ... -
hadoop自定义outputformat源码
2013-02-19 11:59 3470hadoop outputformat是reduceTask ... -
hadoop自定义inputformat源码
2013-02-17 18:14 2860hadoop的inputformat包括他 ...
相关推荐
展示使用MR方式实现表连接的代码示例。利用HIVE PIG之类的高层工具也可以实现,本代码旨在展示手工连接的流程
文件汗有三个java类,两个测试文件txt ReduceClass.java MapClass.java TaggedRecordWritable.java customers.txt ...经过亲自测试,可以将两个文件中的信息以groupby的key关联起来,查出类似数据库的join.
Join 算子在Hadoop 中的实现; 配置Hive 元数据DB 为PostgreSQL; ZooKeeper 权限管理机制; ZooKeeper 服务器工作原理和流程; ZooKeeper 实现共享锁; Hadoop 最佳实践; 通过Hadoop 的API 管理Job; Hadoop 集群...
它介于nosql和RDBMS之间,仅能通过主键(row key)和主键的range来检索数据,仅支持单行事务(可通过hive支持来实现多表join等复杂操作)。主要用来存储非结构化和半结构化的松散数据。 与hadoop一样,Hbase目标主要依靠...
大数据模式4 处理大数据的MapReduce 模式4.1 Join4.1.1 Repartition Join技术点19 优化repartition join 4.1.2 Replicated Join 4.1.3 Semi-join技术点20 实现semi-join4.1.4 为你的数据挑选最优的...
Join 算子在Hadoop 中的实现........................... 20 配置Hive 元数据DB 为PostgreSQL....................... 32 ZooKeeper 权限管理机制............................... 36 ZooKeeper 服务器工作原理和...
数据架构师第009节实战.对join实现的改进——优化reducer.mp4
MapReduceJoinExample 一个reduce join实现的例子运行示例hadoop jar MapReduceJoinExample-1.0-SNAPSHOT-job.jar s3://dags-public/wikistats/s3://dags-public/dbpedia/ /intermediate- date +%Y-%m-%d-%H-%M-%S /...
数据架构师第008节实战.join的实现原理和实战.mp4
技术点20 实现semi-join 4.1.4 为你的数据挑选最优的合并策略 4.2 排序 4.2.1 二次排序 技术点21 二次排序的实现 4.2.2 整体并行排序 技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 ...
包org.dan.mr.order_pro_mapjoin MapReduce实现订单信息和产品信息的join逻辑,在Mapper端实现,避免数据倾斜 包org.dan.mr.wordindex MapReduce单词索引 包org.dan.mr.shared_friends MapReduce查找共同好友 包org....
03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...
java实现数据同步源码 BigData-In-Practice 大数据项目仓库、涉及 Hadoop、Spark、Kafka、Hbase..... 等,更新中... 综合实践项目 项目名 说明 使用 Spark SQL imooc 访问日志,数据清洗,统计,可视化 入门学习示例...
为了解决分布式数据库下,复杂的SQL(如全局性的排序、分组、join、子查询,特别是非均衡字段的这些逻辑操作)难以实现的问题;在有了一些分布式数据库和 Hadoop实际应用经验的基础上,对比两者的优点和不足,加上...
这是哈希连接算法的 Java 实现,它在给定... 该项目的目标是比较 Hash Join 的不同实现。 在这里,您可以找到内存中散列连接的实现以及基于磁盘(具有基本文件系统)的其他实现。 将很快添加 Hadoop 上的另一个实现
本文对Hadoop中最基本的join方法进行简单介绍,这也是其它许多方法和优化措施的基础。文中所采用的例子来自于《HadoopinAction》一书中的5.2节。假设两个表所在的文件分别为Customers和Orders,以CSV格式存储在HDFS...
这是哈希连接算法的 Java 实现,它在给定连接键的几个表上应用连接操作。 该算法已经在机场频率数据集和机场通用数据集上进行了测试。... 在 src/HashJoinMapReduce 中: 您可以找到使用 Hadoop 的分布式实现