diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 40dac2f4458..b8546fa4533 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -2473,7 +2473,7 @@ public interface Admin extends Abortable, Closeable {
/**
* Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer
- * @param peerConfig configuration for the replication slave cluster
+ * @param peerConfig configuration for the replication peer
* @throws IOException if a remote or network exception occurs
*/
default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
@@ -2484,13 +2484,44 @@ public interface Admin extends Abortable, Closeable {
/**
* Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer
- * @param peerConfig configuration for the replication slave cluster
+ * @param peerConfig configuration for the replication peer
* @param enabled peer state, true if ENABLED and false if DISABLED
* @throws IOException if a remote or network exception occurs
*/
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException;
+ /**
+ * Add a new replication peer but does not block and wait for it.
+ *
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @param peerConfig configuration for the replication peer
+ * @return the result of the async operation
+ * @throws IOException IOException if a remote or network exception occurs
+ */
+ default Future addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig)
+ throws IOException {
+ return addReplicationPeerAsync(peerId, peerConfig, true);
+ }
+
+ /**
+ * Add a new replication peer but does not block and wait for it.
+ *
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @param peerConfig configuration for the replication peer
+ * @param enabled peer state, true if ENABLED and false if DISABLED
+ * @return the result of the async operation
+ * @throws IOException IOException if a remote or network exception occurs
+ */
+ Future addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
+ boolean enabled) throws IOException;
+
/**
* Remove a peer and stop the replication.
* @param peerId a short name that identifies the peer
@@ -2498,6 +2529,18 @@ public interface Admin extends Abortable, Closeable {
*/
void removeReplicationPeer(String peerId) throws IOException;
+ /**
+ * Remove a replication peer but does not block and wait for it.
+ *
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @return the result of the async operation
+ * @throws IOException IOException if a remote or network exception occurs
+ */
+ Future removeReplicationPeerAsync(String peerId) throws IOException;
+
/**
* Restart the replication stream to the specified peer.
* @param peerId a short name that identifies the peer
@@ -2505,6 +2548,18 @@ public interface Admin extends Abortable, Closeable {
*/
void enableReplicationPeer(String peerId) throws IOException;
+ /**
+ * Enable a replication peer but does not block and wait for it.
+ *
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @return the result of the async operation
+ * @throws IOException IOException if a remote or network exception occurs
+ */
+ Future enableReplicationPeerAsync(String peerId) throws IOException;
+
/**
* Stop the replication stream to the specified peer.
* @param peerId a short name that identifies the peer
@@ -2512,6 +2567,18 @@ public interface Admin extends Abortable, Closeable {
*/
void disableReplicationPeer(String peerId) throws IOException;
+ /**
+ * Disable a replication peer but does not block and wait for it.
+ *
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @return the result of the async operation
+ * @throws IOException IOException if a remote or network exception occurs
+ */
+ Future disableReplicationPeerAsync(String peerId) throws IOException;
+
/**
* Returns the configured ReplicationPeerConfig for the specified peer.
* @param peerId a short name that identifies the peer
@@ -2523,12 +2590,26 @@ public interface Admin extends Abortable, Closeable {
/**
* Update the peerConfig for the specified peer.
* @param peerId a short name that identifies the peer
- * @param peerConfig new config for the peer
+ * @param peerConfig new config for the replication peer
* @throws IOException if a remote or network exception occurs
*/
void updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) throws IOException;
+ /**
+ * Update the peerConfig for the specified peer but does not block and wait for it.
+ *
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @param peerConfig new config for the replication peer
+ * @return the result of the async operation
+ * @throws IOException IOException if a remote or network exception occurs
+ */
+ Future updateReplicationPeerConfigAsync(String peerId, ReplicationPeerConfig peerConfig)
+ throws IOException;
+
/**
* Append the replicable table column family config from the specified peer.
* @param id a short that identifies the cluster
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index c1373836603..86859847be5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -200,7 +201,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
@@ -3779,6 +3785,25 @@ public class HBaseAdmin implements Admin {
}
}
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ private static class ReplicationFuture extends ProcedureFuture {
+ private final String peerId;
+ private final Supplier getOperation;
+
+ public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId,
+ Supplier getOperation) {
+ super(admin, procId);
+ this.peerId = peerId;
+ this.getOperation = getOperation;
+ }
+
+ @Override
+ public String toString() {
+ return "Operation: " + getOperation.get() + ", peerId: " + peerId;
+ }
+ }
+
@Override
public List getSecurityCapabilities() throws IOException {
try {
@@ -3851,50 +3876,82 @@ public class HBaseAdmin implements Admin {
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException {
- executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) {
- @Override
- protected Void rpcCall() throws Exception {
- master.addReplicationPeer(getRpcController(),
- RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
- return null;
- }
- });
+ get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
+ boolean enabled) throws IOException {
+ AddReplicationPeerResponse response = executeCallable(
+ new MasterCallable(getConnection(), getRpcControllerFactory()) {
+ @Override
+ protected AddReplicationPeerResponse rpcCall() throws Exception {
+ return master.addReplicationPeer(getRpcController(),
+ RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
+ }
+ });
+ return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER");
}
@Override
public void removeReplicationPeer(String peerId) throws IOException {
- executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) {
- @Override
- protected Void rpcCall() throws Exception {
- master.removeReplicationPeer(getRpcController(),
- RequestConverter.buildRemoveReplicationPeerRequest(peerId));
- return null;
- }
- });
+ get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future removeReplicationPeerAsync(String peerId) throws IOException {
+ RemoveReplicationPeerResponse response =
+ executeCallable(new MasterCallable(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected RemoveReplicationPeerResponse rpcCall() throws Exception {
+ return master.removeReplicationPeer(getRpcController(),
+ RequestConverter.buildRemoveReplicationPeerRequest(peerId));
+ }
+ });
+ return new ReplicationFuture(this, peerId, response.getProcId(),
+ () -> "REMOVE_REPLICATION_PEER");
}
@Override
public void enableReplicationPeer(final String peerId) throws IOException {
- executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) {
- @Override
- protected Void rpcCall() throws Exception {
- master.enableReplicationPeer(getRpcController(),
- RequestConverter.buildEnableReplicationPeerRequest(peerId));
- return null;
- }
- });
+ get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future enableReplicationPeerAsync(final String peerId) throws IOException {
+ EnableReplicationPeerResponse response =
+ executeCallable(new MasterCallable(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected EnableReplicationPeerResponse rpcCall() throws Exception {
+ return master.enableReplicationPeer(getRpcController(),
+ RequestConverter.buildEnableReplicationPeerRequest(peerId));
+ }
+ });
+ return new ReplicationFuture(this, peerId, response.getProcId(),
+ () -> "ENABLE_REPLICATION_PEER");
}
@Override
public void disableReplicationPeer(final String peerId) throws IOException {
- executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) {
- @Override
- protected Void rpcCall() throws Exception {
- master.disableReplicationPeer(getRpcController(),
- RequestConverter.buildDisableReplicationPeerRequest(peerId));
- return null;
- }
- });
+ get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future disableReplicationPeerAsync(final String peerId) throws IOException {
+ DisableReplicationPeerResponse response =
+ executeCallable(new MasterCallable(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected DisableReplicationPeerResponse rpcCall() throws Exception {
+ return master.disableReplicationPeer(getRpcController(),
+ RequestConverter.buildDisableReplicationPeerRequest(peerId));
+ }
+ });
+ return new ReplicationFuture(this, peerId, response.getProcId(),
+ () -> "DISABLE_REPLICATION_PEER");
}
@Override
@@ -3913,14 +3970,24 @@ public class HBaseAdmin implements Admin {
@Override
public void updateReplicationPeerConfig(final String peerId,
final ReplicationPeerConfig peerConfig) throws IOException {
- executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) {
- @Override
- protected Void rpcCall() throws Exception {
- master.updateReplicationPeerConfig(getRpcController(),
- RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
- return null;
- }
- });
+ get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future updateReplicationPeerConfigAsync(final String peerId,
+ final ReplicationPeerConfig peerConfig) throws IOException {
+ UpdateReplicationPeerConfigResponse response =
+ executeCallable(new MasterCallable(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception {
+ return master.updateReplicationPeerConfig(getRpcController(),
+ RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
+ }
+ });
+ return new ReplicationFuture(this, peerId, response.getProcId(),
+ () -> "UPDATE_REPLICATION_PEER_CONFIG");
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index a826f8cd0b4..050bfe22294 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1558,47 +1559,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig, boolean enabled) {
- return this
- . newMasterCaller()
- .action(
- (controller, stub) -> this
- . call(controller, stub,
- RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
- c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+ return this. procedureCall(
+ RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
+ (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
+ new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
}
@Override
public CompletableFuture removeReplicationPeer(String peerId) {
- return this
- . newMasterCaller()
- .action(
- (controller, stub) -> this
- . call(controller,
- stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
- (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+ return this. procedureCall(
+ RequestConverter.buildRemoveReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
+ new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
}
@Override
public CompletableFuture enableReplicationPeer(String peerId) {
- return this
- . newMasterCaller()
- .action(
- (controller, stub) -> this
- . call(controller,
- stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
- (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+ return this. procedureCall(
+ RequestConverter.buildEnableReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
+ new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
}
@Override
public CompletableFuture disableReplicationPeer(String peerId) {
- return this
- . newMasterCaller()
- .action(
- (controller, stub) -> this
- . call(
- controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
- c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
- .call();
+ return this. procedureCall(
+ RequestConverter.buildDisableReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
+ new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
}
@Override
@@ -1617,13 +1605,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) {
return this
- . newMasterCaller()
- .action(
- (controller, stub) -> this
- . call(
- controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
- peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
- resp) -> null)).call();
+ . procedureCall(
+ RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
+ (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
+ (resp) -> resp.getProcId(),
+ new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
}
@Override
@@ -2582,6 +2568,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
+ private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
+ private final String peerId;
+ private final Supplier getOperation;
+
+ ReplicationProcedureBiConsumer(String peerId, Supplier getOperation) {
+ this.peerId = peerId;
+ this.getOperation = getOperation;
+ }
+
+ String getDescription() {
+ return "Operation: " + getOperation.get() + ", peerId: " + peerId;
+ }
+
+ @Override
+ void onFinished() {
+ LOG.info(getDescription() + " completed");
+ }
+
+ @Override
+ void onError(Throwable error) {
+ LOG.info(getDescription() + " failed with " + error.getMessage());
+ }
+ }
+
private CompletableFuture waitProcedureResult(CompletableFuture procFuture) {
CompletableFuture future = new CompletableFuture<>();
procFuture.whenComplete((procId, error) -> {