HBASE-3328 Added Admin API to specify explicit split points

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1049379 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Nicolas Spiegelberg 2010-12-15 01:44:06 +00:00
parent e6e0eea777
commit c74c097573
6 changed files with 167 additions and 85 deletions

View File

@ -1047,8 +1047,8 @@ public class HBaseAdmin implements Abortable {
} }
/** /**
* Split a table or an individual region. * Split a table or an individual region. Implicitly finds an optimal split
* Asynchronous operation. * point. Asynchronous operation.
* *
* @param tableNameOrRegionName table or region to split * @param tableNameOrRegionName table or region to split
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
@ -1056,6 +1056,20 @@ public class HBaseAdmin implements Abortable {
*/ */
public void split(final byte [] tableNameOrRegionName) public void split(final byte [] tableNameOrRegionName)
throws IOException, InterruptedException { throws IOException, InterruptedException {
split(tableNameOrRegionName, null);
}
/**
* Split a table or an individual region.
* Asynchronous operation.
*
* @param tableNameOrRegionName table or region to split
* @param splitPoint the explicit position to split on
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException interrupt exception occurred
*/
public void split(final byte [] tableNameOrRegionName,
final byte [] splitPoint) throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker(); CatalogTracker ct = getCatalogTracker();
try { try {
if (isRegionName(tableNameOrRegionName)) { if (isRegionName(tableNameOrRegionName)) {
@ -1066,7 +1080,7 @@ public class HBaseAdmin implements Abortable {
LOG.info("No server in .META. for " + LOG.info("No server in .META. for " +
Bytes.toString(tableNameOrRegionName) + "; pair=" + pair); Bytes.toString(tableNameOrRegionName) + "; pair=" + pair);
} else { } else {
split(pair.getSecond(), pair.getFirst()); split(pair.getSecond(), pair.getFirst(), splitPoint);
} }
} else { } else {
List<Pair<HRegionInfo, HServerAddress>> pairs = List<Pair<HRegionInfo, HServerAddress>> pairs =
@ -1075,7 +1089,12 @@ public class HBaseAdmin implements Abortable {
for (Pair<HRegionInfo, HServerAddress> pair: pairs) { for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
// May not be a server for a particular row // May not be a server for a particular row
if (pair.getSecond() == null) continue; if (pair.getSecond() == null) continue;
split(pair.getSecond(), pair.getFirst()); if (splitPoint != null) {
// if a split point is given, only split that particular region
HRegionInfo r = pair.getFirst();
if (!r.containsRow(splitPoint)) continue;
}
split(pair.getSecond(), pair.getFirst(), splitPoint);
} }
} }
} finally { } finally {
@ -1083,10 +1102,10 @@ public class HBaseAdmin implements Abortable {
} }
} }
private void split(final HServerAddress hsa, final HRegionInfo hri) private void split(final HServerAddress hsa, final HRegionInfo hri,
throws IOException { byte[] splitPoint) throws IOException {
HRegionInterface rs = this.connection.getHRegionConnection(hsa); HRegionInterface rs = this.connection.getHRegionConnection(hsa);
rs.splitRegion(hri); rs.splitRegion(hri, splitPoint);
} }
/** /**

View File

@ -371,6 +371,20 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
void splitRegion(HRegionInfo regionInfo) void splitRegion(HRegionInfo regionInfo)
throws NotServingRegionException, IOException; throws NotServingRegionException, IOException;
/**
* Splits the specified region.
* <p>
* This method currently flushes the region and then forces a compaction which
* will then trigger a split. The flush is done synchronously but the
* compaction is asynchronous.
* @param regionInfo region to split
* @param splitPoint the explicit row to split on
* @throws NotServingRegionException
* @throws IOException
*/
void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException;
/** /**
* Compacts the specified region. Performs a major compaction if specified. * Compacts the specified region. Performs a major compaction if specified.
* <p> * <p>

View File

@ -242,6 +242,7 @@ public class HRegion implements HeapSize { // , Writable{
private final ReentrantReadWriteLock updatesLock = private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock(); new ReentrantReadWriteLock();
private boolean splitRequest; private boolean splitRequest;
private byte[] splitPoint = null;
private final ReadWriteConsistencyControl rwcc = private final ReadWriteConsistencyControl rwcc =
new ReadWriteConsistencyControl(); new ReadWriteConsistencyControl();
@ -829,6 +830,10 @@ public class HRegion implements HeapSize { // , Writable{
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
if (splitRow != null) {
assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
this.splitPoint = null; // clear the split point (if set)
}
return splitRow; return splitRow;
} }
@ -3277,8 +3282,8 @@ public class HRegion implements HeapSize { // , Writable{
} }
public static final long FIXED_OVERHEAD = ClassSize.align( public static final long FIXED_OVERHEAD = ClassSize.align(
(4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + ClassSize.ARRAY +
(23 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); (24 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
(ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) + (ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) +
@ -3464,15 +3469,21 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
/** boolean shouldForceSplit() {
* For internal use in forcing splits ahead of file size limit. return this.splitRequest;
* @param b }
* @return previous value
*/ byte[] getSplitPoint() {
public boolean shouldSplit(boolean b) { return this.splitPoint;
boolean old = this.splitRequest; }
this.splitRequest = b;
return old; void forceSplit(byte[] sp) {
// NOTE : this HRegion will go away after the forced split is successful
// therefore, no reason to clear this value
this.splitRequest = true;
if (sp != null) {
this.splitPoint = sp;
}
} }
/** /**

View File

@ -2133,9 +2133,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
@Override @Override
public void splitRegion(HRegionInfo regionInfo) public void splitRegion(HRegionInfo regionInfo)
throws NotServingRegionException, IOException { throws NotServingRegionException, IOException {
splitRegion(regionInfo, null);
}
@Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException {
HRegion region = getRegion(regionInfo.getRegionName()); HRegion region = getRegion(regionInfo.getRegionName());
region.flushcache(); region.flushcache();
region.shouldSplit(true); region.forceSplit(splitPoint);
// force a compaction, split will be side-effect // force a compaction, split will be side-effect
// TODO: flush/compact/split refactor will make it trivial to do this // TODO: flush/compact/split refactor will make it trivial to do this
// sync/async (and won't require us to do a compaction to split!) // sync/async (and won't require us to do a compaction to split!)

View File

@ -614,7 +614,7 @@ public class Store implements HeapSize {
* @throws IOException * @throws IOException
*/ */
StoreSize compact(final boolean forceMajor) throws IOException { StoreSize compact(final boolean forceMajor) throws IOException {
boolean forceSplit = this.region.shouldSplit(false); boolean forceSplit = this.region.shouldForceSplit();
synchronized (compactLock) { synchronized (compactLock) {
this.lastCompactSize = 0; // reset first in case compaction is aborted this.lastCompactSize = 0; // reset first in case compaction is aborted
@ -1334,6 +1334,10 @@ public class Store implements HeapSize {
largestSf = sf; largestSf = sf;
} }
} }
// if the user explicitly set a split point, use that
if (this.region.getSplitPoint() != null) {
return new StoreSize(maxSize, this.region.getSplitPoint());
}
StoreFile.Reader r = largestSf.getReader(); StoreFile.Reader r = largestSf.getReader();
if (r == null) { if (r == null) {
LOG.warn("Storefile " + largestSf + " Reader is null"); LOG.warn("Storefile " + largestSf + " Reader is null");

View File

@ -149,7 +149,8 @@ public class TestAdmin {
* Verify schema modification takes. * Verify schema modification takes.
* @throws IOException * @throws IOException
*/ */
@Test public void testChangeTableSchema() throws IOException { @Test
public void testChangeTableSchema() throws IOException {
final byte [] tableName = Bytes.toBytes("changeTableSchema"); final byte [] tableName = Bytes.toBytes("changeTableSchema");
HTableDescriptor [] tables = admin.listTables(); HTableDescriptor [] tables = admin.listTables();
int numTables = tables.length; int numTables = tables.length;
@ -474,85 +475,112 @@ public class TestAdmin {
*/ */
@Test @Test
public void testForceSplit() throws Exception { public void testForceSplit() throws Exception {
splitTest(null);
splitTest(Bytes.toBytes("pwn"));
}
void splitTest(byte[] splitPoint) throws Exception {
byte [] familyName = HConstants.CATALOG_FAMILY; byte [] familyName = HConstants.CATALOG_FAMILY;
byte [] tableName = Bytes.toBytes("testForceSplit"); byte [] tableName = Bytes.toBytes("testForceSplit");
assertFalse(admin.tableExists(tableName));
final HTable table = TEST_UTIL.createTable(tableName, familyName); final HTable table = TEST_UTIL.createTable(tableName, familyName);
byte[] k = new byte[3]; try {
int rowCount = 0; byte[] k = new byte[3];
for (byte b1 = 'a'; b1 < 'z'; b1++) { int rowCount = 0;
for (byte b2 = 'a'; b2 < 'z'; b2++) { for (byte b1 = 'a'; b1 < 'z'; b1++) {
for (byte b3 = 'a'; b3 < 'z'; b3++) { for (byte b2 = 'a'; b2 < 'z'; b2++) {
k[0] = b1; for (byte b3 = 'a'; b3 < 'z'; b3++) {
k[1] = b2; k[0] = b1;
k[2] = b3; k[1] = b2;
Put put = new Put(k); k[2] = b3;
put.add(familyName, new byte[0], k); Put put = new Put(k);
table.put(put); put.add(familyName, new byte[0], k);
rowCount++; table.put(put);
rowCount++;
}
} }
} }
}
// get the initial layout (should just be one region) // get the initial layout (should just be one region)
Map<HRegionInfo,HServerAddress> m = table.getRegionsInfo(); Map<HRegionInfo,HServerAddress> m = table.getRegionsInfo();
System.out.println("Initial regions (" + m.size() + "): " + m); System.out.println("Initial regions (" + m.size() + "): " + m);
assertTrue(m.size() == 1); assertTrue(m.size() == 1);
// Verify row count // Verify row count
Scan scan = new Scan(); Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan); ResultScanner scanner = table.getScanner(scan);
int rows = 0; int rows = 0;
for(@SuppressWarnings("unused") Result result : scanner) { for(@SuppressWarnings("unused") Result result : scanner) {
rows++; rows++;
} }
scanner.close(); scanner.close();
assertEquals(rowCount, rows); assertEquals(rowCount, rows);
// Have an outstanding scan going on to make sure we can scan over splits. // Have an outstanding scan going on to make sure we can scan over splits.
scan = new Scan(); scan = new Scan();
scanner = table.getScanner(scan); scanner = table.getScanner(scan);
// Scan first row so we are into first region before split happens. // Scan first row so we are into first region before split happens.
scanner.next(); scanner.next();
final AtomicInteger count = new AtomicInteger(0); final AtomicInteger count = new AtomicInteger(0);
Thread t = new Thread("CheckForSplit") { Thread t = new Thread("CheckForSplit") {
public void run() { public void run() {
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
try { try {
sleep(1000); sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
continue; continue;
}
// check again table = new HTable(conf, tableName);
Map<HRegionInfo, HServerAddress> regions = null;
try {
regions = table.getRegionsInfo();
} catch (IOException e) {
e.printStackTrace();
}
if (regions == null) continue;
count.set(regions.size());
if (count.get() >= 2) break;
LOG.debug("Cycle waiting on split");
} }
// check again table = new HTable(conf, tableName); }
Map<HRegionInfo, HServerAddress> regions = null; };
try { t.start();
regions = table.getRegionsInfo(); // Split the table
} catch (IOException e) { this.admin.split(tableName, splitPoint);
e.printStackTrace(); t.join();
}
if (regions == null) continue; // Verify row count
count.set(regions.size()); rows = 1; // We counted one row above.
if (count.get() >= 2) break; for (@SuppressWarnings("unused") Result result : scanner) {
LOG.debug("Cycle waiting on split"); rows++;
if (rows > rowCount) {
scanner.close();
assertTrue("Scanned more than expected (" + rowCount + ")", false);
} }
} }
}; scanner.close();
t.start(); assertEquals(rowCount, rows);
// Split the table
this.admin.split(Bytes.toString(tableName));
t.join();
// Verify row count if (splitPoint != null) {
rows = 1; // We counted one row above. // make sure the split point matches our explicit configuration
for (@SuppressWarnings("unused") Result result : scanner) { Map<HRegionInfo, HServerAddress> regions = null;
rows++; try {
if (rows > rowCount) { regions = table.getRegionsInfo();
scanner.close(); } catch (IOException e) {
assertTrue("Scanned more than expected (" + rowCount + ")", false); e.printStackTrace();
}
assertEquals(2, regions.size());
HRegionInfo[] r = regions.keySet().toArray(new HRegionInfo[0]);
assertEquals(Bytes.toString(splitPoint),
Bytes.toString(r[0].getEndKey()));
assertEquals(Bytes.toString(splitPoint),
Bytes.toString(r[1].getStartKey()));
LOG.debug("Properly split on " + Bytes.toString(splitPoint));
} }
} finally {
TEST_UTIL.deleteTable(tableName);
} }
scanner.close();
assertEquals(rowCount, rows);
} }
/** /**