HBASE-23604: Clarify AsyncRegistry usage in the code. (#957)

* HBASE-23604: Cleanup AsyncRegistry interface

- Cleans up the method names to make more sense and adds a little
more javadocs for context. In future patches we can revisit
the name of the actual class to make it more self explanatory.

- Does AsyncRegistry -> ConnectionRegistry rename.
"async" ness of the registry is kind of implicit based on
the interface contents and need not be reflected in the name.

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Bharath Vissapragada 2020-01-03 14:27:01 -08:00
parent 023078ffa4
commit 0a1c3b2055
29 changed files with 124 additions and 111 deletions

View File

@ -85,7 +85,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private final User user; private final User user;
final AsyncRegistry registry; final ConnectionRegistry registry;
private final int rpcTimeout; private final int rpcTimeout;
@ -122,7 +122,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile ConnectionOverAsyncConnection conn; private volatile ConnectionOverAsyncConnection conn;
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) { SocketAddress localAddress, User user) {
this.conf = conf; this.conf = conf;
this.user = user; this.user = user;
@ -136,7 +136,8 @@ class AsyncConnectionImpl implements AsyncConnection {
} else { } else {
this.metrics = Optional.empty(); this.metrics = Optional.empty();
} }
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null)); this.rpcClient = RpcClientFactory.createClient(
conf, clusterId, localAddress, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout = this.rpcTimeout =
@ -257,7 +258,7 @@ class AsyncConnectionImpl implements AsyncConnection {
CompletableFuture<MasterService.Interface> getMasterStub() { CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (addr, error) -> { addListener(registry.getActiveMaster(), (addr, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
} else if (addr == null) { } else if (addr == null) {
@ -368,7 +369,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override @Override
public CompletableFuture<Hbck> getHbck() { public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>(); CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> { addListener(registry.getActiveMaster(), (sn, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
} else { } else {

View File

@ -38,14 +38,14 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
@InterfaceAudience.Private @InterfaceAudience.Private
class AsyncMetaRegionLocator { class AsyncMetaRegionLocator {
private final AsyncRegistry registry; private final ConnectionRegistry registry;
private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>(); private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture = private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
new AtomicReference<>(); new AtomicReference<>();
AsyncMetaRegionLocator(AsyncRegistry registry) { AsyncMetaRegionLocator(ConnectionRegistry registry) {
this.registry = registry; this.registry = registry;
} }
@ -60,7 +60,7 @@ class AsyncMetaRegionLocator {
*/ */
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) { CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload, return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
registry::getMetaRegionLocation, locs -> isGood(locs, replicaId), "meta region location"); registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
} }
private HRegionLocation getCacheLocation(HRegionLocation loc) { private HRegionLocation getCacheLocation(HRegionLocation loc) {

View File

@ -55,7 +55,7 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override @Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() { public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
if (TableName.isMetaTableName(tableName)) { if (TableName.isMetaTableName(tableName)) {
return conn.registry.getMetaRegionLocation() return conn.registry.getMetaRegionLocations()
.thenApply(locs -> Arrays.asList(locs.getRegionLocations())); .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
} }
return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),

View File

@ -279,7 +279,7 @@ public class ConnectionFactory {
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) { final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
addListener(registry.getClusterId(), (clusterId, error) -> { addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) { if (error != null) {
registry.close(); registry.close();

View File

@ -24,16 +24,17 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.. * Registry for meta information needed for connection setup to a HBase cluster. Implementations
* hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Internal use only. * Internal use only.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
interface AsyncRegistry extends Closeable { interface ConnectionRegistry extends Closeable {
/** /**
* Get the location of meta region. * Get the location of meta region(s).
*/ */
CompletableFuture<RegionLocations> getMetaRegionLocation(); CompletableFuture<RegionLocations> getMetaRegionLocations();
/** /**
* Should only be called once. * Should only be called once.
@ -43,9 +44,9 @@ interface AsyncRegistry extends Closeable {
CompletableFuture<String> getClusterId(); CompletableFuture<String> getClusterId();
/** /**
* Get the address of HMaster. * Get the address of active HMaster.
*/ */
CompletableFuture<ServerName> getMasterAddress(); CompletableFuture<ServerName> getActiveMaster();
/** /**
* Closes this instance and releases any system resources associated with it * Closes this instance and releases any system resources associated with it

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,26 +18,28 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Get instance of configured Registry. * Factory class to get the instance of configured connection registry.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
final class AsyncRegistryFactory { final class ConnectionRegistryFactory {
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.connection.registry.impl";
private AsyncRegistryFactory() { private ConnectionRegistryFactory() {
} }
/** /**
* @return The cluster registry implementation to use. * @return The connection registry implementation to use.
*/ */
static AsyncRegistry getRegistry(Configuration conf) { static ConnectionRegistry getRegistry(Configuration conf) {
Class<? extends AsyncRegistry> clazz = Class<? extends ConnectionRegistry> clazz = conf.getClass(
conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class); CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf); return ReflectionUtils.newInstance(clazz, conf);
} }
} }

View File

@ -715,7 +715,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
if (TableName.isMetaTableName(tableName)) { if (TableName.isMetaTableName(tableName)) {
return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
.of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
} }
CompletableFuture<Boolean> future = new CompletableFuture<>(); CompletableFuture<Boolean> future = new CompletableFuture<>();
@ -853,7 +853,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) { if (tableName.equals(META_TABLE_NAME)) {
return connection.registry.getMetaRegionLocation() return connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
.collect(Collectors.toList())); .collect(Collectors.toList()));
} else { } else {
@ -1081,8 +1081,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (TableName.META_TABLE_NAME.equals(tableName)) { if (TableName.META_TABLE_NAME.equals(tableName)) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// For meta table, we use zk to fetch all locations. // For meta table, we use zk to fetch all locations.
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(
addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> { connection.getConfiguration());
addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> {
if (err != null) { if (err != null) {
future.completeExceptionally(err); future.completeExceptionally(err);
} else if (metaRegions == null || metaRegions.isEmpty() || } else if (metaRegions == null || metaRegions.isEmpty() ||
@ -1110,7 +1111,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
switch (compactType) { switch (compactType) {
case MOB: case MOB:
addListener(connection.registry.getMasterAddress(), (serverName, err) -> { addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
if (err != null) { if (err != null) {
future.completeExceptionally(err); future.completeExceptionally(err);
return; return;
@ -2349,7 +2350,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
String encodedName = Bytes.toString(regionNameOrEncodedRegionName); String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
// old format encodedName, should be meta region // old format encodedName, should be meta region
future = connection.registry.getMetaRegionLocation() future = connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations()) .thenApply(locs -> Stream.of(locs.getRegionLocations())
.filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
} else { } else {
@ -2360,7 +2361,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
RegionInfo regionInfo = RegionInfo regionInfo =
MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
if (regionInfo.isMetaRegion()) { if (regionInfo.isMetaRegion()) {
future = connection.registry.getMetaRegionLocation() future = connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations()) .thenApply(locs -> Stream.of(locs.getRegionLocations())
.filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
.findFirst()); .findFirst());
@ -2933,7 +2934,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
switch (compactType) { switch (compactType) {
case MOB: case MOB:
addListener(connection.registry.getMasterAddress(), (serverName, err) -> { addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
if (err != null) { if (err != null) {
future.completeExceptionally(err); future.completeExceptionally(err);
return; return;

View File

@ -50,15 +50,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
* Zookeeper based registry implementation. * Zookeeper based registry implementation.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class ZKAsyncRegistry implements AsyncRegistry { class ZKConnectionRegistry implements ConnectionRegistry {
private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class); private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
private final ReadOnlyZKClient zk; private final ReadOnlyZKClient zk;
private final ZNodePaths znodePaths; private final ZNodePaths znodePaths;
ZKAsyncRegistry(Configuration conf) { ZKConnectionRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf); this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf); this.zk = new ReadOnlyZKClient(conf);
} }
@ -93,7 +93,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
@Override @Override
public CompletableFuture<String> getClusterId() { public CompletableFuture<String> getClusterId() {
return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId);
} }
@VisibleForTesting @VisibleForTesting
@ -144,7 +144,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode); int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
if (replicaId == DEFAULT_REPLICA_ID) { if (replicaId == DEFAULT_REPLICA_ID) {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
@ -162,7 +162,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
tryComplete(remaining, locs, future); tryComplete(remaining, locs, future);
}); });
} else { } else {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) { if (future.isDone()) {
return; return;
} }
@ -191,7 +191,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
} }
@Override @Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() { public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>(); CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener( addListener(
zk.list(znodePaths.baseZNode) zk.list(znodePaths.baseZNode)
@ -217,8 +217,8 @@ class ZKAsyncRegistry implements AsyncRegistry {
} }
@Override @Override
public CompletableFuture<ServerName> getMasterAddress() { public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> { .thenApply(proto -> {
if (proto == null) { if (proto == null) {
return null; return null;

View File

@ -27,13 +27,13 @@ import org.apache.yetus.audience.InterfaceAudience;
* Registry that does nothing. Otherwise, default Registry wants zookeeper up and running. * Registry that does nothing. Otherwise, default Registry wants zookeeper up and running.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class DoNothingAsyncRegistry implements AsyncRegistry { class DoNothingConnectionRegistry implements ConnectionRegistry {
public DoNothingAsyncRegistry(Configuration conf) { public DoNothingConnectionRegistry(Configuration conf) {
} }
@Override @Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() { public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@ -43,7 +43,7 @@ class DoNothingAsyncRegistry implements AsyncRegistry {
} }
@Override @Override
public CompletableFuture<ServerName> getMasterAddress() { public CompletableFuture<ServerName> getActiveMaster() {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }

View File

@ -142,7 +142,7 @@ public class TestAsyncAdminRpcPriority {
}).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class), }).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class),
any()); any());
conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", null, conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null,
UserProvider.instantiate(CONF).getCurrent()) { UserProvider.instantiate(CONF).getCurrent()) {
@Override @Override

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -43,21 +43,21 @@ public class TestAsyncMetaRegionLocatorFailFast {
private static AsyncMetaRegionLocator LOCATOR; private static AsyncMetaRegionLocator LOCATOR;
private static final class FaultyAsyncRegistry extends DoNothingAsyncRegistry { private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry {
public FaultyAsyncRegistry(Configuration conf) { public FaultyConnectionRegistry(Configuration conf) {
super(conf); super(conf);
} }
@Override @Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() { public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error")); return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error"));
} }
} }
@BeforeClass @BeforeClass
public static void setUp() { public static void setUp() {
LOCATOR = new AsyncMetaRegionLocator(new FaultyAsyncRegistry(CONF)); LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF));
} }
@Test(expected = DoNotRetryIOException.class) @Test(expected = DoNotRetryIOException.class)

View File

@ -175,7 +175,7 @@ public class TestAsyncTableRpcPriority {
return null; return null;
} }
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", null, conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null,
UserProvider.instantiate(CONF).getCurrent()) { UserProvider.instantiate(CONF).getCurrent()) {
@Override @Override

View File

@ -38,17 +38,17 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, SmallTests.class }) @Category({ ClientTests.class, SmallTests.class })
public class TestAsyncRegistryLeak { public class TestConnectionRegistryLeak {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncRegistryLeak.class); HBaseClassTestRule.forClass(TestConnectionRegistryLeak.class);
public static final class AsyncRegistryForTest extends DoNothingAsyncRegistry { public static final class ConnectionRegistryForTest extends DoNothingConnectionRegistry {
private boolean closed = false; private boolean closed = false;
public AsyncRegistryForTest(Configuration conf) { public ConnectionRegistryForTest(Configuration conf) {
super(conf); super(conf);
CREATED.add(this); CREATED.add(this);
} }
@ -64,14 +64,14 @@ public class TestAsyncRegistryLeak {
} }
} }
private static final List<AsyncRegistryForTest> CREATED = new ArrayList<>(); private static final List<ConnectionRegistryForTest> CREATED = new ArrayList<>();
private static Configuration CONF = HBaseConfiguration.create(); private static Configuration CONF = HBaseConfiguration.create();
@BeforeClass @BeforeClass
public static void setUp() { public static void setUp() {
CONF.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, AsyncRegistryForTest.class, CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
AsyncRegistry.class); ConnectionRegistryForTest.class, ConnectionRegistry.class);
} }
@Test @Test

View File

@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
@InterfaceAudience.Private @InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection { class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
public AsyncClusterConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
SocketAddress localAddress, User user) { String clusterId, SocketAddress localAddress, User user) {
super(conf, registry, clusterId, localAddress, user); super(conf, registry, clusterId, localAddress, user);
} }

View File

@ -49,7 +49,7 @@ public final class ClusterConnectionFactory {
*/ */
public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
SocketAddress localAddress, User user) throws IOException { SocketAddress localAddress, User user) throws IOException {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
String clusterId = FutureUtils.get(registry.getClusterId()); String clusterId = FutureUtils.get(registry.getClusterId());
Class<? extends AsyncClusterConnection> clazz = Class<? extends AsyncClusterConnection> clazz =
conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class, conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class,

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -40,7 +39,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.DummyAsyncRegistry; import org.apache.hadoop.hbase.client.DummyConnectionRegistry;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
@ -92,7 +91,7 @@ public class TestZooKeeperTableArchiveClient {
private static RegionServerServices rss; private static RegionServerServices rss;
private static DirScanPool POOL; private static DirScanPool POOL;
public static final class MockRegistry extends DummyAsyncRegistry { public static final class MockRegistry extends DummyConnectionRegistry {
public MockRegistry(Configuration conf) { public MockRegistry(Configuration conf) {
} }
@ -110,8 +109,8 @@ public class TestZooKeeperTableArchiveClient {
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration()); setupConf(UTIL.getConfiguration());
UTIL.startMiniZKCluster(); UTIL.startMiniZKCluster();
UTIL.getConfiguration().setClass("hbase.client.registry.impl", MockRegistry.class, UTIL.getConfiguration().setClass(MockRegistry.REGISTRY_IMPL_CONF_KEY, MockRegistry.class,
DummyAsyncRegistry.class); DummyConnectionRegistry.class);
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION); archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
// make hfile archiving node so we can archive files // make hfile archiving node so we can archive files
@ -147,9 +146,13 @@ public class TestZooKeeperTableArchiveClient {
@AfterClass @AfterClass
public static void cleanupTest() throws Exception { public static void cleanupTest() throws Exception {
CONNECTION.close(); if (CONNECTION != null) {
CONNECTION.close();
}
UTIL.shutdownMiniZKCluster(); UTIL.shutdownMiniZKCluster();
POOL.shutdownNow(); if (POOL != null) {
POOL.shutdownNow();
}
} }
/** /**
@ -353,6 +356,7 @@ public class TestZooKeeperTableArchiveClient {
* @throws IOException on failure * @throws IOException on failure
* @throws KeeperException on failure * @throws KeeperException on failure
*/ */
@SuppressWarnings("checkstyle:EmptyBlock")
private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner) private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
throws IOException, KeeperException { throws IOException, KeeperException {
// turn on hfile retention // turn on hfile retention

View File

@ -58,7 +58,8 @@ public abstract class AbstractTestRegionLocator {
} }
UTIL.getAdmin().createTable(td, SPLIT_KEYS); UTIL.getAdmin().createTable(td, SPLIT_KEYS);
UTIL.waitTableAvailable(TABLE_NAME); UTIL.waitTableAvailable(TABLE_NAME);
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(UTIL.getConfiguration())) { try (ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(), RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(),
registry, REGION_REPLICATION); registry, REGION_REPLICATION);
} }

View File

@ -23,14 +23,15 @@ import org.apache.hadoop.hbase.ServerName;
/** /**
* Can be overridden in UT if you only want to implement part of the methods in * Can be overridden in UT if you only want to implement part of the methods in
* {@link AsyncRegistry}. * {@link ConnectionRegistry}.
*/ */
public class DummyAsyncRegistry implements AsyncRegistry { public class DummyConnectionRegistry implements ConnectionRegistry {
public static final String REGISTRY_IMPL_CONF_KEY = AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY; public static final String REGISTRY_IMPL_CONF_KEY =
ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
@Override @Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() { public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return null; return null;
} }
@ -40,7 +41,7 @@ public class DummyAsyncRegistry implements AsyncRegistry {
} }
@Override @Override
public CompletableFuture<ServerName> getMasterAddress() { public CompletableFuture<ServerName> getActiveMaster() {
return null; return null;
} }

View File

@ -44,7 +44,7 @@ public final class RegionReplicaTestHelper {
// waits for all replicas to have region location // waits for all replicas to have region location
static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf, static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf,
AsyncRegistry registry, int regionReplication) throws IOException { ConnectionRegistry registry, int regionReplication) throws IOException {
Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true, Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
new ExplainingPredicate<IOException>() { new ExplainingPredicate<IOException>() {
@Override @Override
@ -55,7 +55,7 @@ public final class RegionReplicaTestHelper {
@Override @Override
public boolean evaluate() throws IOException { public boolean evaluate() throws IOException {
try { try {
RegionLocations locs = registry.getMetaRegionLocation().get(); RegionLocations locs = registry.getMetaRegionLocations().get();
if (locs.size() < regionReplication) { if (locs.size() < regionReplication) {
return false; return false;
} }
@ -66,7 +66,7 @@ public final class RegionReplicaTestHelper {
} }
return true; return true;
} catch (Exception e) { } catch (Exception e) {
TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e); TestZKConnectionRegistry.LOG.warn("Failed to get meta region locations", e);
return false; return false;
} }
} }

View File

@ -53,7 +53,8 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TestAsyncAdminBase.setUpBeforeClass(); TestAsyncAdminBase.setUpBeforeClass();
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { try (ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
RegionReplicaTestHelper RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3); .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3);
} }

View File

@ -44,7 +44,7 @@ public class TestAsyncMetaRegionLocator {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncRegistry REGISTRY; private static ConnectionRegistry REGISTRY;
private static AsyncMetaRegionLocator LOCATOR; private static AsyncMetaRegionLocator LOCATOR;
@ -53,7 +53,7 @@ public class TestAsyncMetaRegionLocator {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.waitUntilNoRegionsInTransition(); TEST_UTIL.waitUntilNoRegionsInTransition();
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
RegionReplicaTestHelper RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);

View File

@ -79,7 +79,8 @@ public class TestAsyncNonMetaRegionLocator {
public static void setUp() throws Exception { public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent()); registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN); LOCATOR = new AsyncNonMetaRegionLocator(CONN);

View File

@ -123,7 +123,8 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED); conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent()); registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN); LOCATOR = new AsyncNonMetaRegionLocator(CONN);

View File

@ -98,7 +98,8 @@ public class TestAsyncRegionLocator {
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME); TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent()); registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = CONN.getLocator(); LOCATOR = CONN.getLocator();

View File

@ -71,7 +71,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME); TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent()); registry.getClusterId().get(), null, User.getCurrent());
} }

View File

@ -91,7 +91,7 @@ public class TestAsyncTableUseMetaReplicas {
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
FailPrimaryMetaScanCp.class.getName()); FailPrimaryMetaScanCp.class.getName());
UTIL.startMiniCluster(3); UTIL.startMiniCluster(3);
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf)) { try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3); RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3);
} }
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {

View File

@ -53,13 +53,13 @@ public class TestMetaRegionLocationCache {
HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class); HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncRegistry REGISTRY; private static ConnectionRegistry REGISTRY;
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation( RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(
TEST_UTIL.getConfiguration(), REGISTRY, 3); TEST_UTIL.getConfiguration(), REGISTRY, 3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);

View File

@ -48,16 +48,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@Category({ MediumTests.class, ClientTests.class }) @Category({ MediumTests.class, ClientTests.class })
public class TestZKAsyncRegistry { public class TestZKConnectionRegistry {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKAsyncRegistry.class); HBaseClassTestRule.forClass(TestZKConnectionRegistry.class);
static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class); static final Logger LOG = LoggerFactory.getLogger(TestZKConnectionRegistry.class);
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static ZKAsyncRegistry REGISTRY; private static ZKConnectionRegistry REGISTRY;
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
@ -67,7 +67,7 @@ public class TestZKAsyncRegistry {
// make sure that we do not depend on this config when getting locations for meta replicas, see // make sure that we do not depend on this config when getting locations for meta replicas, see
// HBASE-21658. // HBASE-21658.
conf.setInt(META_REPLICAS_NUM, 1); conf.setInt(META_REPLICAS_NUM, 1);
REGISTRY = new ZKAsyncRegistry(conf); REGISTRY = new ZKConnectionRegistry(conf);
} }
@AfterClass @AfterClass
@ -84,10 +84,10 @@ public class TestZKAsyncRegistry {
assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId,
clusterId); clusterId);
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getMasterAddress().get()); REGISTRY.getActiveMaster().get());
RegionReplicaTestHelper RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
assertEquals(3, locs.getRegionLocations().length); assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> { IntStream.range(0, 3).forEach(i -> {
HRegionLocation loc = locs.getRegionLocation(i); HRegionLocation loc = locs.getRegionLocation(i);
@ -102,7 +102,7 @@ public class TestZKAsyncRegistry {
try (ReadOnlyZKClient zk1 = REGISTRY.getZKClient()) { try (ReadOnlyZKClient zk1 = REGISTRY.getZKClient()) {
Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration()); Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
otherConf.set(HConstants.ZOOKEEPER_QUORUM, "localhost"); otherConf.set(HConstants.ZOOKEEPER_QUORUM, "localhost");
try (ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) { try (ZKConnectionRegistry otherRegistry = new ZKConnectionRegistry(otherConf)) {
ReadOnlyZKClient zk2 = otherRegistry.getZKClient(); ReadOnlyZKClient zk2 = otherRegistry.getZKClient();
assertNotSame("Using a different configuration / quorum should result in different " + assertNotSame("Using a different configuration / quorum should result in different " +
"backing zk connection.", zk1, zk2); "backing zk connection.", zk1, zk2);
@ -119,9 +119,9 @@ public class TestZKAsyncRegistry {
public void testNoMetaAvailable() throws InterruptedException { public void testNoMetaAvailable() throws InterruptedException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set("zookeeper.znode.metaserver", "whatever"); conf.set("zookeeper.znode.metaserver", "whatever");
try (ZKAsyncRegistry registry = new ZKAsyncRegistry(conf)) { try (ZKConnectionRegistry registry = new ZKConnectionRegistry(conf)) {
try { try {
registry.getMetaRegionLocation().get(); registry.getMetaRegionLocations().get();
fail("Should have failed since we set an incorrect meta znode prefix"); fail("Should have failed since we set an incorrect meta znode prefix");
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(IOException.class)); assertThat(e.getCause(), instanceOf(IOException.class));

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -43,8 +42,8 @@ import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection; import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection;
import org.apache.hadoop.hbase.client.DummyAsyncRegistry;
import org.apache.hadoop.hbase.client.DummyAsyncTable; import org.apache.hadoop.hbase.client.DummyAsyncTable;
import org.apache.hadoop.hbase.client.DummyConnectionRegistry;
import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -57,9 +56,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/** /**
@ -108,12 +105,12 @@ public class TestWALEntrySinkFilter {
public void testWALEntryFilter() throws IOException { public void testWALEntryFilter() throws IOException {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
// Make it so our filter is instantiated on construction of ReplicationSink. // Make it so our filter is instantiated on construction of ReplicationSink.
conf.setClass(DummyAsyncRegistry.REGISTRY_IMPL_CONF_KEY, DevNullAsyncRegistry.class, conf.setClass(DummyConnectionRegistry.REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class,
DummyAsyncRegistry.class); DummyConnectionRegistry.class);
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL, conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
DevNullAsyncClusterConnection.class, AsyncClusterConnection.class); DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE); ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
// Create some dumb walentries. // Create some dumb walentries.
List<AdminProtos.WALEntry> entries = new ArrayList<>(); List<AdminProtos.WALEntry> entries = new ArrayList<>();
@ -190,9 +187,9 @@ public class TestWALEntrySinkFilter {
} }
} }
public static class DevNullAsyncRegistry extends DummyAsyncRegistry { public static class DevNullConnectionRegistry extends DummyConnectionRegistry {
public DevNullAsyncRegistry(Configuration conf) { public DevNullConnectionRegistry(Configuration conf) {
} }
@Override @Override