HBASE-26834 Adapt ConnectionRule for both sync and async connections
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
7acfbc0bea
commit
18d53339a8
|
@ -56,7 +56,7 @@ public class TestShellExecEndpointCoprocessor {
|
|||
|
||||
@Rule
|
||||
public final ConnectionRule connectionRule =
|
||||
new ConnectionRule(miniClusterRule::createConnection);
|
||||
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
|
||||
|
||||
@Test
|
||||
public void testShellExecUnspecified() {
|
||||
|
@ -69,7 +69,7 @@ public class TestShellExecEndpointCoprocessor {
|
|||
}
|
||||
|
||||
private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
|
||||
final AsyncConnection conn = connectionRule.getConnection();
|
||||
final AsyncConnection conn = connectionRule.getAsyncConnection();
|
||||
final AsyncAdmin admin = conn.getAdmin();
|
||||
|
||||
final String command = "echo -n \"hello world\"";
|
||||
|
@ -87,7 +87,7 @@ public class TestShellExecEndpointCoprocessor {
|
|||
|
||||
@Test
|
||||
public void testShellExecBackground() throws IOException {
|
||||
final AsyncConnection conn = connectionRule.getConnection();
|
||||
final AsyncConnection conn = connectionRule.getAsyncConnection();
|
||||
final AsyncAdmin admin = conn.getAdmin();
|
||||
|
||||
final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility());
|
||||
|
@ -121,7 +121,7 @@ public class TestShellExecEndpointCoprocessor {
|
|||
final Path testDataDir = Optional.of(testingUtility)
|
||||
.map(HBaseTestingUtility::getDataTestDir)
|
||||
.map(Object::toString)
|
||||
.map(val -> Paths.get(val))
|
||||
.map(Paths::get)
|
||||
.orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
|
||||
final File testDataDirFile = Files.createDirectories(testDataDir).toFile();
|
||||
assertTrue(testDataDirFile.exists());
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.ExternalResource;
|
||||
|
@ -35,7 +36,7 @@ import org.junit.rules.ExternalResource;
|
|||
* public class TestMyClass {
|
||||
* private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
|
||||
* private static final ConnectionRule connectionRule =
|
||||
* new ConnectionRule(miniClusterRule::createConnection);
|
||||
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection);
|
||||
*
|
||||
* @ClassRule
|
||||
* public static final TestRule rule = RuleChain
|
||||
|
@ -44,26 +45,70 @@ import org.junit.rules.ExternalResource;
|
|||
* }
|
||||
* }</pre>
|
||||
*/
|
||||
public class ConnectionRule extends ExternalResource {
|
||||
public final class ConnectionRule extends ExternalResource {
|
||||
|
||||
private final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier;
|
||||
private AsyncConnection connection;
|
||||
private final Supplier<Connection> connectionSupplier;
|
||||
private final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier;
|
||||
|
||||
public ConnectionRule(final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier) {
|
||||
this.connectionSupplier = connectionSupplier;
|
||||
private Connection connection;
|
||||
private AsyncConnection asyncConnection;
|
||||
|
||||
public static ConnectionRule createConnectionRule(
|
||||
final Supplier<Connection> connectionSupplier
|
||||
) {
|
||||
return new ConnectionRule(connectionSupplier, null);
|
||||
}
|
||||
|
||||
public AsyncConnection getConnection() {
|
||||
public static ConnectionRule createAsyncConnectionRule(
|
||||
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
|
||||
) {
|
||||
return new ConnectionRule(null, asyncConnectionSupplier);
|
||||
}
|
||||
|
||||
public static ConnectionRule createConnectionRule(
|
||||
final Supplier<Connection> connectionSupplier,
|
||||
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
|
||||
) {
|
||||
return new ConnectionRule(connectionSupplier, asyncConnectionSupplier);
|
||||
}
|
||||
|
||||
private ConnectionRule(
|
||||
final Supplier<Connection> connectionSupplier,
|
||||
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
|
||||
) {
|
||||
this.connectionSupplier = connectionSupplier;
|
||||
this.asyncConnectionSupplier = asyncConnectionSupplier;
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
if (connection == null) {
|
||||
throw new IllegalStateException(
|
||||
"ConnectionRule not initialized with a synchronous connection.");
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
public AsyncConnection getAsyncConnection() {
|
||||
if (asyncConnection == null) {
|
||||
throw new IllegalStateException(
|
||||
"ConnectionRule not initialized with an asynchronous connection.");
|
||||
}
|
||||
return asyncConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void before() throws Throwable {
|
||||
this.connection = connectionSupplier.get().join();
|
||||
if (connectionSupplier != null) {
|
||||
this.connection = connectionSupplier.get();
|
||||
}
|
||||
if (asyncConnectionSupplier != null) {
|
||||
this.asyncConnection = asyncConnectionSupplier.get().join();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void after() {
|
||||
CompletableFuture<Void> closeConnection = CompletableFuture.runAsync(() -> {
|
||||
if (this.connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
|
@ -71,5 +116,16 @@ public class ConnectionRule extends ExternalResource {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
CompletableFuture<Void> closeAsyncConnection = CompletableFuture.runAsync(() -> {
|
||||
if (this.asyncConnection != null) {
|
||||
try {
|
||||
asyncConnection.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
CompletableFuture.allOf(closeConnection, closeAsyncConnection).join();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
|
@ -41,7 +42,7 @@ import org.junit.rules.TestRule;
|
|||
*
|
||||
* @Rule
|
||||
* public final ConnectionRule connectionRule =
|
||||
* new ConnectionRule(miniClusterRule::createConnection);
|
||||
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
|
||||
* }
|
||||
* }</pre>
|
||||
*/
|
||||
|
@ -102,11 +103,26 @@ public final class MiniClusterRule extends ExternalResource {
|
|||
return testingUtility;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link Connection} to the managed {@link MiniHBaseCluster}. It's up to the caller
|
||||
* to {@link Connection#close() close()} the connection when finished.
|
||||
*/
|
||||
public Connection createConnection() {
|
||||
if (miniCluster == null) {
|
||||
throw new IllegalStateException("test cluster not initialized");
|
||||
}
|
||||
try {
|
||||
return ConnectionFactory.createConnection(miniCluster.getConf());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
|
||||
* to {@link AsyncConnection#close() close()} the connection when finished.
|
||||
*/
|
||||
public CompletableFuture<AsyncConnection> createConnection() {
|
||||
public CompletableFuture<AsyncConnection> createAsyncConnection() {
|
||||
if (miniCluster == null) {
|
||||
throw new IllegalStateException("test cluster not initialized");
|
||||
}
|
||||
|
|
|
@ -68,9 +68,9 @@ public class TestMetaBrowser {
|
|||
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
|
||||
|
||||
private final ConnectionRule connectionRule =
|
||||
new ConnectionRule(miniClusterRule::createConnection);
|
||||
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
|
||||
private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
|
||||
new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
|
||||
new ClearUserNamespacesAndTablesRule(connectionRule::getAsyncConnection);
|
||||
|
||||
@Rule
|
||||
public TestRule rule = RuleChain.outerRule(connectionRule)
|
||||
|
@ -84,7 +84,7 @@ public class TestMetaBrowser {
|
|||
|
||||
@Before
|
||||
public void before() {
|
||||
connection = connectionRule.getConnection();
|
||||
connection = connectionRule.getAsyncConnection();
|
||||
admin = connection.getAdmin();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue