HBASE-17669: Implement async mergeRegion/splitRegion methods

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
huzheng 2017-03-02 11:26:20 +08:00 committed by zhangduo
parent f1c1f258e5
commit faf81d5133
5 changed files with 389 additions and 14 deletions

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.HashKey;
import org.apache.hadoop.hbase.util.JenkinsHash;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.util.StringUtils;
/**
* Information about a region. A region is a range of keys in the whole keyspace of a table, an
@ -582,6 +583,19 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
return elements;
}
public static boolean isEncodedRegionName(byte[] regionName) throws IOException {
try {
HRegionInfo.parseRegionName(regionName);
return false;
} catch (IOException e) {
if (StringUtils.stringifyException(e)
.contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) {
return true;
}
throw e;
}
}
/** @return the regionId */
public long getRegionId(){
return regionId;

View File

@ -364,4 +364,40 @@ public interface AsyncAdmin {
* @param hri
*/
CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri);
/**
* Merge two regions.
* @param nameOfRegionA encoded or full name of region a
* @param nameOfRegionB encoded or full name of region b
* @param forcible true if do a compulsory merge, otherwise we will only merge two adjacent
* regions
*/
CompletableFuture<Void> mergeRegions(final byte[] nameOfRegionA, final byte[] nameOfRegionB,
final boolean forcible);
/**
* Split a table. The method will execute split action for each region in table.
* @param tableName table to split
*/
CompletableFuture<Void> split(final TableName tableName);
/**
* Split an individual region.
* @param regionName region to split
*/
CompletableFuture<Void> splitRegion(final byte[] regionName);
/**
* Split a table.
* @param tableName table to split
* @param splitPoint the explicit position to split on
*/
CompletableFuture<Void> split(final TableName tableName, final byte[] splitPoint);
/**
* Split an individual region.
* @param regionName region to split
* @param splitPoint the explicit position to split on
*/
CompletableFuture<Void> splitRegion(final byte[] regionName, final byte[] splitPoint);
}

View File

@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -35,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
@ -49,6 +54,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -56,6 +62,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
@ -90,6 +98,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancer
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
@ -670,6 +680,227 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
.serverName(sn).call();
}
private byte[] toEncodeRegionName(byte[] regionName) {
try {
return HRegionInfo.isEncodedRegionName(regionName) ? regionName
: Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
} catch (IOException e) {
return regionName;
}
}
private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
CompletableFuture<TableName> result) {
getRegion(encodeRegionName).whenComplete((p, err) -> {
if (err != null) {
result.completeExceptionally(err);
return;
}
if (p == null) {
result.completeExceptionally(new UnknownRegionException(
"Can't invoke merge on unknown region " + Bytes.toStringBinary(encodeRegionName)));
return;
}
if (p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
result.completeExceptionally(
new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
return;
}
if (!tableName.compareAndSet(null, p.getFirst().getTable())) {
if (!tableName.get().equals(p.getFirst().getTable())) {
// tables of this two region should be same.
result.completeExceptionally(
new IllegalArgumentException("Cannot merge regions from two different tables "
+ tableName.get() + " and " + p.getFirst().getTable()));
} else {
result.complete(tableName.get());
}
}
});
}
private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
byte[] encodeRegionNameB) {
AtomicReference<TableName> tableNameRef = new AtomicReference<>();
CompletableFuture<TableName> future = new CompletableFuture<>();
checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
return future;
}
@Override
public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
boolean forcible) {
CompletableFuture<Void> future = new CompletableFuture<>();
final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
.whenComplete((tableName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
MergeTableRegionsRequest request = null;
try {
request = RequestConverter.buildMergeTableRegionsRequest(
new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
ng.newNonce());
} catch (DeserializationException e) {
future.completeExceptionally(e);
return;
}
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
});
return future;
}
@Override
public CompletableFuture<Void> split(TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
tableExists(tableName).whenComplete((exist, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (!exist) {
future.completeExceptionally(new TableNotFoundException(tableName));
return;
}
metaTable
.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
.withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
.withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
.whenComplete((results, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
return;
}
if (results != null && !results.isEmpty()) {
List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
for (Result r : results) {
if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
if (rl != null) {
for (HRegionLocation h : rl.getRegionLocations()) {
if (h != null && h.getServerName() != null) {
HRegionInfo hri = h.getRegionInfo();
if (hri == null || hri.isSplitParent()
|| hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
continue;
splitFutures.add(split(h.getServerName(), hri, null));
}
}
}
}
CompletableFuture
.allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
.whenComplete((ret, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
return;
}
future.complete(ret);
});
} else {
future.complete(null);
}
});
});
return future;
}
@Override
public CompletableFuture<Void> splitRegion(byte[] regionName) {
return splitRegion(regionName, null);
}
@Override
public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (splitPoint == null) {
return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
}
connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
.whenComplete((loc, err) -> {
if (err != null) {
result.completeExceptionally(err);
} else if (loc == null || loc.getRegionInfo() == null) {
result.completeExceptionally(new IllegalArgumentException(
"Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
} else {
splitRegion(loc.getRegionInfo().getRegionName(), splitPoint)
.whenComplete((ret, err2) -> {
if (err2 != null) {
result.completeExceptionally(err2);
} else {
result.complete(ret);
}
});
}
});
return result;
}
@Override
public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
CompletableFuture<Void> future = new CompletableFuture<>();
getRegion(regionName).whenComplete((p, err) -> {
if (p == null) {
future.completeExceptionally(
new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)));
return;
}
if (p.getFirst() != null && p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
+ "Replicas are auto-split when their primary is split."));
return;
}
if (p.getSecond() == null) {
future.completeExceptionally(
new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
split(p.getSecond(), p.getFirst(), splitPoint).whenComplete((ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
});
return future;
}
@VisibleForTesting
public CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
byte[] splitPoint) {
if (hri.getStartKey() != null && splitPoint != null
&& Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
return failedFuture(
new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
}
return this.<Void> newAdminCaller()
.action(
(controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
controller, stub, ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
(s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
.serverName(sn).call();
}
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
if (numRegions < 3) {
throw new IllegalArgumentException("Must create at least three regions");
@ -885,6 +1116,17 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
}
private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
MergeTableRegionProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
super(admin, tableName);
}
String getOperationType() {
return "MERGE_REGIONS";
}
}
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
CompletableFuture<Void> future = new CompletableFuture<>();
procFuture.whenComplete((procId, error) -> {

View File

@ -1497,19 +1497,6 @@ public class HBaseAdmin implements Admin {
});
}
private boolean isEncodedRegionName(byte[] regionName) throws IOException {
try {
HRegionInfo.parseRegionName(regionName);
return false;
} catch (IOException e) {
if (StringUtils.stringifyException(e)
.contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) {
return true;
}
throw e;
}
}
/**
* Merge two regions. Synchronous operation.
* Note: It is not feasible to predict the length of merge.
@ -1582,7 +1569,7 @@ public class HBaseAdmin implements Admin {
assert(nameofRegionsToMerge.length >= 2);
byte[][] encodedNameofRegionsToMerge = new byte[nameofRegionsToMerge.length][];
for(int i = 0; i < nameofRegionsToMerge.length; i++) {
encodedNameofRegionsToMerge[i] = isEncodedRegionName(nameofRegionsToMerge[i]) ?
encodedNameofRegionsToMerge[i] = HRegionInfo.isEncodedRegionName(nameofRegionsToMerge[i]) ?
nameofRegionsToMerge[i] : HRegionInfo.encodeRegionName(nameofRegionsToMerge[i]).getBytes();
}

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -200,4 +202,98 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName()));
}
}
@Test
public void testMergeRegions() throws Exception {
final TableName tableName = TableName.valueOf("testMergeRegions");
HColumnDescriptor cd = new HColumnDescriptor("d");
HTableDescriptor td = new HTableDescriptor(tableName);
td.addFamily(cd);
byte[][] splitRows = new byte[][] { "3".getBytes(), "6".getBytes() };
Admin syncAdmin = TEST_UTIL.getAdmin();
try {
TEST_UTIL.createTable(td, splitRows);
TEST_UTIL.waitTableAvailable(tableName);
List<HRegionInfo> tableRegions;
HRegionInfo regionA;
HRegionInfo regionB;
// merge with full name
tableRegions = syncAdmin.getTableRegions(tableName);
assertEquals(3, syncAdmin.getTableRegions(tableName).size());
regionA = tableRegions.get(0);
regionB = tableRegions.get(1);
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
assertEquals(2, syncAdmin.getTableRegions(tableName).size());
// merge with encoded name
tableRegions = syncAdmin.getTableRegions(tableName);
regionA = tableRegions.get(0);
regionB = tableRegions.get(1);
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
assertEquals(1, syncAdmin.getTableRegions(tableName).size());
} finally {
syncAdmin.disableTable(tableName);
syncAdmin.deleteTable(tableName);
}
}
@Test
public void testSplitTable() throws Exception {
splitTests(TableName.valueOf("testSplitTable"), 3000, false, null);
splitTests(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
splitTests(TableName.valueOf("testSplitRegion"), 3000, true, null);
splitTests(TableName.valueOf("testSplitRegionWithSplitPoint"), 3000, true, Bytes.toBytes("3"));
}
private void splitTests(TableName tableName, int rowCount, boolean isSplitRegion,
byte[] splitPoint) throws Exception {
int count = 0;
// create table
HColumnDescriptor cd = new HColumnDescriptor("d");
HTableDescriptor td = new HTableDescriptor(tableName);
td.addFamily(cd);
Table table = TEST_UTIL.createTable(td, null);
TEST_UTIL.waitTableAvailable(tableName);
List<HRegionInfo> regions = TEST_UTIL.getAdmin().getTableRegions(tableName);
assertEquals(regions.size(), 1);
List<Put> puts = new ArrayList<>();
for (int i = 0; i < rowCount; i++) {
Put put = new Put(Bytes.toBytes(i));
put.addColumn(Bytes.toBytes("d"), null, Bytes.toBytes("value" + i));
puts.add(put);
}
table.put(puts);
if (isSplitRegion) {
admin.splitRegion(regions.get(0).getRegionName(), splitPoint).get();
} else {
if (splitPoint == null) {
admin.split(tableName).get();
} else {
admin.split(tableName, splitPoint).get();
}
}
for (int i = 0; i < 45; i++) {
try {
List<HRegionInfo> hRegionInfos = TEST_UTIL.getAdmin().getTableRegions(tableName);
count = hRegionInfos.size();
if (count >= 2) {
break;
}
Thread.sleep(1000L);
} catch (Exception e) {
LOG.error(e);
}
}
assertEquals(count, 2);
}
}