HBASE-26834 Adapt ConnectionRule for both sync and async connections

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2022-03-21 12:41:22 +01:00 committed by GitHub
parent e56ed404cb
commit f78f232b28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 109 additions and 36 deletions

View File

@ -116,13 +116,13 @@ public class TestCoprocessorEndpointTracing {
}) })
.build(); .build();
private static final ConnectionRule connectionRule = private static final ConnectionRule connectionRule =
new ConnectionRule(miniclusterRule::createConnection); ConnectionRule.createAsyncConnectionRule(miniclusterRule::createAsyncConnection);
private static final class Setup extends ExternalResource { private static final class Setup extends ExternalResource {
@Override @Override
protected void before() throws Throwable { protected void before() throws Throwable {
final HBaseTestingUtil util = miniclusterRule.getTestingUtility(); final HBaseTestingUtil util = miniclusterRule.getTestingUtility();
final AsyncConnection connection = connectionRule.getConnection(); final AsyncConnection connection = connectionRule.getAsyncConnection();
final AsyncAdmin admin = connection.getAdmin(); final AsyncAdmin admin = connection.getAdmin();
final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE) final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build(); .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build();
@ -149,7 +149,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceAsyncTableEndpoint() { public void traceAsyncTableEndpoint() {
final AsyncConnection connection = connectionRule.getConnection(); final AsyncConnection connection = connectionRule.getAsyncConnection();
final AsyncTable<?> table = connection.getTable(TEST_TABLE); final AsyncTable<?> table = connection.getTable(TEST_TABLE);
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final CompletableFuture<Map<byte[], String>> future = new CompletableFuture<>(); final CompletableFuture<Map<byte[], String>> future = new CompletableFuture<>();
@ -228,7 +228,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceSyncTableEndpointCall() throws Exception { public void traceSyncTableEndpointCall() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection(); final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) { try (final Table table = connection.getTable(TEST_TABLE)) {
final RpcController controller = new ServerRpcController(); final RpcController controller = new ServerRpcController();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
@ -280,7 +280,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceSyncTableEndpointCallAndCallback() throws Exception { public void traceSyncTableEndpointCallAndCallback() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection(); final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) { try (final Table table = connection.getTable(TEST_TABLE)) {
final RpcController controller = new ServerRpcController(); final RpcController controller = new ServerRpcController();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
@ -336,7 +336,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception { public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection(); final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) { try (final Table table = connection.getTable(TEST_TABLE)) {
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final EchoResponseProto response = TraceUtil.trace(() -> { final EchoResponseProto response = TraceUtil.trace(() -> {
@ -376,7 +376,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceSyncTableBatchEndpoint() throws Exception { public void traceSyncTableBatchEndpoint() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection(); final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) { try (final Table table = connection.getTable(TEST_TABLE)) {
final Descriptors.MethodDescriptor descriptor = final Descriptors.MethodDescriptor descriptor =
TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); TestProtobufRpcProto.getDescriptor().findMethodByName("echo");
@ -423,7 +423,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceSyncTableBatchEndpointCallback() throws Exception { public void traceSyncTableBatchEndpointCallback() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection(); final Connection connection = connectionRule.getConnection();
try (final Table table = connection.getTable(TEST_TABLE)) { try (final Table table = connection.getTable(TEST_TABLE)) {
final Descriptors.MethodDescriptor descriptor = final Descriptors.MethodDescriptor descriptor =
TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); TestProtobufRpcProto.getDescriptor().findMethodByName("echo");
@ -472,7 +472,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceAsyncAdminEndpoint() throws Exception { public void traceAsyncAdminEndpoint() throws Exception {
final AsyncConnection connection = connectionRule.getConnection(); final AsyncConnection connection = connectionRule.getAsyncConnection();
final AsyncAdmin admin = connection.getAdmin(); final AsyncAdmin admin = connection.getAdmin();
final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
final ServiceCaller<TestProtobufRpcProto, EchoResponseProto> callback = final ServiceCaller<TestProtobufRpcProto, EchoResponseProto> callback =
@ -504,7 +504,7 @@ public class TestCoprocessorEndpointTracing {
@Test @Test
public void traceSyncAdminEndpoint() throws Exception { public void traceSyncAdminEndpoint() throws Exception {
final Connection connection = connectionRule.getConnection().toConnection(); final Connection connection = connectionRule.getConnection();
try (final Admin admin = connection.getAdmin()) { try (final Admin admin = connection.getAdmin()) {
final TestProtobufRpcProto.BlockingInterface service = final TestProtobufRpcProto.BlockingInterface service =
TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
@ -537,7 +537,7 @@ public class TestCoprocessorEndpointTracing {
} }
private void waitForAndLog(Matcher<SpanData> spanMatcher) { private void waitForAndLog(Matcher<SpanData> spanMatcher) {
final Configuration conf = connectionRule.getConnection().getConfiguration(); final Configuration conf = connectionRule.getAsyncConnection().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
otelClassRule::getSpans, hasItem(spanMatcher))); otelClassRule::getSpans, hasItem(spanMatcher)));
final List<SpanData> spans = otelClassRule.getSpans(); final List<SpanData> spans = otelClassRule.getSpans();

View File

@ -56,7 +56,7 @@ public class TestShellExecEndpointCoprocessor {
@Rule @Rule
public final ConnectionRule connectionRule = public final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection); ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
@Test @Test
public void testShellExecUnspecified() { public void testShellExecUnspecified() {
@ -69,7 +69,7 @@ public class TestShellExecEndpointCoprocessor {
} }
private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) { private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
final AsyncConnection conn = connectionRule.getConnection(); final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin(); final AsyncAdmin admin = conn.getAdmin();
final String command = "echo -n \"hello world\""; final String command = "echo -n \"hello world\"";
@ -87,7 +87,7 @@ public class TestShellExecEndpointCoprocessor {
@Test @Test
public void testShellExecBackground() throws IOException { public void testShellExecBackground() throws IOException {
final AsyncConnection conn = connectionRule.getConnection(); final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin(); final AsyncAdmin admin = conn.getAdmin();
final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility()); final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility());
@ -121,7 +121,7 @@ public class TestShellExecEndpointCoprocessor {
final Path testDataDir = Optional.of(testingUtility) final Path testDataDir = Optional.of(testingUtility)
.map(HBaseTestingUtil::getDataTestDir) .map(HBaseTestingUtil::getDataTestDir)
.map(Object::toString) .map(Object::toString)
.map(val -> Paths.get(val)) .map(Paths::get)
.orElseThrow(() -> new RuntimeException("Unable to locate temp directory path.")); .orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
final File testDataDirFile = Files.createDirectories(testDataDir).toFile(); final File testDataDirFile = Files.createDirectories(testDataDir).toFile();
assertTrue(testDataDirFile.exists()); assertTrue(testDataDirFile.exists());

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.ExternalResource; import org.junit.rules.ExternalResource;
@ -35,7 +36,7 @@ import org.junit.rules.ExternalResource;
* public class TestMyClass { * public class TestMyClass {
* private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build(); * private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
* private static final ConnectionRule connectionRule = * private static final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection); * ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection);
* *
* @ClassRule * @ClassRule
* public static final TestRule rule = RuleChain * public static final TestRule rule = RuleChain
@ -44,26 +45,73 @@ import org.junit.rules.ExternalResource;
* } * }
* }</pre> * }</pre>
*/ */
public class ConnectionRule extends ExternalResource { public final class ConnectionRule extends ExternalResource {
private final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier; private final Supplier<Connection> connectionSupplier;
private AsyncConnection connection; private final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier;
public ConnectionRule(final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier) { private Connection connection;
this.connectionSupplier = connectionSupplier; private AsyncConnection asyncConnection;
public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier
) {
return new ConnectionRule(connectionSupplier, null);
} }
public AsyncConnection getConnection() { public static ConnectionRule createAsyncConnectionRule(
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(null, asyncConnectionSupplier);
}
public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(connectionSupplier, asyncConnectionSupplier);
}
private ConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
this.connectionSupplier = connectionSupplier;
this.asyncConnectionSupplier = asyncConnectionSupplier;
}
public Connection getConnection() {
if (connection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with a synchronous connection.");
}
return connection; return connection;
} }
public AsyncConnection getAsyncConnection() {
if (asyncConnection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with an asynchronous connection.");
}
return asyncConnection;
}
@Override @Override
protected void before() throws Throwable { protected void before() throws Throwable {
this.connection = connectionSupplier.get().join(); if (connectionSupplier != null) {
this.connection = connectionSupplier.get();
}
if (asyncConnectionSupplier != null) {
this.asyncConnection = asyncConnectionSupplier.get().join();
}
if (connection == null && asyncConnection != null) {
this.connection = asyncConnection.toConnection();
}
} }
@Override @Override
protected void after() { protected void after() {
CompletableFuture<Void> closeConnection = CompletableFuture.runAsync(() -> {
if (this.connection != null) { if (this.connection != null) {
try { try {
connection.close(); connection.close();
@ -71,5 +119,16 @@ public class ConnectionRule extends ExternalResource {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
});
CompletableFuture<Void> closeAsyncConnection = CompletableFuture.runAsync(() -> {
if (this.asyncConnection != null) {
try {
asyncConnection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
CompletableFuture.allOf(closeConnection, closeAsyncConnection).join();
} }
} }

View File

@ -19,9 +19,11 @@ package org.apache.hadoop.hbase;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
@ -44,7 +46,7 @@ import org.junit.rules.TestRule;
* *
* &#64;Rule * &#64;Rule
* public final ConnectionRule connectionRule = * public final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection); * ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
* } * }
* } * }
* </pre> * </pre>
@ -108,11 +110,23 @@ public final class MiniClusterRule extends ExternalResource {
return testingUtility; return testingUtility;
} }
/**
* Create a {@link Connection} to the managed {@link SingleProcessHBaseCluster}. It's up to
* the caller to {@link Connection#close() close()} the connection when finished.
*/
public Connection createConnection() {
try {
return createAsyncConnection().get().toConnection();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
/** /**
* Create a {@link AsyncConnection} to the managed {@link SingleProcessHBaseCluster}. It's up to * Create a {@link AsyncConnection} to the managed {@link SingleProcessHBaseCluster}. It's up to
* the caller to {@link AsyncConnection#close() close()} the connection when finished. * the caller to {@link AsyncConnection#close() close()} the connection when finished.
*/ */
public CompletableFuture<AsyncConnection> createConnection() { public CompletableFuture<AsyncConnection> createAsyncConnection() {
if (miniCluster == null) { if (miniCluster == null) {
throw new IllegalStateException("test cluster not initialized"); throw new IllegalStateException("test cluster not initialized");
} }

View File

@ -80,8 +80,8 @@ public class TestApiV1ClusterMetricsResource {
}) })
.build(); .build();
private static final ConnectionRule connectionRule = private static final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection); ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private static final ClassSetup classRule = new ClassSetup(connectionRule::getConnection); private static final ClassSetup classRule = new ClassSetup(connectionRule::getAsyncConnection);
private static final class ClassSetup extends ExternalResource { private static final class ClassSetup extends ExternalResource {

View File

@ -68,9 +68,9 @@ public class TestMetaBrowser {
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build(); public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
private final ConnectionRule connectionRule = private final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection); ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule = private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
new ClearUserNamespacesAndTablesRule(connectionRule::getConnection); new ClearUserNamespacesAndTablesRule(connectionRule::getAsyncConnection);
@Rule @Rule
public TestRule rule = RuleChain.outerRule(connectionRule) public TestRule rule = RuleChain.outerRule(connectionRule)
@ -84,7 +84,7 @@ public class TestMetaBrowser {
@Before @Before
public void before() { public void before() {
connection = connectionRule.getConnection(); connection = connectionRule.getAsyncConnection();
admin = connection.getAdmin(); admin = connection.getAdmin();
} }