HBASE-3610 Improve RegionSplitter performance
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1081566 13f79535-47bb-0310-9956-ffa450edef68
parent 4f5e8d29db
commit 7c8f6406a5
@@ -108,6 +108,7 @@ Release 0.91.0 - Unreleased
    HBASE-3630  DemoClient.Java is outdated (Moaz Reyed via Stack)
    HBASE-3618  Add to HBase book, 'schema' chapter - pre-creating regions and
                key types (Doug Meil via Stack)
+   HBASE-3610  Improve RegionSplitter performance
 
   TASK
    HBASE-3559  Move report of split to master OFF the heartbeat channel
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.NoServerForRegionException;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 
@@ -432,6 +433,7 @@ public class RegionSplitter {
           if (Bytes.equals(split, sk)) {
             LOG.debug("Region already split on "
                 + splitAlgo.rowToStr(split) + ". Skipping this region...");
             ++splitCount;
+            dr = null;
             continue;
           }
@@ -450,96 +452,54 @@ public class RegionSplitter {
           continue;
 
         // we have a good region, time to split!
-        byte[] start = dr.getFirst();
         byte[] split = dr.getSecond();
         // request split
         LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
         HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
         admin.split(table.getTableName(), split);
 
+        LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
         if (conf.getBoolean("split.verify", true)) {
-          // wait for one of the daughter regions to come online
-          boolean daughterOnline = false;
-          int daughterSleep = 5; // seconds
-          while (!daughterOnline) {
-            LOG.debug("Waiting for daughter region to come online...");
-            table.clearRegionCache();
-            HRegionInfo hri = table.getRegionLocation(split).getRegionInfo();
-            daughterOnline = Bytes.equals(hri.getStartKey(), split)
-                && !hri.isOffline();
-            daughterSleep = Math.min(daughterSleep * 2, 60);
-            Thread.sleep(daughterSleep * 1000); // sleep
+          // we need to verify and rate-limit our splits
+          outstanding.addLast(dr);
+          // with too many outstanding splits, wait for some to finish
+          while (outstanding.size() >= MAX_OUTSTANDING) {
+            finished = splitScan(outstanding, table, splitAlgo);
+            if (finished.isEmpty()) {
+              Thread.sleep(30 * 1000);
+            } else {
+              outstanding.removeAll(finished);
+            }
           }
-          LOG.debug("Daughter region is online.");
+        } else {
+          finished.add(dr);
         }
 
-        // mark the region as successfully split.
-        // NOTE: split done, but daughter regions still need to major compact
-        splitOut.writeChars("- " + splitAlgo.rowToStr(dr.getFirst()) + " "
-            + splitAlgo.rowToStr(dr.getSecond()) + "\n");
-        splitCount++;
-        if (splitCount % 10 == 0) {
-          long tDiff = (System.currentTimeMillis() - startTime) / splitCount;
-          LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
-              + ". Avg Time / Split = "
-              + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
-        }
-
-        if (conf.getBoolean("split.verify", true)) {
-          // if we have too many outstanding splits, wait for oldest ones to
-          // finish
-          outstanding.addLast(Pair.newPair(start, split));
-          if (outstanding.size() > MAX_OUTSTANDING) {
-            Pair<byte[], byte[]> reg = outstanding.removeFirst();
-            String outStart = splitAlgo.rowToStr(reg.getFirst());
-            String outSplit = splitAlgo.rowToStr(reg.getSecond());
-            LOG.debug("Waiting for " + outStart + " , " + outSplit
-                + " to finish compaction");
-            // when a daughter region is opened, a compaction is triggered
-            // wait until compaction completes for both daughter regions
-            LinkedList<HRegionInfo> check = Lists.newLinkedList();
-            // figure out where this region should be in HDFS
-            check
-                .add(table.getRegionLocation(reg.getFirst()).getRegionInfo());
-            check.add(table.getRegionLocation(reg.getSecond())
-                .getRegionInfo());
-            while (!check.isEmpty()) {
-              // compaction is completed when all reference files are gone
-              for (HRegionInfo hri : check.toArray(new HRegionInfo[] {})) {
-                boolean refFound = false;
-                byte[] sk = hri.getStartKey();
-                if (sk.length == 0)
-                  sk = splitAlgo.firstRow();
-                String startKey = splitAlgo.rowToStr(sk);
-                // check every Column Family for that region
-                for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
-                  Path cfDir = Store.getStoreHomedir(tableDir, hri
-                      .getEncodedName(), c.getName());
-                  if (fs.exists(cfDir)) {
-                    for (FileStatus file : fs.listStatus(cfDir)) {
-                      refFound |= StoreFile.isReference(file.getPath());
-                      if (refFound) {
-                        LOG.debug("Reference still exists for " + startKey
-                            + " at " + file.getPath());
-                        break;
-                      }
-                    }
-                  }
-                  if (refFound)
-                    break;
-                }
-                if (!refFound) {
-                  check.remove(hri);
-                  LOG.debug("- finished compaction of " + startKey);
-                }
-              }
-              // sleep in between requests
-              if (!check.isEmpty()) {
-                LOG.debug("Waiting for " + check.size() + " compactions");
-                Thread.sleep(30 * 1000);
-              }
-            }
-          }
-        }
+        // mark each finished region as successfully split.
+        for (Pair<byte[], byte[]> region : finished) {
+          splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
+              + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
+          splitCount++;
+          if (splitCount % 10 == 0) {
+            long tDiff = (System.currentTimeMillis() - startTime)
+                / splitCount;
+            LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
+                + ". Avg Time / Split = "
+                + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
+          }
+        }
       }
     }
+    if (conf.getBoolean("split.verify", true)) {
+      while (!outstanding.isEmpty()) {
+        LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
+            table, splitAlgo);
+        if (finished.isEmpty()) {
+          Thread.sleep(30 * 1000);
+        } else {
+          outstanding.removeAll(finished);
+          for (Pair<byte[], byte[]> region : finished) {
+            splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
+                + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
+          }
+        }
+      }
+    }
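The core of the change is visible in the hunk above: instead of requesting one split and blocking until its daughter region is online and compacted, rollingSplit() now queues each request on the outstanding list and only blocks once MAX_OUTSTANDING requests remain unverified, batching verification through splitScan(). A minimal standalone sketch of that rate-limiting pattern follows; requestSplit() and checkFinished() are hypothetical stand-ins for admin.split() and splitScan(), and are not part of this commit.

import java.util.LinkedList;

public class OutstandingQueueSketch {
  static final int MAX_OUTSTANDING = 2;

  public static void main(String[] args) throws InterruptedException {
    LinkedList<String> outstanding = new LinkedList<String>();
    for (String split : new String[] { "a", "b", "c", "d" }) {
      requestSplit(split);            // fire off the asynchronous request
      outstanding.addLast(split);
      // with too many outstanding splits, wait for some to finish
      while (outstanding.size() >= MAX_OUTSTANDING) {
        LinkedList<String> finished = checkFinished(outstanding);
        if (finished.isEmpty()) {
          Thread.sleep(1000);         // nothing verified yet; poll again
        } else {
          outstanding.removeAll(finished);
        }
      }
    }
    // drain whatever is still in flight before declaring success
    while (!outstanding.isEmpty()) {
      LinkedList<String> finished = checkFinished(outstanding);
      if (finished.isEmpty()) {
        Thread.sleep(1000);
      } else {
        outstanding.removeAll(finished);
      }
    }
  }

  static void requestSplit(String name) {
    System.out.println("requested split " + name);
  }

  // hypothetical stand-in: report every polled request as complete
  static LinkedList<String> checkFinished(LinkedList<String> pending) {
    return new LinkedList<String>(pending);
  }
}

The win over the old flow is overlap: the expensive daughter-open and compaction work for up to MAX_OUTSTANDING regions proceeds in parallel while the client keeps issuing new split requests.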
@@ -558,6 +518,86 @@ public class RegionSplitter {
     fs.delete(splitFile, false);
   }
 
+  static LinkedList<Pair<byte[], byte[]>> splitScan(
+      LinkedList<Pair<byte[], byte[]>> regionList, HTable table,
+      SplitAlgorithm splitAlgo)
+      throws IOException, InterruptedException {
+    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
+    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
+    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();
+
+    // get table info
+    Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
+    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
+    Path splitFile = new Path(tableDir, "_balancedSplit");
+    FileSystem fs = FileSystem.get(table.getConfiguration());
+
+    // clear the cache to forcibly refresh region information
+    table.clearRegionCache();
+
+    // for every region that hasn't been verified as a finished split
+    for (Pair<byte[], byte[]> region : regionList) {
+      byte[] start = region.getFirst();
+      byte[] split = region.getSecond();
+
+      // see if the new split daughter region has come online
+      HRegionInfo dri = table.getRegionLocation(split).getRegionInfo();
+      if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
+        logicalSplitting.add(region);
+        continue;
+      }
+
+      try {
+        // when a daughter region is opened, a compaction is triggered
+        // wait until compaction completes for both daughter regions
+        LinkedList<HRegionInfo> check = Lists.newLinkedList();
+        check.add(table.getRegionLocation(start).getRegionInfo());
+        check.add(table.getRegionLocation(split).getRegionInfo());
+        for (HRegionInfo hri : check.toArray(new HRegionInfo[] {})) {
+          boolean refFound = false;
+          byte[] sk = hri.getStartKey();
+          if (sk.length == 0)
+            sk = splitAlgo.firstRow();
+          String startKey = splitAlgo.rowToStr(sk);
+          // check every Column Family for that region
+          for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
+            Path cfDir = Store.getStoreHomedir(tableDir, hri.getEncodedName(),
+                c.getName());
+            if (fs.exists(cfDir)) {
+              for (FileStatus file : fs.listStatus(cfDir)) {
+                refFound |= StoreFile.isReference(file.getPath());
+                if (refFound)
+                  break;
+              }
+            }
+            if (refFound)
+              break;
+          }
+          // compaction is completed when all reference files are gone
+          if (!refFound) {
+            check.remove(hri);
+          }
+        }
+        if (check.isEmpty()) {
+          finished.add(region);
+        } else {
+          physicalSplitting.add(region);
+        }
+      } catch (NoServerForRegionException nsfre) {
+        LOG.debug("No Server Exception thrown for: "
+            + splitAlgo.rowToStr(start));
+        physicalSplitting.add(region);
+        table.clearRegionCache();
+      }
+    }
+
+    LOG.debug("Split Scan: " + finished.size() + " finished / "
+        + logicalSplitting.size() + " split wait / "
+        + physicalSplitting.size() + " reference wait");
+
+    return finished;
+  }
+
   static LinkedList<Pair<byte[], byte[]>> getSplits(HTable table,
       SplitAlgorithm splitAlgo) throws IOException {
     Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
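splitScan() decides that a split is physically complete when no column-family directory of either daughter still contains a reference file: a freshly opened daughter initially serves data through references that point back at the parent's store files, and the post-split compaction removes them as it rewrites the data locally. A sketch of that per-directory check under the same assumption; isReference() here is a hypothetical simplification of StoreFile.isReference(), not the real parser.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReferenceCheckSketch {
  // true once a daughter's column-family directory holds no reference files
  static boolean compactionComplete(FileSystem fs, Path cfDir)
      throws IOException {
    if (!fs.exists(cfDir)) {
      return true; // nothing flushed yet, so nothing points at the parent
    }
    for (FileStatus file : fs.listStatus(cfDir)) {
      if (isReference(file.getPath())) {
        return false; // daughter still reads through the parent's store file
      }
    }
    return true;
  }

  // hypothetical stand-in for StoreFile.isReference(): real reference files
  // carry the parent region's encoded name after a '.' in the file name
  static boolean isReference(Path p) {
    return p.getName().indexOf('.') >= 0;
  }
}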