From 5f98ad2053ddc31e0abc6863478db594e4447cf8 Mon Sep 17 00:00:00 2001 From: huzheng Date: Wed, 29 Mar 2017 18:37:33 +0800 Subject: [PATCH] HBASE-17668: Implement async assgin/offline/move/unassign methods Signed-off-by: zhangduo --- .../hadoop/hbase/client/AsyncAdmin.java | 36 +++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 147 +++++++++++++++++- .../hbase/client/TestAsyncAdminBase.java | 2 +- .../hbase/client/TestAsyncRegionAdminApi.java | 146 ++++++++++++++++- 4 files changed, 326 insertions(+), 5 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 9945c40cf30..ef7a4f2badd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -431,4 +431,40 @@ public interface AsyncAdmin { * @param splitPoint the explicit position to split on */ CompletableFuture splitRegion(final byte[] regionName, final byte[] splitPoint); + + /** + * @param regionName Encoded or full name of region to assign. + */ + CompletableFuture assign(final byte[] regionName); + + /** + * Unassign a region from current hosting regionserver. Region will then be assigned to a + * regionserver chosen at random. Region could be reassigned back to the same server. Use + * {@link #move(byte[], byte[])} if you want to control the region movement. + * @param regionName Encoded or full name of region to unassign. Will clear any existing + * RegionPlan if one found. + * @param force If true, force unassign (Will remove region from regions-in-transition too if + * present. If results in double assignment use hbck -fix to resolve. To be used by + * experts). + */ + CompletableFuture unassign(final byte[] regionName, final boolean force); + + /** + * Offline specified region from master's in-memory state. It will not attempt to reassign the + * region as in unassign. This API can be used when a region not served by any region server and + * still online as per Master's in memory state. If this API is incorrectly used on active region + * then master will loose track of that region. This is a special method that should be used by + * experts or hbck. + * @param regionName Encoded or full name of region to offline + */ + CompletableFuture offline(final byte[] regionName); + + /** + * Move the region r to dest. + * @param regionName Encoded or full name of region to move. + * @param destServerName The servername of the destination regionserver. If passed the empty byte + * array we'll assign to a random server. A server name is made of host, port and + * startcode. Here is an example: host187.example.com,60020,1289493121758 + */ + CompletableFuture move(final byte[] regionName, final byte[] destServerName); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 54f0766f3c3..e42ee5712a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.regex.Pattern; @@ -68,6 +67,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; @@ -105,10 +106,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; @@ -260,7 +267,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex); failed.add(table); } - })).toArray(size -> new CompletableFuture[size]); + })). toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).thenAccept((v) -> { future.complete(failed.toArray(new HTableDescriptor[failed.size()])); }); @@ -616,7 +623,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return this. procedureCall( RequestConverter.buildDeleteNamespaceRequest(name), (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), - new ModifyNamespaceProcedureBiConsumer(this, name)); + new DeleteNamespaceProcedureBiConsumer(this, name)); } @Override @@ -1008,6 +1015,140 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .serverName(sn).call(); } + /** + * Turn regionNameOrEncodedRegionName into regionName, if region does not found, then it'll throw + * an IllegalArgumentException wrapped by a {@link CompletableFuture} + * @param regionNameOrEncodedRegionName + * @return + */ + CompletableFuture getRegionName(byte[] regionNameOrEncodedRegionName) { + CompletableFuture future = new CompletableFuture<>(); + if (Bytes + .equals(regionNameOrEncodedRegionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionNameOrEncodedRegionName, + HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { + future.complete(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + return future; + } + + getRegion(regionNameOrEncodedRegionName).whenComplete((p, err) -> { + if (err != null) { + future.completeExceptionally(err); + } + if (p != null && p.getFirst() != null) { + future.complete(p.getFirst().getRegionName()); + } else { + future.completeExceptionally( + new IllegalArgumentException("Invalid region name or encoded region name: " + + Bytes.toStringBinary(regionNameOrEncodedRegionName))); + } + }); + return future; + } + + @Override + public CompletableFuture assign(byte[] regionName) { + CompletableFuture future = new CompletableFuture<>(); + getRegionName(regionName).whenComplete((fullRegionName, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + this. newMasterCaller() + .action( + ((controller, stub) -> this. call( + controller, stub, RequestConverter.buildAssignRegionRequest(fullRegionName), + (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) + .call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture unassign(byte[] regionName, boolean force) { + CompletableFuture future = new CompletableFuture<>(); + getRegionName(regionName).whenComplete((fullRegionName, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + this. newMasterCaller() + .action(((controller, stub) -> this + . call(controller, stub, + RequestConverter.buildUnassignRegionRequest(fullRegionName, force), + (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) + .call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture offline(byte[] regionName) { + CompletableFuture future = new CompletableFuture<>(); + getRegionName(regionName).whenComplete((fullRegionName, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + this. newMasterCaller() + .action( + ((controller, stub) -> this. call( + controller, stub, RequestConverter.buildOfflineRegionRequest(fullRegionName), + (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) + .call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture move(byte[] regionName, byte[] destServerName) { + CompletableFuture future = new CompletableFuture<>(); + getRegionName(regionName).whenComplete((fullRegionName, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + final MoveRegionRequest request; + try { + request = RequestConverter.buildMoveRegionRequest( + Bytes.toBytes(HRegionInfo.encodeRegionName(fullRegionName)), destServerName); + } catch (DeserializationException e) { + future.completeExceptionally(e); + return; + } + this. newMasterCaller() + .action((controller, stub) -> this. call( + controller, stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), + resp -> null)) + .call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + } + }); + return future; + } + private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { if (numRegions < 3) { throw new IllegalArgumentException("Must create at least three regions"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java index 583ec64681b..f0dee0a1276 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java @@ -50,7 +50,7 @@ public abstract class TestAsyncAdminBase { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); - TEST_UTIL.startMiniCluster(1); + TEST_UTIL.startMiniCluster(2); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 980e07a6300..038d6d4fb74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -30,13 +31,20 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -209,7 +217,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { HColumnDescriptor cd = new HColumnDescriptor("d"); HTableDescriptor td = new HTableDescriptor(tableName); td.addFamily(cd); - byte[][] splitRows = new byte[][] { "3".getBytes(), "6".getBytes() }; + byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") }; Admin syncAdmin = TEST_UTIL.getAdmin(); try { TEST_UTIL.createTable(td, splitRows); @@ -296,4 +304,140 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { assertEquals(count, 2); } + @Test + public void testAssignRegionAndUnassignRegion() throws Exception { + final TableName tableName = TableName.valueOf("testAssignRegionAndUnassignRegion"); + try { + // create test table + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc).get(); + + // add region to meta. + Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); + HRegionInfo hri = + new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z")); + MetaTableAccessor.addRegionToMeta(meta, hri); + + // assign region. + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + AssignmentManager am = master.getAssignmentManager(); + admin.assign(hri.getRegionName()).get(); + am.waitForAssignment(hri); + + // assert region on server + RegionStates regionStates = am.getRegionStates(); + ServerName serverName = regionStates.getRegionServerOfRegion(hri); + TEST_UTIL.assertRegionOnServer(hri, serverName, 200); + assertTrue(regionStates.getRegionState(hri).isOpened()); + + // Region is assigned now. Let's assign it again. + // Master should not abort, and region should be assigned. + admin.assign(hri.getRegionName()).get(); + am.waitForAssignment(hri); + assertTrue(regionStates.getRegionState(hri).isOpened()); + + // unassign region + admin.unassign(hri.getRegionName(), true).get(); + am.waitForAssignment(hri); + assertTrue(regionStates.getRegionState(hri).isOpened()); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + + HRegionInfo createTableAndGetOneRegion(final TableName tableName) + throws IOException, InterruptedException { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5); + + // wait till the table is assigned + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + long timeoutTime = System.currentTimeMillis() + 3000; + while (true) { + List regions = + master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); + if (regions.size() > 3) { + return regions.get(2); + } + long now = System.currentTimeMillis(); + if (now > timeoutTime) { + fail("Could not find an online region"); + } + Thread.sleep(10); + } + } + + @Test + public void testOfflineRegion() throws Exception { + final TableName tableName = TableName.valueOf("testOfflineRegion"); + try { + HRegionInfo hri = createTableAndGetOneRegion(tableName); + + RegionStates regionStates = + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); + ServerName serverName = regionStates.getRegionServerOfRegion(hri); + TEST_UTIL.assertRegionOnServer(hri, serverName, 200); + admin.offline(hri.getRegionName()).get(); + + long timeoutTime = System.currentTimeMillis() + 3000; + while (true) { + if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE) + .contains(hri)) + break; + long now = System.currentTimeMillis(); + if (now > timeoutTime) { + fail("Failed to offline the region in time"); + break; + } + Thread.sleep(10); + } + RegionState regionState = regionStates.getRegionState(hri); + assertTrue(regionState.isOffline()); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + + @Test + public void testMoveRegion() throws Exception { + final TableName tableName = TableName.valueOf("testMoveRegion"); + try { + HRegionInfo hri = createTableAndGetOneRegion(tableName); + + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + RegionStates regionStates = master.getAssignmentManager().getRegionStates(); + ServerName serverName = regionStates.getRegionServerOfRegion(hri); + ServerManager serverManager = master.getServerManager(); + ServerName destServerName = null; + List regionServers = + TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); + for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { + HRegionServer destServer = regionServer.getRegionServer(); + destServerName = destServer.getServerName(); + if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { + break; + } + } + assertTrue(destServerName != null && !destServerName.equals(serverName)); + admin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(destServerName.getServerName())).get(); + + long timeoutTime = System.currentTimeMillis() + 30000; + while (true) { + ServerName sn = regionStates.getRegionServerOfRegion(hri); + if (sn != null && sn.equals(destServerName)) { + TEST_UTIL.assertRegionOnServer(hri, sn, 200); + break; + } + long now = System.currentTimeMillis(); + if (now > timeoutTime) { + fail("Failed to move the region in time: " + regionStates.getRegionState(hri)); + } + regionStates.waitForUpdate(50); + } + } finally { + TEST_UTIL.deleteTable(tableName); + } + } }