diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 045f866af14..b98d2106dec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.HashKey; import org.apache.hadoop.hbase.util.JenkinsHash; import org.apache.hadoop.hbase.util.MD5Hash; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.util.StringUtils; /** * Information about a region. A region is a range of keys in the whole keyspace of a table, an @@ -582,6 +583,19 @@ public class HRegionInfo implements Comparable { return elements; } + public static boolean isEncodedRegionName(byte[] regionName) throws IOException { + try { + HRegionInfo.parseRegionName(regionName); + return false; + } catch (IOException e) { + if (StringUtils.stringifyException(e) + .contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) { + return true; + } + throw e; + } + } + /** @return the regionId */ public long getRegionId(){ return regionId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 630ae47c1d1..5a13edee5a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -364,4 +364,40 @@ public interface AsyncAdmin { * @param hri */ CompletableFuture closeRegion(ServerName sn, HRegionInfo hri); + + /** + * Merge two regions. + * @param nameOfRegionA encoded or full name of region a + * @param nameOfRegionB encoded or full name of region b + * @param forcible true if do a compulsory merge, otherwise we will only merge two adjacent + * regions + */ + CompletableFuture mergeRegions(final byte[] nameOfRegionA, final byte[] nameOfRegionB, + final boolean forcible); + + /** + * Split a table. The method will execute split action for each region in table. + * @param tableName table to split + */ + CompletableFuture split(final TableName tableName); + + /** + * Split an individual region. + * @param regionName region to split + */ + CompletableFuture splitRegion(final byte[] regionName); + + /** + * Split a table. + * @param tableName table to split + * @param splitPoint the explicit position to split on + */ + CompletableFuture split(final TableName tableName, final byte[] splitPoint); + + /** + * Split an individual region. + * @param regionName region to split + * @param splitPoint the explicit position to split on + */ + CompletableFuture splitRegion(final byte[] regionName, final byte[] splitPoint); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index e2dc3d521c1..5ae30d75d37 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -35,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -49,6 +54,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; import org.apache.hadoop.hbase.client.Scan.ReadType; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -56,6 +62,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; @@ -90,6 +98,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancer import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; @@ -670,6 +680,227 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .serverName(sn).call(); } + private byte[] toEncodeRegionName(byte[] regionName) { + try { + return HRegionInfo.isEncodedRegionName(regionName) ? regionName + : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName)); + } catch (IOException e) { + return regionName; + } + } + + private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference tableName, + CompletableFuture result) { + getRegion(encodeRegionName).whenComplete((p, err) -> { + if (err != null) { + result.completeExceptionally(err); + return; + } + if (p == null) { + result.completeExceptionally(new UnknownRegionException( + "Can't invoke merge on unknown region " + Bytes.toStringBinary(encodeRegionName))); + return; + } + if (p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + result.completeExceptionally( + new IllegalArgumentException("Can't invoke merge on non-default regions directly")); + return; + } + if (!tableName.compareAndSet(null, p.getFirst().getTable())) { + if (!tableName.get().equals(p.getFirst().getTable())) { + // tables of this two region should be same. + result.completeExceptionally( + new IllegalArgumentException("Cannot merge regions from two different tables " + + tableName.get() + " and " + p.getFirst().getTable())); + } else { + result.complete(tableName.get()); + } + } + }); + } + + private CompletableFuture checkRegionsAndGetTableName(byte[] encodeRegionNameA, + byte[] encodeRegionNameB) { + AtomicReference tableNameRef = new AtomicReference<>(); + CompletableFuture future = new CompletableFuture<>(); + + checkAndGetTableName(encodeRegionNameA, tableNameRef, future); + checkAndGetTableName(encodeRegionNameB, tableNameRef, future); + return future; + } + + @Override + public CompletableFuture mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB, + boolean forcible) { + CompletableFuture future = new CompletableFuture<>(); + final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA); + final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB); + + checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB) + .whenComplete((tableName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + + MergeTableRegionsRequest request = null; + try { + request = RequestConverter.buildMergeTableRegionsRequest( + new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(), + ng.newNonce()); + } catch (DeserializationException e) { + future.completeExceptionally(e); + return; + } + + this. procedureCall(request, + (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), + new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + + }); + return future; + } + + @Override + public CompletableFuture split(TableName tableName) { + CompletableFuture future = new CompletableFuture<>(); + tableExists(tableName).whenComplete((exist, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException(tableName)); + return; + } + metaTable + .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) + .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) + .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))) + .whenComplete((results, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (results != null && !results.isEmpty()) { + List> splitFutures = new ArrayList<>(); + for (Result r : results) { + if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue; + RegionLocations rl = MetaTableAccessor.getRegionLocations(r); + if (rl != null) { + for (HRegionLocation h : rl.getRegionLocations()) { + if (h != null && h.getServerName() != null) { + HRegionInfo hri = h.getRegionInfo(); + if (hri == null || hri.isSplitParent() + || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) + continue; + splitFutures.add(split(h.getServerName(), hri, null)); + } + } + } + } + CompletableFuture + .allOf(splitFutures.toArray(new CompletableFuture[splitFutures.size()])) + .whenComplete((ret, exception) -> { + if (exception != null) { + future.completeExceptionally(exception); + return; + } + future.complete(ret); + }); + } else { + future.complete(null); + } + }); + }); + return future; + } + + @Override + public CompletableFuture splitRegion(byte[] regionName) { + return splitRegion(regionName, null); + } + + @Override + public CompletableFuture split(TableName tableName, byte[] splitPoint) { + CompletableFuture result = new CompletableFuture<>(); + if (splitPoint == null) { + return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); + } + connection.getRegionLocator(tableName).getRegionLocation(splitPoint) + .whenComplete((loc, err) -> { + if (err != null) { + result.completeExceptionally(err); + } else if (loc == null || loc.getRegionInfo() == null) { + result.completeExceptionally(new IllegalArgumentException( + "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); + } else { + splitRegion(loc.getRegionInfo().getRegionName(), splitPoint) + .whenComplete((ret, err2) -> { + if (err2 != null) { + result.completeExceptionally(err2); + } else { + result.complete(ret); + } + + }); + } + }); + return result; + } + + @Override + public CompletableFuture splitRegion(byte[] regionName, byte[] splitPoint) { + CompletableFuture future = new CompletableFuture<>(); + getRegion(regionName).whenComplete((p, err) -> { + if (p == null) { + future.completeExceptionally( + new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName))); + return; + } + if (p.getFirst() != null && p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + + "Replicas are auto-split when their primary is split.")); + return; + } + if (p.getSecond() == null) { + future.completeExceptionally( + new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + split(p.getSecond(), p.getFirst(), splitPoint).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @VisibleForTesting + public CompletableFuture split(final ServerName sn, final HRegionInfo hri, + byte[] splitPoint) { + if (hri.getStartKey() != null && splitPoint != null + && Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { + return failedFuture( + new IllegalArgumentException("should not give a splitkey which equals to startkey!")); + } + return this. newAdminCaller() + .action( + (controller, stub) -> this. adminCall( + controller, stub, ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint), + (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null)) + .serverName(sn).call(); + } + private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { if (numRegions < 3) { throw new IllegalArgumentException("Must create at least three regions"); @@ -885,6 +1116,17 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } } + private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { + + MergeTableRegionProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(admin, tableName); + } + + String getOperationType() { + return "MERGE_REGIONS"; + } + } + private CompletableFuture waitProcedureResult(CompletableFuture procFuture) { CompletableFuture future = new CompletableFuture<>(); procFuture.whenComplete((procId, error) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 13680389604..155a272accc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1497,19 +1497,6 @@ public class HBaseAdmin implements Admin { }); } - private boolean isEncodedRegionName(byte[] regionName) throws IOException { - try { - HRegionInfo.parseRegionName(regionName); - return false; - } catch (IOException e) { - if (StringUtils.stringifyException(e) - .contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) { - return true; - } - throw e; - } - } - /** * Merge two regions. Synchronous operation. * Note: It is not feasible to predict the length of merge. @@ -1582,7 +1569,7 @@ public class HBaseAdmin implements Admin { assert(nameofRegionsToMerge.length >= 2); byte[][] encodedNameofRegionsToMerge = new byte[nameofRegionsToMerge.length][]; for(int i = 0; i < nameofRegionsToMerge.length; i++) { - encodedNameofRegionsToMerge[i] = isEncodedRegionName(nameofRegionsToMerge[i]) ? + encodedNameofRegionsToMerge[i] = HRegionInfo.isEncodedRegionName(nameofRegionsToMerge[i]) ? nameofRegionsToMerge[i] : HRegionInfo.encodeRegionName(nameofRegionsToMerge[i]).getBytes(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 383b28f5694..980e07a6300 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -200,4 +202,98 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName())); } } + + @Test + public void testMergeRegions() throws Exception { + final TableName tableName = TableName.valueOf("testMergeRegions"); + HColumnDescriptor cd = new HColumnDescriptor("d"); + HTableDescriptor td = new HTableDescriptor(tableName); + td.addFamily(cd); + byte[][] splitRows = new byte[][] { "3".getBytes(), "6".getBytes() }; + Admin syncAdmin = TEST_UTIL.getAdmin(); + try { + TEST_UTIL.createTable(td, splitRows); + TEST_UTIL.waitTableAvailable(tableName); + + List tableRegions; + HRegionInfo regionA; + HRegionInfo regionB; + + // merge with full name + tableRegions = syncAdmin.getTableRegions(tableName); + assertEquals(3, syncAdmin.getTableRegions(tableName).size()); + regionA = tableRegions.get(0); + regionB = tableRegions.get(1); + admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get(); + + assertEquals(2, syncAdmin.getTableRegions(tableName).size()); + + // merge with encoded name + tableRegions = syncAdmin.getTableRegions(tableName); + regionA = tableRegions.get(0); + regionB = tableRegions.get(1); + admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get(); + + assertEquals(1, syncAdmin.getTableRegions(tableName).size()); + } finally { + syncAdmin.disableTable(tableName); + syncAdmin.deleteTable(tableName); + } + } + + @Test + public void testSplitTable() throws Exception { + splitTests(TableName.valueOf("testSplitTable"), 3000, false, null); + splitTests(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3")); + splitTests(TableName.valueOf("testSplitRegion"), 3000, true, null); + splitTests(TableName.valueOf("testSplitRegionWithSplitPoint"), 3000, true, Bytes.toBytes("3")); + } + + private void splitTests(TableName tableName, int rowCount, boolean isSplitRegion, + byte[] splitPoint) throws Exception { + int count = 0; + // create table + HColumnDescriptor cd = new HColumnDescriptor("d"); + HTableDescriptor td = new HTableDescriptor(tableName); + td.addFamily(cd); + Table table = TEST_UTIL.createTable(td, null); + TEST_UTIL.waitTableAvailable(tableName); + + List regions = TEST_UTIL.getAdmin().getTableRegions(tableName); + assertEquals(regions.size(), 1); + + List puts = new ArrayList<>(); + for (int i = 0; i < rowCount; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(Bytes.toBytes("d"), null, Bytes.toBytes("value" + i)); + puts.add(put); + } + table.put(puts); + + if (isSplitRegion) { + admin.splitRegion(regions.get(0).getRegionName(), splitPoint).get(); + } else { + if (splitPoint == null) { + admin.split(tableName).get(); + } else { + admin.split(tableName, splitPoint).get(); + } + } + + for (int i = 0; i < 45; i++) { + try { + List hRegionInfos = TEST_UTIL.getAdmin().getTableRegions(tableName); + count = hRegionInfos.size(); + if (count >= 2) { + break; + } + Thread.sleep(1000L); + } catch (Exception e) { + LOG.error(e); + } + } + + assertEquals(count, 2); + } + }