From f78f232b2831b86bd3c4fdd046f61044af13e0a3 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 21 Mar 2022 12:41:22 +0100 Subject: [PATCH] HBASE-26834 Adapt ConnectionRule for both sync and async connections Signed-off-by: Duo Zhang --- .../TestCoprocessorEndpointTracing.java | 22 ++--- .../TestShellExecEndpointCoprocessor.java | 8 +- .../apache/hadoop/hbase/ConnectionRule.java | 87 ++++++++++++++++--- .../apache/hadoop/hbase/MiniClusterRule.java | 18 +++- .../http/TestApiV1ClusterMetricsResource.java | 4 +- .../hbase/master/http/TestMetaBrowser.java | 6 +- 6 files changed, 109 insertions(+), 36 deletions(-) diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java index f91db84e8c8..167a656ded0 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java @@ -116,13 +116,13 @@ public class TestCoprocessorEndpointTracing { }) .build(); private static final ConnectionRule connectionRule = - new ConnectionRule(miniclusterRule::createConnection); + ConnectionRule.createAsyncConnectionRule(miniclusterRule::createAsyncConnection); private static final class Setup extends ExternalResource { @Override protected void before() throws Throwable { final HBaseTestingUtil util = miniclusterRule.getTestingUtility(); - final AsyncConnection connection = connectionRule.getConnection(); + final AsyncConnection connection = connectionRule.getAsyncConnection(); final AsyncAdmin admin = connection.getAdmin(); final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build(); @@ -149,7 +149,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceAsyncTableEndpoint() { - final AsyncConnection connection = connectionRule.getConnection(); + final AsyncConnection connection = connectionRule.getAsyncConnection(); final AsyncTable table = connection.getTable(TEST_TABLE); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final CompletableFuture> future = new CompletableFuture<>(); @@ -228,7 +228,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceSyncTableEndpointCall() throws Exception { - final Connection connection = connectionRule.getConnection().toConnection(); + final Connection connection = connectionRule.getConnection(); try (final Table table = connection.getTable(TEST_TABLE)) { final RpcController controller = new ServerRpcController(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -280,7 +280,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceSyncTableEndpointCallAndCallback() throws Exception { - final Connection connection = connectionRule.getConnection().toConnection(); + final Connection connection = connectionRule.getConnection(); try (final Table table = connection.getTable(TEST_TABLE)) { final RpcController controller = new ServerRpcController(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -336,7 +336,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception { - final Connection connection = connectionRule.getConnection().toConnection(); + final Connection connection = connectionRule.getConnection(); try (final Table table = connection.getTable(TEST_TABLE)) { final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final EchoResponseProto response = TraceUtil.trace(() -> { @@ -376,7 +376,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceSyncTableBatchEndpoint() throws Exception { - final Connection connection = connectionRule.getConnection().toConnection(); + final Connection connection = connectionRule.getConnection(); try (final Table table = connection.getTable(TEST_TABLE)) { final Descriptors.MethodDescriptor descriptor = TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); @@ -423,7 +423,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceSyncTableBatchEndpointCallback() throws Exception { - final Connection connection = connectionRule.getConnection().toConnection(); + final Connection connection = connectionRule.getConnection(); try (final Table table = connection.getTable(TEST_TABLE)) { final Descriptors.MethodDescriptor descriptor = TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); @@ -472,7 +472,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceAsyncAdminEndpoint() throws Exception { - final AsyncConnection connection = connectionRule.getConnection(); + final AsyncConnection connection = connectionRule.getAsyncConnection(); final AsyncAdmin admin = connection.getAdmin(); final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); final ServiceCaller callback = @@ -504,7 +504,7 @@ public class TestCoprocessorEndpointTracing { @Test public void traceSyncAdminEndpoint() throws Exception { - final Connection connection = connectionRule.getConnection().toConnection(); + final Connection connection = connectionRule.getConnection(); try (final Admin admin = connection.getAdmin()) { final TestProtobufRpcProto.BlockingInterface service = TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); @@ -537,7 +537,7 @@ public class TestCoprocessorEndpointTracing { } private void waitForAndLog(Matcher spanMatcher) { - final Configuration conf = connectionRule.getConnection().getConfiguration(); + final Configuration conf = connectionRule.getAsyncConnection().getConfiguration(); Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( otelClassRule::getSpans, hasItem(spanMatcher))); final List spans = otelClassRule.getSpans(); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java index 4d9cd7eb406..c6e3cacfde5 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java @@ -56,7 +56,7 @@ public class TestShellExecEndpointCoprocessor { @Rule public final ConnectionRule connectionRule = - new ConnectionRule(miniClusterRule::createConnection); + ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); @Test public void testShellExecUnspecified() { @@ -69,7 +69,7 @@ public class TestShellExecEndpointCoprocessor { } private void testShellExecForeground(final Consumer consumer) { - final AsyncConnection conn = connectionRule.getConnection(); + final AsyncConnection conn = connectionRule.getAsyncConnection(); final AsyncAdmin admin = conn.getAdmin(); final String command = "echo -n \"hello world\""; @@ -87,7 +87,7 @@ public class TestShellExecEndpointCoprocessor { @Test public void testShellExecBackground() throws IOException { - final AsyncConnection conn = connectionRule.getConnection(); + final AsyncConnection conn = connectionRule.getAsyncConnection(); final AsyncAdmin admin = conn.getAdmin(); final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility()); @@ -121,7 +121,7 @@ public class TestShellExecEndpointCoprocessor { final Path testDataDir = Optional.of(testingUtility) .map(HBaseTestingUtil::getDataTestDir) .map(Object::toString) - .map(val -> Paths.get(val)) + .map(Paths::get) .orElseThrow(() -> new RuntimeException("Unable to locate temp directory path.")); final File testDataDirFile = Files.createDirectories(testDataDir).toFile(); assertTrue(testDataDirFile.exists()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java index 21ca35adb4a..77bd1c531c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.Connection; import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.ExternalResource; @@ -35,7 +36,7 @@ import org.junit.rules.ExternalResource; * public class TestMyClass { * private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build(); * private static final ConnectionRule connectionRule = - * new ConnectionRule(miniClusterRule::createConnection); + * ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection); * * @ClassRule * public static final TestRule rule = RuleChain @@ -44,32 +45,90 @@ import org.junit.rules.ExternalResource; * } * } */ -public class ConnectionRule extends ExternalResource { +public final class ConnectionRule extends ExternalResource { - private final Supplier> connectionSupplier; - private AsyncConnection connection; + private final Supplier connectionSupplier; + private final Supplier> asyncConnectionSupplier; - public ConnectionRule(final Supplier> connectionSupplier) { - this.connectionSupplier = connectionSupplier; + private Connection connection; + private AsyncConnection asyncConnection; + + public static ConnectionRule createConnectionRule( + final Supplier connectionSupplier + ) { + return new ConnectionRule(connectionSupplier, null); } - public AsyncConnection getConnection() { + public static ConnectionRule createAsyncConnectionRule( + final Supplier> asyncConnectionSupplier + ) { + return new ConnectionRule(null, asyncConnectionSupplier); + } + + public static ConnectionRule createConnectionRule( + final Supplier connectionSupplier, + final Supplier> asyncConnectionSupplier + ) { + return new ConnectionRule(connectionSupplier, asyncConnectionSupplier); + } + + private ConnectionRule( + final Supplier connectionSupplier, + final Supplier> asyncConnectionSupplier + ) { + this.connectionSupplier = connectionSupplier; + this.asyncConnectionSupplier = asyncConnectionSupplier; + } + + public Connection getConnection() { + if (connection == null) { + throw new IllegalStateException( + "ConnectionRule not initialized with a synchronous connection."); + } return connection; } + public AsyncConnection getAsyncConnection() { + if (asyncConnection == null) { + throw new IllegalStateException( + "ConnectionRule not initialized with an asynchronous connection."); + } + return asyncConnection; + } + @Override protected void before() throws Throwable { - this.connection = connectionSupplier.get().join(); + if (connectionSupplier != null) { + this.connection = connectionSupplier.get(); + } + if (asyncConnectionSupplier != null) { + this.asyncConnection = asyncConnectionSupplier.get().join(); + } + if (connection == null && asyncConnection != null) { + this.connection = asyncConnection.toConnection(); + } } @Override protected void after() { - if (this.connection != null) { - try { - connection.close(); - } catch (IOException e) { - throw new RuntimeException(e); + CompletableFuture closeConnection = CompletableFuture.runAsync(() -> { + if (this.connection != null) { + try { + connection.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - } + }); + CompletableFuture closeAsyncConnection = CompletableFuture.runAsync(() -> { + if (this.asyncConnection != null) { + try { + asyncConnection.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + CompletableFuture.allOf(closeConnection, closeAsyncConnection).join(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java index 3e13ec7e12f..f13258f9373 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.junit.ClassRule; import org.junit.Rule; @@ -44,7 +46,7 @@ import org.junit.rules.TestRule; * * @Rule * public final ConnectionRule connectionRule = - * new ConnectionRule(miniClusterRule::createConnection); + * ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); * } * } * @@ -108,11 +110,23 @@ public final class MiniClusterRule extends ExternalResource { return testingUtility; } + /** + * Create a {@link Connection} to the managed {@link SingleProcessHBaseCluster}. It's up to + * the caller to {@link Connection#close() close()} the connection when finished. + */ + public Connection createConnection() { + try { + return createAsyncConnection().get().toConnection(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + /** * Create a {@link AsyncConnection} to the managed {@link SingleProcessHBaseCluster}. It's up to * the caller to {@link AsyncConnection#close() close()} the connection when finished. */ - public CompletableFuture createConnection() { + public CompletableFuture createAsyncConnection() { if (miniCluster == null) { throw new IllegalStateException("test cluster not initialized"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestApiV1ClusterMetricsResource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestApiV1ClusterMetricsResource.java index 9a8717d5a84..1426b3a0d96 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestApiV1ClusterMetricsResource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestApiV1ClusterMetricsResource.java @@ -80,8 +80,8 @@ public class TestApiV1ClusterMetricsResource { }) .build(); private static final ConnectionRule connectionRule = - new ConnectionRule(miniClusterRule::createConnection); - private static final ClassSetup classRule = new ClassSetup(connectionRule::getConnection); + ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); + private static final ClassSetup classRule = new ClassSetup(connectionRule::getAsyncConnection); private static final class ClassSetup extends ExternalResource { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestMetaBrowser.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestMetaBrowser.java index 6d534399bdd..dac3c727a46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestMetaBrowser.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/http/TestMetaBrowser.java @@ -68,9 +68,9 @@ public class TestMetaBrowser { public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build(); private final ConnectionRule connectionRule = - new ConnectionRule(miniClusterRule::createConnection); + ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule = - new ClearUserNamespacesAndTablesRule(connectionRule::getConnection); + new ClearUserNamespacesAndTablesRule(connectionRule::getAsyncConnection); @Rule public TestRule rule = RuleChain.outerRule(connectionRule) @@ -84,7 +84,7 @@ public class TestMetaBrowser { @Before public void before() { - connection = connectionRule.getConnection(); + connection = connectionRule.getAsyncConnection(); admin = connection.getAdmin(); }