HBase increment
The increment code path
1. Wrap the deltas into an Increment object (a minimal client-side sketch follows this list)
2. Build a Get from the Increment object
3. Build the new KeyValues
4. Apply an upsert or add to each Store under the HRegion
5. Check whether a flush is needed and, if so, queue a flush request
6. Return the new KeyValues
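For reference, here is a minimal client-side sketch of steps 1 and 6. It is hedged: it assumes the HBase 1.x client API, and the "counters" table and "cf" family names are hypothetical, not taken from the code below.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IncrementClientExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("counters"))) { // hypothetical table
          // Step 1: wrap the deltas for one row into a single Increment
          Increment inc = new Increment(Bytes.toBytes("row-1"));
          inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);
          inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("bytes"), 512L);
          // Step 6: the RegionServer returns the post-increment KeyValues in the Result
          Result result = table.increment(inc);
          long hits = Bytes.toLong(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("hits")));
          System.out.println("hits is now " + hits);
        }
      }
    }

The Result returned by table.increment() already holds the new values; on the server side it is the allKVs list built in the HRegion code below.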
The HRegion code is as follows:
    /**
     * Perform one or more increment operations on a row.
     * @param increment
     * @return new keyvalues after increment
     * @throws IOException
     */
    public Result increment(Increment increment, long nonceGroup, long nonce)
        throws IOException {
      byte [] row = increment.getRow();
      checkRow(row, "increment");
      TimeRange tr = increment.getTimeRange();
      boolean flush = false;
      Durability durability = getEffectiveDurability(increment.getDurability());
      boolean writeToWAL = durability != Durability.SKIP_WAL;
      WALEdit walEdits = null;
      List<Cell> allKVs = new ArrayList<Cell>(increment.size());
      Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();

      long size = 0;
      long txid = 0;

      checkReadOnly();
      checkResources();
      // Lock row
      startRegionOperation(Operation.INCREMENT);
      this.writeRequestsCount.increment();
      WriteEntry w = null;
      try {
        RowLock rowLock = getRowLock(row);
        try {
          lock(this.updatesLock.readLock());
          // wait for all prior MVCC transactions to finish - while we hold the row lock
          // (so that we are guaranteed to see the latest state)
          mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
          // now start my own transaction
          w = mvcc.beginMemstoreInsert();
          try {
            long now = EnvironmentEdgeManager.currentTimeMillis();
            // Process each family
            for (Map.Entry<byte [], List<Cell>> family:
                increment.getFamilyCellMap().entrySet()) {

              Store store = stores.get(family.getKey());
              List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

              // Sort the cells so that they match the order that they
              // appear in the Get results. Otherwise, we won't be able to
              // find the existing values if the cells are not specified
              // in order by the client since cells are in an array list.
              Collections.sort(family.getValue(), store.getComparator());
              // Get previous values for all columns in this family
              // Build the Get request from the Increment
              Get get = new Get(row);
              for (Cell cell: family.getValue()) {
                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                get.addColumn(family.getKey(), kv.getQualifier());
              }
              get.setTimeRange(tr.getMin(), tr.getMax());
              List<Cell> results = get(get, false); // read the current values of this row

              // Iterate the input columns and update existing values if they were
              // found, otherwise add new column initialized to the increment amount
              int idx = 0;
              for (Cell kv: family.getValue()) {
                long amount = Bytes.toLong(CellUtil.cloneValue(kv)); // the delta carried in the cell value
                boolean noWriteBack = (amount == 0);

                Cell c = null;
                if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
                  c = results.get(idx);
                  if(c.getValueLength() == Bytes.SIZEOF_LONG) {
                    amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
                  } else {
                    // throw DoNotRetryIOException instead of IllegalArgumentException
                    throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                        "Attempted to increment field that isn't 64 bits wide");
                  }
                  idx++;
                }

                // Append new incremented KeyValue to list
                byte[] q = CellUtil.cloneQualifier(kv);
                byte[] val = Bytes.toBytes(amount);
                int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
                int incCellTagsLen = kv.getTagsLength();
                KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
                    KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
                System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
                System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
                    family.getKey().length);
                System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
                // copy in the value
                System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
                // copy tags
                if (oldCellTagsLen > 0) {
                  System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
                      newKV.getTagsOffset(), oldCellTagsLen);
                }
                if (incCellTagsLen > 0) {
                  System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
                      newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
                }
                newKV.setMvccVersion(w.getWriteNumber());
                // Give coprocessors a chance to update the new cell
                if (coprocessorHost != null) {
                  newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
                      RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
                }
                allKVs.add(newKV);

                if (!noWriteBack) {
                  kvs.add(newKV);

                  // Prepare WAL updates
                  if (writeToWAL) {
                    if (walEdits == null) {
                      walEdits = new WALEdit();
                    }
                    walEdits.add(newKV);
                  }
                }
              }

              //store the kvs to the temporary memstore before writing HLog
              if (!kvs.isEmpty()) {
                tempMemstore.put(store, kvs);
              }
            }

            // Actually write to WAL now
            if (walEdits != null && !walEdits.isEmpty()) {
              if (writeToWAL) {
                // Using default cluster id, as this can only happen in the originating
                // cluster. A slave cluster receives the final value (not the delta)
                // as a Put.
                txid = this.log.appendNoSync(this.getRegionInfo(),
                    this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
                    EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor,
                    this.sequenceId, true, nonceGroup, nonce);
              } else {
                recordMutationWithoutWal(increment.getFamilyCellMap());
              }
            }
            //Actually write to Memstore now
            if (!tempMemstore.isEmpty()) { // apply the new kvs to the memstore
              for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
                Store store = entry.getKey();
                if (store.getFamily().getMaxVersions() == 1) {
                  // upsert if VERSIONS for this CF == 1
                  size += store.upsert(entry.getValue(), getSmallestReadPoint());
                } else {
                  // otherwise keep older versions around
                  for (Cell cell : entry.getValue()) {
                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                    size += store.add(kv);
                  }
                }
              }
              size = this.addAndGetGlobalMemstoreSize(size);
              flush = isFlushSize(size);
            }
          } finally {
            this.updatesLock.readLock().unlock();
          }
        } finally {
          rowLock.release();
        }
        if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
          // sync the transaction log outside the rowlock
          syncOrDefer(txid, durability);
        }
      } finally {
        if (w != null) {
          mvcc.completeMemstoreInsert(w);
        }
        closeRegionOperation(Operation.INCREMENT);
        if (this.metricsRegion != null) {
          this.metricsRegion.updateIncrement();
        }
      }

      if (flush) {
        // Request a cache flush. Do it outside update lock.
        requestFlush();
      }
      return Result.create(allKVs);
    }
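A side note on step 4: whether the memstore write goes through store.upsert() or store.add() depends only on the column family's VERSIONS setting, as the branch above shows. Below is a minimal sketch of creating a counter family that takes the upsert path; it assumes the HBase 1.x admin API, and the "counters" table name is again hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CreateCounterTable {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("counters")); // hypothetical
          HColumnDescriptor cf = new HColumnDescriptor("cf");
          // VERSIONS == 1 makes increment call store.upsert() instead of store.add(),
          // so the new counter cell replaces the old one in the memstore
          cf.setMaxVersions(1);
          desc.addFamily(cf);
          admin.createTable(desc);
        }
      }
    }

With more than one version configured, the code above keeps the older cells around (store.add), so every increment leaves an extra cell behind until it is cleaned up; that is why counter families are usually declared with VERSIONS = 1.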
In some situations, for example under heavy increment load, the following error appears from the startNonceOperation method:
regionserver.ServerNonceManager: Conflict detected by nonce
This log is produced when a mutation contains several operations carrying the same nonce (such as increments), and it slows down the response.
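One way to take pressure off this path is to pre-aggregate deltas on the client, so that many small counter updates become a single Increment (and therefore a single nonce) per row. A rough sketch under that assumption; the BufferedCounter class and the "cf" family are made up for illustration, and the class is not thread-safe.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    /**
     * Collects deltas in memory and flushes them as one Increment per row,
     * so heavy counter traffic turns into far fewer increment RPCs.
     */
    public class BufferedCounter {
      private final Table table;                       // e.g. the hypothetical "counters" table
      private final byte[] family = Bytes.toBytes("cf");
      private final Map<String, Long> pending = new HashMap<String, Long>();

      public BufferedCounter(Table table) {
        this.table = table;
      }

      /** Accumulate a delta locally; nothing is sent to the RegionServer yet. */
      public void add(String qualifier, long delta) {
        Long current = pending.get(qualifier);
        pending.put(qualifier, current == null ? delta : current + delta);
      }

      /** Send all buffered deltas for one row as a single Increment. */
      public void flush(byte[] row) throws IOException {
        if (pending.isEmpty()) {
          return;
        }
        Increment inc = new Increment(row);
        for (Map.Entry<String, Long> e : pending.entrySet()) {
          inc.addColumn(family, Bytes.toBytes(e.getKey()), e.getValue());
        }
        pending.clear();
        table.increment(inc);
      }
    }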