From 7bebdff6a254b1ee8d58ef2a1d5a8200f69446de Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Wed, 9 Jan 2019 17:11:17 +0800
Subject: [PATCH] HBASE-21580 Support getting Hbck instance from AsyncConnection

---
 .../hadoop/hbase/client/AsyncConnection.java  | 33 ++++++++++++
 .../hbase/client/AsyncConnectionImpl.java     | 53 ++++++++++++++-----
 .../client/ConnectionImplementation.java      | 14 ++---
 .../apache/hadoop/hbase/client/HBaseHbck.java |  6 +--
 .../apache/hadoop/hbase/client/TestHbck.java  | 42 +++++++++++----
 5 files changed, 116 insertions(+), 32 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index eda239429b4..5640eb337f1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -181,4 +185,33 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    */
   AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
+
+  /**
+   * Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
+   * be thread-safe. A new instance should be created by each thread. This is a lightweight
+   * operation. Pooling or caching of the returned Hbck instance is not recommended.
+   * <p/>
+   * The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
+   * <p/>
+   * This will be used mostly by hbck tool.
+   * @return an Hbck instance for active master. Active master is fetched from the zookeeper.
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
+  CompletableFuture<Hbck> getHbck();
+
+  /**
+   * Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
+   * be thread-safe. A new instance should be created by each thread. This is a lightweight
+   * operation. Pooling or caching of the returned Hbck instance is not recommended.
+   * <p/>
+   * The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
+   * <p/>
+   * This will be used mostly by hbck tool. This may only be used to bypass getting the registered
+   * master from ZK. In situations where ZK is not available or the active master is not registered
+   * with ZK and the user can get the master address by other means, the master can be explicitly specified.
+   * @param masterServer explicit {@link ServerName} for master server
+   * @return an Hbck instance for a specified master server
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
+  Hbck getHbck(ServerName masterServer) throws IOException;
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 361d5b2f926..1b99f84ea20 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -20,13 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
-
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -35,28 +29,35 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * The implementation of AsyncConnection.
@@ -325,4 +326,30 @@ class AsyncConnectionImpl implements AsyncConnection {
     return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
       RETRY_TIMER);
   }
+
+  @Override
+  public CompletableFuture<Hbck> getHbck() {
+    CompletableFuture<Hbck> future = new CompletableFuture<>();
+    addListener(registry.getMasterAddress(), (sn, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+      } else {
+        try {
+          future.complete(getHbck(sn));
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public Hbck getHbck(ServerName masterServer) throws IOException {
+    // we will not create a new connection when creating a new protobuf stub, and for hbck there
+    // will be no performance consideration, so for simplification we will create a new stub every
+    // time instead of caching the stub here.
+    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
+      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 992a95c5692..4e3543f4d66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -438,14 +438,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       throw new RegionServerStoppedException(masterServer + " is dead.");
     }
     String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
-        masterServer, this.hostnamesCanChange);
+      masterServer, this.hostnamesCanChange);
 
-    return new HBaseHbck(this,
-      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
-        BlockingRpcChannel channel =
-            this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
-        return MasterProtos.HbckService.newBlockingStub(channel);
-      }));
+    return new HBaseHbck(
+      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
+        BlockingRpcChannel channel =
+          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
+        return MasterProtos.HbckService.newBlockingStub(channel);
+      }), rpcControllerFactory);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
index eb39a2d8195..a276017b0ce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -67,9 +67,9 @@ public class HBaseHbck implements Hbck {
 
   private RpcControllerFactory rpcControllerFactory;
 
-  HBaseHbck(ClusterConnection connection, BlockingInterface hbck) throws IOException {
+  HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
     this.hbck = hbck;
-    this.rpcControllerFactory = connection.getRpcControllerFactory();
+    this.rpcControllerFactory = rpcControllerFactory;
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
index 8d9380f9745..2951600be42 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
@@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.client;
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
@@ -49,9 +47,15 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
  * Spins up the minicluster once at test start and then takes it down afterward.
 * Add any testing of HBaseHbck functionality here.
 */
+@RunWith(Parameterized.class)
 @Category({LargeTests.class, ClientTests.class})
 public class TestHbck {
   @ClassRule
@@ -71,19 +76,39 @@ public class TestHbck {
   @Rule
   public TestName name = new TestName();
 
+  @Parameter
+  public boolean async;
+
   private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
 
   private static ProcedureExecutor<MasterProcedureEnv> procExec;
 
+  private static AsyncConnection ASYNC_CONN;
+
+  @Parameters(name = "{index}: async={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { false }, new Object[] { true });
+  }
+
+  private Hbck getHbck() throws Exception {
+    if (async) {
+      return ASYNC_CONN.getHbck().get();
+    } else {
+      return TEST_UTIL.getHbck();
+    }
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
     procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    Closeables.close(ASYNC_CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -120,16 +145,15 @@ public class TestHbck {
     //bypass the procedure
     List<Long> pids = Arrays.asList(procId);
-    List<Boolean> results =
-        TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
+    List<Boolean> results = getHbck().bypassProcedure(pids, 30000, false, false);
     assertTrue("Failed to by pass procedure!", results.get(0));
     TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
     LOG.info("{} finished", proc);
   }
 
   @Test
-  public void testSetTableStateInMeta() throws IOException {
-    Hbck hbck = TEST_UTIL.getHbck();
+  public void testSetTableStateInMeta() throws Exception {
+    Hbck hbck = getHbck();
     // set table state to DISABLED
     hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED));
     // Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case
@@ -141,8 +165,8 @@ public class TestHbck {
   }
 
   @Test
-  public void testAssigns() throws IOException {
-    Hbck hbck = TEST_UTIL.getHbck();
+  public void testAssigns() throws Exception {
+    Hbck hbck = getHbck();
     try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
       List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
       for (RegionInfo ri: regions) {
@@ -183,7 +207,7 @@ public class TestHbck {
     TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
       true);
     ServerName serverName = testRs.getServerName();
-    Hbck hbck = TEST_UTIL.getHbck();
+    Hbck hbck = getHbck();
     List<Long> pids =
       hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
     assertTrue(pids.get(0) > 0);
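
Reviewer note, not part of the patch: a minimal usage sketch of the API added in AsyncConnection
above. It assumes a running cluster reachable from the default client Configuration; the example
class name, the host name master.example.org, the port 16000, and the startcode argument are
placeholders, and get() is called on the futures purely to keep the sketch short.

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Hbck;

public class HbckFromAsyncConnectionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // createAsyncConnection returns a CompletableFuture<AsyncConnection>; block here for brevity.
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      // Variant 1: resolve the active master through the connection registry (ZooKeeper by default).
      CompletableFuture<Hbck> hbckFuture = conn.getHbck();
      try (Hbck hbck = hbckFuture.get()) {
        // Hbck calls go here, e.g. hbck.setTableStateInMeta(...); the caller owns close().
      }
      // Variant 2: target an explicitly specified master, bypassing the registry lookup.
      ServerName master = ServerName.valueOf("master.example.org", 16000, System.currentTimeMillis());
      try (Hbck hbck = conn.getHbck(master)) {
        // Same Hbck API; useful when ZK is unavailable but the master address is known.
      }
    }
  }
}

The non-blocking variant mirrors AsyncConnectionImpl.getHbck() in the patch: the master address is
looked up asynchronously and the future completes with a lightweight, per-call HBaseHbck wrapper,
so callers already on an event loop can chain on the CompletableFuture instead of calling get().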