`

hadoop pig vertica

阅读更多

 

hadoop pig vertica是hadoop pig udf loader and storer的DB版,在github上开源,感谢感谢

本文就不贴代码了,附件里有源码,想了解的可以下载

 

先贴一张vertica的目录

 其中verticaLoader,verticaStorer是vertica给出的接口类,是hadoop pig的loader和storer的UDF

verticaLoader对于读取数据库分解为多个maper,其在于读取数据库的分页,以及sql语句的等价拆分。

verticalStorer则相对简单,只是简单的类型转换,insert语句到数据库

 

verticalLoader和verticaStorer现在只是做了简单的功能实现,对于连接池,多数据库支持,元数据等都没有考虑,并且loader的split策略过于单一。

 

这里贴出verticaLoader:

 

public class VerticaLoader extends LoadFunc implements LoadMetadata{
	......
        private RecordReader<?, ?> reader = null;

        //设置用户签名
	@Override
	public void setUDFContextSignature(String signature) {
    	System.out.println("setUDFContextSignature invocation...."+signature);
        this.contextSignature = signature;
    }

    @Override
	public InputFormat<?, ?> getInputFormat() throws IOException {
		System.out.println("getInputFormat invocation..."+getQuery()+"-----------"+getParameters());
        return new VerticaInputFormat(getQuery(), getParameters());
    }

	private Tuple translate(VerticaRecord v) throws IOException {
		int columns = v.size();
          Tuple result = TupleFactory.getInstance().newTuple(columns);

		for (int i = 0; i < columns; i++) {
			Object obj = v.get(i);
			Integer type = v.getType(i);
                              ......
			}
		}
		return result;
	}

   //获取reader读取后的数据,返回数据tuple
    @Override
	public Tuple getNext() throws IOException {
        ....
        return translate(value);
    }

	//读取前的准备
	@Override
	@SuppressWarnings("rawtypes")
	public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
		.....
  		this.reader = reader;
  	}
        //固定读取的schemal,这里去读db的datameta
	@Override
	public ResourceSchema getSchema(String location, Job job) throws IOException
	{
		。。。。。
	}
	
        //设置loader的地址,在pig script loader中的location,这里包含数据库   
       //信息,以及sql语句
	@Override
	public void setLocation(String location, Job job) throws IOException {
		......
	}

	@Override
	public String[] getPartitionKeys(String location, Job job) {
		return null;
	}

	@Override
	public void setPartitionFilter(Expression filter) {
	}

	@Override
	public ResourceStatistics getStatistics(String location, Job job) {
		return null;
	}

	//Important to override because of PIG-1378
	@Override
	public String relativeToAbsolutePath(String location, Path curdir) throws IOException {
		try {
			String enc = java.net.URLEncoder.encode(location, "UTF-8");
			return enc;
		} catch (java.io.UnsupportedEncodingException e) {
			throw new IOException(e);
		}
	}
	
}
 在verticaloader中方法getInputFormat,返回verticaInputFormat,它继承于org.apache.hadoop.mapreduce.InputFormat<LongWritable, VerticaRecord>。
想必不用多说它的实现,自定义hadoop inputFormat的帖子有不少,不在累述

 

在verticaInputFormat中的getSplit方法,就可以看到使用参数和分页去分解loader这个巨大得让数据库直接崩溃的操作。

 

在verticaIinputFormat中的createRecordReader方法中,返回的是VerticaRecordReader((VerticaInputSplit) split,context.getConfiguration());它继承于org.apache.hadoop.mapreduce.RecordReader<LongWritable, VerticaRecord>,自定义hadoop RecordReader的帖子也有不少,不在累述

贴出来它的nextKeyValue方法,就明了他的功能

@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		key.set(pos + start);
		pos++;
		try {
			if (results.next()) {
				for (int i = 0; i < nColumns; i++) {
					value.set(i, results.getObject(i+1));
				}
				return true;
			}
		} catch (SQLException e) {
			throw new IOException(e);
		}
		return false;
	}

 

这里还忘记说在inputformat中,getSplit方法中的verticaInputSplit

他的方法executeQuery在reader中调用,获取数据库的一次session

 

这里verticaInputFormat就分析完了,verticaOutputFormat就简略了,他就一个insert的操作

不属于vertica项目的DBStorage,已经实现了这个简单的功能,也在附件中可以参考一下

 

 

 

 

 

 

 

 

 
 
  • 大小: 24.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics