
Hadoop: custom InputFormat and OutputFormat


Hadoop's InputFormat and OutputFormat

 

The best example is the Vertica connector: although it is packaged as a Pig UDF, it is essentially just a Hadoop InputFormat and OutputFormat, so it can be reused as-is in Hive as well. Download link: http://blackproof.iteye.com/blog/1791995

 

Below is also a simple example of the InputFormat and OutputFormat I used in a project when implementing a Hadoop join:

The Hadoop join itself is described at http://blackproof.iteye.com/blog/1757530.

   Custom InputFormat (its type parameters are the mapper's input key and value)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyInputFormat extends FileInputFormat<MultiKey, Employee> {

	public MyInputFormat() {}

	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		// The record reader below always reads from the beginning of the file,
		// so keep each file in a single split.
		return false;
	}

	@Override
	public RecordReader<MultiKey, Employee> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new MyRecordReader();
	}

	public static class MyRecordReader extends RecordReader<MultiKey, Employee> {

		public LineReader in;
		public MultiKey key;
		public Employee value;
		public Text line;

		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			// Open the file backing this split and wrap it in a LineReader.
			FileSplit fileSplit = (FileSplit) split;
			Configuration job = context.getConfiguration();
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(job);

			FSDataInputStream filein = fs.open(file);
			in = new LineReader(filein, job);

			key = new MultiKey();
			value = new Employee();
			line = new Text();
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			// Read one line; a return value of 0 means end of file.
			int linesize = in.readLine(line);
			if (linesize == 0)
				return false;

			// Each line has the form "flag,name,departId,departNo".
			String[] pieces = line.toString().split(",");
			int i = Integer.valueOf(pieces[0]);
			switch (i) {
			case 1:
				// Employee record.
				value.setEmpName(pieces[1]);
				value.setFlag(1);
				break;
			default:
				// Department record.
				value.setDepartName(pieces[1]);
				value.setFlag(2);
				break;
			}
			value.setDepartId(pieces[2]);
			value.setDepartNo(pieces[3]);

			// The composite key carries the join columns.
			key.setDepartId(value.getDepartId());
			key.setDepartNo(value.getDepartNo());
			return true;
		}

		@Override
		public MultiKey getCurrentKey() throws IOException,
				InterruptedException {
			return key;
		}

		@Override
		public Employee getCurrentValue() throws IOException,
				InterruptedException {
			return value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			// Progress reporting is not implemented in this simple example.
			return 0;
		}

		@Override
		public void close() throws IOException {
			if (in != null) {
				in.close();
			}
		}

	}

}
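
The MultiKey and Employee classes used above come from the join post linked earlier and are not reproduced here. As a rough idea of their shape, here is a minimal, hypothetical sketch of MultiKey, inferred only from the setters and getters called above (Employee would follow the same Writable pattern with empName, departName, departId, departNo and flag fields, plus a toString() used by the OutputFormat below):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical sketch only; the real MultiKey is defined in the join post.
public class MultiKey implements WritableComparable<MultiKey> {

	private String departId = "";
	private String departNo = "";

	public String getDepartId() { return departId; }
	public void setDepartId(String departId) { this.departId = departId; }
	public String getDepartNo() { return departNo; }
	public void setDepartNo(String departNo) { this.departNo = departNo; }

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(departId);
		out.writeUTF(departNo);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		departId = in.readUTF();
		departNo = in.readUTF();
	}

	@Override
	public int compareTo(MultiKey other) {
		// Order by departId first, then departNo.
		int cmp = departId.compareTo(other.departId);
		return cmp != 0 ? cmp : departNo.compareTo(other.departNo);
	}

	@Override
	public int hashCode() {
		// Needed if the key is ever used with the default HashPartitioner.
		return departId.hashCode() * 163 + departNo.hashCode();
	}
}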

 

 

   Custom OutputFormat (its type parameters are the reducer's output key and value)

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, Employee> {

	@Override
	public RecordWriter<Text, Employee> getRecordWriter(
			TaskAttemptContext job) throws IOException, InterruptedException {
		// Create this task's output file under the job's work directory.
		Configuration conf = job.getConfiguration();
		Path file = getDefaultWorkFile(job, "");
		FileSystem fs = file.getFileSystem(conf);
		FSDataOutputStream fileOut = fs.create(file, false);
		return new MyRecordWriter(fileOut);
	}

	public static class MyRecordWriter extends RecordWriter<Text, Employee> {

		public static final String NEW_LINE = System.getProperty("line.separator");

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;

		public MyRecordWriter(DataOutputStream out) {
			// Default to ':' between key and value.
			this(out, ":");
		}

		public MyRecordWriter(DataOutputStream out, String keyValueSeparator) {
			this.out = out;
			this.keyValueSeparator = keyValueSeparator.getBytes();
		}

		@Override
		public void write(Text key, Employee value) throws IOException,
				InterruptedException {
			// Emit "key:value" (or just "value" when the key is null), one record per line.
			if (key != null) {
				out.write(key.toString().getBytes());
				out.write(keyValueSeparator);
			}
			out.write(value.toString().getBytes());
			out.write(NEW_LINE.getBytes());
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			out.close();
		}

	}

}
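
To use the two classes, the driver simply registers them on the job. The sketch below assumes a Hadoop 2.x-style driver; JoinDriver, JoinMapper and JoinReducer are placeholder names (the actual mapper and reducer are in the join post linked above), and only the lines wiring in MyInputFormat and MyOutputFormat are the point here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "custom input/output format join");
		job.setJarByClass(JoinDriver.class);

		// Placeholder mapper/reducer: Mapper<MultiKey, Employee, Text, Employee>
		// and Reducer<Text, Employee, Text, Employee>.
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		// The custom InputFormat feeds <MultiKey, Employee> pairs to the mapper,
		// and the custom OutputFormat writes the reducer's <Text, Employee> pairs
		// as "key:value" lines.
		job.setInputFormatClass(MyInputFormat.class);
		job.setOutputFormatClass(MyOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Employee.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}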

 
