Hadoop InputFormat and OutputFormat
The best example is the Vertica connector: although it is packaged as a Pig UDF, at its core it is just a Hadoop InputFormat and OutputFormat, so it can be reused as-is in Hive too. Download link: http://blackproof.iteye.com/blog/1791995
Below is a simple example from a project: the InputFormat and OutputFormat used when implementing a Hadoop join.
The Hadoop join itself is covered at http://blackproof.iteye.com/blog/1757530
Custom InputFormat (its type parameters are the Mapper's input key and value types):
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyInputFormat extends FileInputFormat<MultiKey, Employee> {

    public MyInputFormat() {}

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // The record reader below always starts at the beginning of the
        // file, so keep each file in a single split.
        return false;
    }

    @Override
    public RecordReader<MultiKey, Employee> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MyRecordReader();
    }

    public static class MyRecordReader extends RecordReader<MultiKey, Employee> {

        public LineReader in;
        public MultiKey key;
        public Employee value;
        public Text line;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // Open the file backing this split and prepare reusable
            // key/value objects.
            FileSplit fileSplit = (FileSplit) split;
            Configuration job = context.getConfiguration();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
            key = new MultiKey();
            value = new Employee();
            line = new Text();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0)
                return false;
            // Each line is "type,name,departId,departNo"; type 1 marks an
            // employee record, anything else a department record.
            String[] pieces = line.toString().split(",");
            int i = Integer.valueOf(pieces[0]);
            switch (i) {
            case 1:
                value.setEmpName(pieces[1]);
                value.setFlag(1);
                break;
            default:
                value.setDepartName(pieces[1]);
                value.setFlag(2);
                break;
            }
            value.setDepartId(pieces[2]);
            value.setDepartNo(pieces[3]);
            // Both record types are keyed by department, so matching
            // employee and department records meet in one reduce group.
            key.setDepartId(value.getDepartId());
            key.setDepartNo(value.getDepartNo());
            return true;
        }

        @Override
        public MultiKey getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Employee getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void close() throws IOException {
            // Release the underlying input stream.
            if (in != null)
                in.close();
        }
    }
}
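The code above references MultiKey and Employee, which come from the join post linked earlier. As a hedged sketch only: since MultiKey serves as the map key, it would need to implement WritableComparable, roughly like this (the field names are inferred from the setters used above; the real class in the join post may differ):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical sketch of the composite key used by the join.
public class MultiKey implements WritableComparable<MultiKey> {
    private String departId = "";
    private String departNo = "";

    public String getDepartId() { return departId; }
    public void setDepartId(String departId) { this.departId = departId; }
    public String getDepartNo() { return departNo; }
    public void setDepartNo(String departNo) { this.departNo = departNo; }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize both fields for the shuffle.
        out.writeUTF(departId);
        out.writeUTF(departNo);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        departId = in.readUTF();
        departNo = in.readUTF();
    }

    @Override
    public int compareTo(MultiKey other) {
        // Sort by departId first, then departNo.
        int cmp = departId.compareTo(other.departId);
        return cmp != 0 ? cmp : departNo.compareTo(other.departNo);
    }
}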
Custom OutputFormat (its type parameters are the Reducer's output key and value types):
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, Employee> {

    @Override
    public RecordWriter<Text, Employee> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Create this task's output file and wrap it in the custom writer.
        Configuration conf = job.getConfiguration();
        Path file = getDefaultWorkFile(job, "");
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new MyRecordWriter(fileOut);
    }

    public static class MyRecordWriter extends RecordWriter<Text, Employee> {

        public static final String NEW_LINE = System.getProperty("line.separator");

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public MyRecordWriter(DataOutputStream out) {
            this(out, ":");
        }

        public MyRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes();
        }

        @Override
        public void write(Text key, Employee value)
                throws IOException, InterruptedException {
            // Emit "key:value" (the separator is configurable), one record
            // per line.
            if (key != null) {
                out.write(key.toString().getBytes());
                out.write(keyValueSeparator);
            }
            out.write(value.toString().getBytes());
            out.write(NEW_LINE.getBytes());
        }

        @Override
        public void close(TaskAttemptContext context)
                throws IOException, InterruptedException {
            out.close();
        }
    }
}
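As a usage sketch, here is roughly how the two formats would be plugged into a job driver. This assumes the Hadoop 2.x Job API; JoinDriver is a hypothetical name, and the commented-out mapper/reducer lines stand in for the classes from the join post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "employee-join");
        job.setJarByClass(JoinDriver.class);

        // Plug in the custom formats defined above.
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(MyOutputFormat.class);

        // Set the Mapper/Reducer from the join post here, e.g.:
        // job.setMapperClass(JoinMapper.class);
        // job.setReducerClass(JoinReducer.class);

        // Key/value types matching the formats' type parameters.
        job.setMapOutputKeyClass(MultiKey.class);
        job.setMapOutputValueClass(Employee.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}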