再次吐槽公司的sb环境,不让上网不能插优盘,今天有事回家写一下笔记HBase region split
在管理集群时,最容易导致hbase节点发生故障的恐怕就是hbase region split和compact的了,日志有split时间太长;文件找不到;split的时候response too slow等等,所以先看看hbase region split源码,希望对以后能有帮助
HBase region split源码分析
一、流程概述
1.HBaseAdmin 发起 hbase split
2.HRegionServer 确定分割点 region split point
3.CompactSplitThread和SplitRequest 进行region分割
3.1SplitTransaction st.prepare()初始化两个子region
3.2splitTransaction execute执行分割
3.2.1两个子region DaughterOpener线程 start
3.2.2若region 需要compact,进行compact路程
3.2.3HRegionServer添加子region到meta表,加入到RegionServer里
3.3修改zk节点状态,等待split结束
二 、hbase region split UML图
三、详细分析
1.HBaseAdmin 发起 hbase split
public void split(final byte [] tableNameOrRegionName,
final byte [] splitPoint) throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker();
try {
Pair<HRegionInfo, ServerName> regionServerPair
= getRegion(tableNameOrRegionName, ct);//获得HRI,若是但region
if (regionServerPair != null) {
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
} else {
//split region 重点分析方法
split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
}
} else {
//table split流程
final String tableName = tableNameString(tableNameOrRegionName, ct);
List<Pair<HRegionInfo, ServerName>> pairs =
MetaReader.getTableRegionsAndLocations(ct,
tableName);
for (Pair<HRegionInfo, ServerName> pair: pairs) {
// May not be a server for a particular row
if (pair.getSecond() == null) continue;
HRegionInfo r = pair.getFirst();
// check for parents
if (r.isSplitParent()) continue;
// if a split point given, only split that particular region
if (splitPoint != null && !r.containsRow(splitPoint)) continue;
// call out to region server to do split now
split(pair.getSecond(), pair.getFirst(), splitPoint);
}
}
} finally {
cleanupCatalogTracker(ct);
}
}
2.HRegionServer 确定分割点 region split point
@Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException {
checkOpen();//检查server和hdfs是否可用
HRegion region = getRegion(regionInfo.getRegionName());//根据HRI获取region
region.flushcache();//flush cache 有几种情况不进行flush
//the cache is empte | the region is closed.| a flush is already in progress | writes are disabled
region.forceSplit(splitPoint);//设置split point
compactSplitThread.requestSplit(region, region.checkSplit());//获取split point,进行split
}
获得split point详细过程,获取最适合的store-hbase现在就是取最大的,获取store的midkey作为splitpoint
3.CompactSplitThread和SplitRequest 进行region分割
这里是split中较为复杂的过程
public void run() {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.debug("Skipping split because server is stopping=" +
this.server.isStopping() + " or stopped=" + this.server.isStopped());
return;
}
try {
final long startTime = System.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
// If prepare does not return true, for some reason -- logged inside in
// the prepare call -- we are not ready to split just now. Just return.
// 3.1SplitTransaction st.prepare()初始化两个子region
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);//3.2splitTransaction execute执行分割
this.server.getMetrics().incrementSplitSuccessCount();
} catch (Exception e) {
。。。。。。。。。。。。
3.2splitTransaction execute执行分割
public PairOfSameType<HRegion> execute(final Server server,
final RegionServerServices services)
throws IOException {
PairOfSameType<HRegion> regions = createDaughters(server, services);
//创建split临时目录,改变region zk状态,关闭region,停止所有store服务
//创建daughter目录,将region storefile放入目录中
//创建子region A、B,在zk上注册,并且设置原HRI下线
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
return regions;
}
加一个同样复杂的
3.2.0 createDaughters函数的操作
这里创建两个子Region,包括他们的regioninfo,并且将父region的hfile引用写入子Region中
生成两个子region的代码:.stepsBeforePONR
- public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
- final RegionServerServices services, boolean testing) throws IOException {
-
-
- if (server != null && server.getZooKeeper() != null) {
- try {
-
- createNodeSplitting(server.getZooKeeper(),
- parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
- } catch (KeeperException e) {
- throw new IOException("Failed creating PENDING_SPLIT znode on " +
- this.parent.getRegionNameAsString(), e);
- }
- }
- this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
- if (server != null && server.getZooKeeper() != null) {
-
-
-
-
- znodeVersion = getZKNode(server, services);
- }
-
-
- this.parent.getRegionFileSystem().createSplitsDir();
- this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
-
- Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
- Exception exceptionToThrow = null;
- try{
-
- hstoreFilesToSplit = this.parent.close(false);
- } catch (Exception e) {
- exceptionToThrow = e;
- }
- if (exceptionToThrow == null && hstoreFilesToSplit == null) {
-
-
-
-
-
- exceptionToThrow = closedByOtherException;
- }
- if (exceptionToThrow != closedByOtherException) {
- this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
- }
- if (exceptionToThrow != null) {
- if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
- throw new IOException(exceptionToThrow);
- }
- if (!testing) {
-
- services.removeFromOnlineRegions(this.parent, null);
- }
- this.journal.add(JournalEntry.OFFLINED_PARENT);
-
-
-
-
-
-
-
-
- splitStoreFiles(hstoreFilesToSplit);
-
-
-
-
-
-
- this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
-
-
- this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
- return new PairOfSameType<HRegion>(a, b);
- }
1.RegionSplitPolicy.getSplitPoint()获得region split的split point ,最大store的中间点midpoint最为split point
2.SplitRequest.run()
实例化SplitTransaction
st.prepare():split前准备:region是否关闭,所有hfile是否被引用
st.execute:执行split操作
1.createDaughters 创建两个region,获得parent region的写锁
1在zk上创建一个临时的node splitting point,
2等待master直到这个region转为splitting状态
3之后建立splitting的文件夹,
4等待region的flush和compact都完成后,关闭这个region
5从HRegionServer上移除,加入到下线region中
6进行regionsplit操作,创建线程池,用StoreFileSplitter类将region下的所有Hfile(StoreFile)进行split,
(split row在hfile中的不管,其他的都进行引用,把引用文件分别写到region下边)
7.生成左右两个子region,删除meta上parent,根据引用文件生成子region的regioninfo,写到hdfs上
2.stepsAfterPONR 调用DaughterOpener类run打开两个子region,调用initilize
a)向hdfs上写入.regionInfo文件以便meta挂掉以便恢复
b)初始化其下的HStore,主要是LoadStoreFiles函数:
对于该store函数会构造storefile对象,从hdfs上获取路径和文件,每个文件一个
storefile对象,对每个storefile对象会读取文件上的内容创建一个
HalfStoreFileReader读对象来操作该region的父region上的相应的文件,及该
region上目前存储的是引用文件,其指向的是其父region上的相应的文件,对该
region的所有读或写都将关联到父region上
将子Region添加到rs的online region列表上,并添加到meta表上
(0.98版本,包含以下3.2.1~3)
(0.94版本,两个方法之后给合在了stepsAfterPONR里边)
3.2.1两个子region DaughterOpener线程 start
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
boolean stopped = server != null && server.isStopped();
boolean stopping = services != null && services.isStopping();
// TODO: Is this check needed here?
if (stopped || stopping) {
LOG.info("Not opening daughters " +
b.getRegionInfo().getRegionNameAsString() +
" and " +
a.getRegionInfo().getRegionNameAsString() +
" because stopping=" + stopping + ", stopped=" + stopped);
} else {
// Open daughters in parallel.创建两个字region打开操作类
DaughterOpener aOpener = new DaughterOpener(server, a);
DaughterOpener bOpener = new DaughterOpener(server, b);
aOpener.start();
bOpener.start();
try {
aOpener.join();
bOpener.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted " + e.getMessage());
}
if (aOpener.getException() != null) {
throw new IOException("Failed " +
aOpener.getName(), aOpener.getException());
}
if (bOpener.getException() != null) {
throw new IOException("Failed " +
bOpener.getName(), bOpener.getException());
}
if (services != null) {
try {
// add 2nd daughter first (see HBASE-4335)
services.postOpenDeployTasks(b, server.getCatalogTracker(), true);
// Should add it to OnlineRegions
services.addToOnlineRegions(b);
services.postOpenDeployTasks(a, server.getCatalogTracker(), true);
services.addToOnlineRegions(a);
} catch (KeeperException ke) {
throw new IOException(ke);
}
}
}
}
调用HRegion 打开方法openHRegion
protected HRegion openHRegion(final CancelableProgressable reporter)
throws IOException {
checkCompressionCodecs();
long seqid = initialize(reporter);
//初始化region,
//1.checkRegionInfoOnFilesystem将HRegionInfo写入文件
//2.cleanupTempDir 清空老region临时目录
//3.初始化HRegion store,加载hfile
//4.获得recover.edit文件,找到对应的store,将读取的keyvalue输出到store,恢复hregion
if (this.log != null) {
this.log.setSequenceNumber(seqid);
}
return this;
}
3.2.2若region 需要compact,进行compact过程
compact过程有点复杂,过程如下:
1.将所有storefile放入compact候选者
2.交给coprocessor做处理,选择compact storefile
3.若coprocessor没有做处理,则采用系统算法选择
3.1必须要进行compact的文件,文件大小大于compact最大值并且没有其他被引用
3.2必须要进行compact文件小于compact文件最小数
3.3 isMajorCompaction判断是否需要major compact
3.3.1当ttl大于storefile中最大文件compact time,则不需要
3.3.2 以上反之,需要
3.3.3 最后一次major compaction时间大于majorCompactionTime,需要
3.4 当compact文件大于compact文件最大数,且需要major compaction活强制major compaction,则进行major compaction
3.5或则进行minor compact,他两个的区别在于一个compact文件数是所有并且删除就tts和version的数据,一个compact文件数不大于maxcompactfile配置
public CompactionRequest requestCompaction(int priority) throws IOException {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
CompactionRequest ret = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidates.subList(0, idx + 1).clear();
}
boolean override = false;
if (region.getCoprocessorHost() != null) {
override = region.getCoprocessorHost().preCompactSelection(
this, candidates);
}
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates);
} else {
filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
}
// no files to compact
if (filesToCompact.getFilesToCompact().isEmpty()) {
return null;
}
// basic sanity check: do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
}
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
}
} finally {
this.lock.readLock().unlock();
}
if (ret != null) {
CompactionRequest.preRequest(ret);
}
return ret;
}
在贴一段选compact文件的
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return new CompactionRequest(expiredSelection);
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result = new CompactionRequest(candidateSelection);
result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
return result;
}
3.2.3HRegionServer添加子region到meta表,加入到RegionServer里
更新meta表
// If daughter of a split, update whole row, not just location.更新meta表 loaction和rowkey
MetaEditor.addDaughter(ct, r.getRegionInfo(),
this.serverNameFromMasterPOV);
加入regionserver
public void addToOnlineRegions(HRegion region) {
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
}
3.3修改zk节点状态,等待split结束
/* package */void transitionZKNode(final Server server,
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
// Tell master about split by updating zk. If we fail, abort.
if (server != null && server.getZooKeeper() != null) {
try {
this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion);
int spins = 0;
// Now wait for the master to process the split. We know it's done
// when the znode is deleted. The reason we keep tickling the znode is
// that it's possible for the master to miss an event.
do {
if (spins % 10 == 0) {
LOG.debug("Still waiting on the master to process the split for " +
this.parent.getRegionInfo().getEncodedName());
}
Thread.sleep(100);
// When this returns -1 it means the znode doesn't exist
this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion);
spins++;
} while (this.znodeVersion != -1 && !server.isStopped()
&& !services.isStopping());
结束了,有时间再看看compact过程,其实在split中已经包含compact的过程,不知道是不是所有的compact流程都一样
相关推荐
hbase权威指南源码
HBASERegion数量增多问题描述及解决方案.docx
HBase源码分析,详细的源码分析,专业的知识分析,绝对难得
hbase-0.98.1-src.tar.gz hbase 0.98源码包
HBase实战 hbase in action 源码
HBase源码分析与开发实战视频技术讲解高阶视频教程以及课件,内部讲解资料 内容非常详细 值得想要提高薪水的人去学习了解
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip
HBase的region split策略一共有以下几种: 1、ConstantSizeRegionSplitPolicy 0.94版本前默认切分策略 当region大小大于某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。 ...
Hbase权威指南源码,找的不易请谅解
hbase源码分析,了解hbase原理和api的简单实用。
Hbase1.3.1源码
本文来自于36大数据,这篇文章将会对这些细节进行基本的说明,一方面可以让大家对HBase中Region自动切分有更加深入的理解,另一方面如果想实现类似的功能也可以参考HBase的实现方案。Region自动切分是HBase能够拥有...
hbase 1.2.0源码,学习大数据nosql数据库时小白们可以用得到
java操作Hbase之从Hbase中读取数据写入hdfs中源码,附带全部所需jar包,欢迎下载学习。
HBase性能深度分析HBase性能深度分析
hbase分析 原理分析 详细分析 bigtable解析
详解hbase负载均衡算法分析, hbase是一个非关系型列式数据库。