HBASE-21580 Support getting Hbck instance from AsyncConnection

This commit is contained in:
Duo Zhang 2019-01-09 17:11:17 +08:00
parent 5d32e80f9e
commit 7bebdff6a2
5 changed files with 116 additions and 32 deletions

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@ -181,4 +185,33 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback
*/
AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
* <p/>
* The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
* <p/>
* This will be used mostly by hbck tool.
* @return an Hbck instance for active master. Active master is fetched from the zookeeper.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
CompletableFuture<Hbck> getHbck();
/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
* operation. Pooling or caching of the returned Hbck instance is not recommended.
* <p/>
* The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
* <p/>
* This will be used mostly by hbck tool. This may only be used to by pass getting registered
* master from ZK. In situations where ZK is not available or active master is not registered with
* ZK and user can get master address by other means, master can be explicitly specified.
* @param masterServer explicit {@link ServerName} for master server
* @return an Hbck instance for a specified master server
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
Hbck getHbck(ServerName masterServer) throws IOException;
}

View File

@ -20,13 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@ -35,28 +29,35 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
/**
* The implementation of AsyncConnection.
@ -325,4 +326,30 @@ class AsyncConnectionImpl implements AsyncConnection {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}
@Override
public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
try {
future.complete(getHbck(sn));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
});
return future;
}
@Override
public Hbck getHbck(ServerName masterServer) throws IOException {
// we will not create a new connection when creating a new protobuf stub, and for hbck there
// will be no performance consideration, so for simplification we will create a new stub every
// time instead of caching the stub here.
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
}

View File

@ -438,14 +438,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
throw new RegionServerStoppedException(masterServer + " is dead.");
}
String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
masterServer, this.hostnamesCanChange);
masterServer, this.hostnamesCanChange);
return new HBaseHbck(this,
(MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
return MasterProtos.HbckService.newBlockingStub(channel);
}));
return new HBaseHbck(
(MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
return MasterProtos.HbckService.newBlockingStub(channel);
}), rpcControllerFactory);
}
@Override

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -67,9 +67,9 @@ public class HBaseHbck implements Hbck {
private RpcControllerFactory rpcControllerFactory;
HBaseHbck(ClusterConnection connection, BlockingInterface hbck) throws IOException {
HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
this.hbck = hbck;
this.rpcControllerFactory = connection.getRpcControllerFactory();
this.rpcControllerFactory = rpcControllerFactory;
}
@Override

View File

@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.client;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
@ -49,9 +47,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* Spins up the minicluster once at test start and then takes it down afterward.
* Add any testing of HBaseHbck functionality here.
*/
@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class})
public class TestHbck {
@ClassRule
@ -71,19 +76,39 @@ public class TestHbck {
@Rule
public TestName name = new TestName();
@Parameter
public boolean async;
private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
private static ProcedureExecutor<MasterProcedureEnv> procExec;
private static AsyncConnection ASYNC_CONN;
@Parameters(name = "{index}: async={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
}
private Hbck getHbck() throws Exception {
if (async) {
return ASYNC_CONN.getHbck().get();
} else {
return TEST_UTIL.getHbck();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(ASYNC_CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@ -120,16 +145,15 @@ public class TestHbck {
//bypass the procedure
List<Long> pids = Arrays.<Long>asList(procId);
List<Boolean> results =
TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
List<Boolean> results = getHbck().bypassProcedure(pids, 30000, false, false);
assertTrue("Failed to by pass procedure!", results.get(0));
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc);
}
@Test
public void testSetTableStateInMeta() throws IOException {
Hbck hbck = TEST_UTIL.getHbck();
public void testSetTableStateInMeta() throws Exception {
Hbck hbck = getHbck();
// set table state to DISABLED
hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED));
// Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case
@ -141,8 +165,8 @@ public class TestHbck {
}
@Test
public void testAssigns() throws IOException {
Hbck hbck = TEST_UTIL.getHbck();
public void testAssigns() throws Exception {
Hbck hbck = getHbck();
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
for (RegionInfo ri: regions) {
@ -183,7 +207,7 @@ public class TestHbck {
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
true);
ServerName serverName = testRs.getServerName();
Hbck hbck = TEST_UTIL.getHbck();
Hbck hbck = getHbck();
List<Long> pids =
hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
assertTrue(pids.get(0) > 0);