HBASE-21580 Support getting Hbck instance from AsyncConnection

This commit is contained in:
Duo Zhang 2019-01-09 17:11:17 +08:00
parent 579edf668f
commit 3f750955b5
5 changed files with 113 additions and 29 deletions

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -181,4 +185,33 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback * @param pool the thread pool to use for executing callback
*/ */
AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool); AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
* <p/>
* The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
* <p/>
* This will be used mostly by hbck tool.
* @return an Hbck instance for active master. Active master is fetched from the zookeeper.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
CompletableFuture<Hbck> getHbck();
/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
* <p/>
* The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
* <p/>
* This will be used mostly by hbck tool. This may only be used to by pass getting registered
* master from ZK. In situations where ZK is not available or active master is not registered with
* ZK and user can get master address by other means, master can be explicitly specified.
* @param masterServer explicit {@link ServerName} for master server
* @return an Hbck instance for a specified master server
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
Hbck getHbck(ServerName masterServer) throws IOException;
} }

View File

@ -20,10 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -32,28 +29,32 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
/** /**
* The implementation of AsyncConnection. * The implementation of AsyncConnection.
@ -309,4 +310,30 @@ class AsyncConnectionImpl implements AsyncConnection {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER); RETRY_TIMER);
} }
@Override
public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
try {
future.complete(getHbck(sn));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
});
return future;
}
@Override
public Hbck getHbck(ServerName masterServer) throws IOException {
// we will not create a new connection when creating a new protobuf stub, and for hbck there
// will be no performance consideration, so for simplification we will create a new stub every
// time instead of caching the stub here.
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
} }

View File

@ -421,12 +421,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
masterServer, this.hostnamesCanChange); masterServer, this.hostnamesCanChange);
return new HBaseHbck(this, return new HBaseHbck(
(MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel = BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout); this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
return MasterProtos.HbckService.newBlockingStub(channel); return MasterProtos.HbckService.newBlockingStub(channel);
})); }), rpcControllerFactory);
} }
@Override @Override

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -67,9 +67,9 @@ public class HBaseHbck implements Hbck {
private RpcControllerFactory rpcControllerFactory; private RpcControllerFactory rpcControllerFactory;
HBaseHbck(ClusterConnection connection, BlockingInterface hbck) throws IOException { HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
this.hbck = hbck; this.hbck = hbck;
this.rpcControllerFactory = connection.getRpcControllerFactory(); this.rpcControllerFactory = rpcControllerFactory;
} }
@Override @Override

View File

@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.client;
import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -49,9 +47,15 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/** /**
@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* Spins up the minicluster once at test start and then takes it down afterward. * Spins up the minicluster once at test start and then takes it down afterward.
* Add any testing of HBaseHbck functionality here. * Add any testing of HBaseHbck functionality here.
*/ */
@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class}) @Category({LargeTests.class, ClientTests.class})
public class TestHbck { public class TestHbck {
@ClassRule @ClassRule
@ -71,19 +76,39 @@ public class TestHbck {
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@Parameter
public boolean async;
private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName()); private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
private static ProcedureExecutor<MasterProcedureEnv> procExec; private static ProcedureExecutor<MasterProcedureEnv> procExec;
private static AsyncConnection ASYNC_CONN;
@Parameters(name = "{index}: async={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
}
private Hbck getHbck() throws Exception {
if (async) {
return ASYNC_CONN.getHbck().get();
} else {
return TEST_UTIL.getHbck();
}
}
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5); TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
} }
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
Closeables.close(ASYNC_CONN, true);
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
@ -120,16 +145,15 @@ public class TestHbck {
//bypass the procedure //bypass the procedure
List<Long> pids = Arrays.<Long>asList(procId); List<Long> pids = Arrays.<Long>asList(procId);
List<Boolean> results = List<Boolean> results = getHbck().bypassProcedure(pids, 30000, false, false);
TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
assertTrue("Failed to by pass procedure!", results.get(0)); assertTrue("Failed to by pass procedure!", results.get(0));
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc); LOG.info("{} finished", proc);
} }
@Test @Test
public void testSetTableStateInMeta() throws IOException { public void testSetTableStateInMeta() throws Exception {
Hbck hbck = TEST_UTIL.getHbck(); Hbck hbck = getHbck();
// set table state to DISABLED // set table state to DISABLED
hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED)); hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED));
// Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case // Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case
@ -141,8 +165,8 @@ public class TestHbck {
} }
@Test @Test
public void testAssigns() throws IOException { public void testAssigns() throws Exception {
Hbck hbck = TEST_UTIL.getHbck(); Hbck hbck = getHbck();
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
List<RegionInfo> regions = admin.getRegions(TABLE_NAME); List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
for (RegionInfo ri: regions) { for (RegionInfo ri: regions) {
@ -183,7 +207,7 @@ public class TestHbck {
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"), TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
true); true);
ServerName serverName = testRs.getServerName(); ServerName serverName = testRs.getServerName();
Hbck hbck = TEST_UTIL.getHbck(); Hbck hbck = getHbck();
List<Long> pids = List<Long> pids =
hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName))); hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
assertTrue(pids.get(0) > 0); assertTrue(pids.get(0) > 0);