HBASE-19536 Client side changes for moving peer modification from zk watcher to procedure

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2017-12-19 15:50:57 +08:00 committed by zhangduo
parent 7f4bd0d371
commit 76a044c5d9
3 changed files with 238 additions and 80 deletions

View File

@ -2466,7 +2466,7 @@ public interface Admin extends Abortable, Closeable {
/**
* Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
* @param peerConfig configuration for the replication peer
* @throws IOException if a remote or network exception occurs
*/
default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
@ -2477,13 +2477,44 @@ public interface Admin extends Abortable, Closeable {
/**
* Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
* @param peerConfig configuration for the replication peer
* @param enabled peer state, true if ENABLED and false if DISABLED
* @throws IOException if a remote or network exception occurs
*/
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException;
/**
* Add a new replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
default Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig)
throws IOException {
return addReplicationPeerAsync(peerId, peerConfig, true);
}
/**
* Add a new replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication peer
* @param enabled peer state, true if ENABLED and false if DISABLED
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
boolean enabled) throws IOException;
/**
* Remove a peer and stop the replication.
* @param peerId a short name that identifies the peer
@ -2491,6 +2522,18 @@ public interface Admin extends Abortable, Closeable {
*/
void removeReplicationPeer(String peerId) throws IOException;
/**
* Remove a replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> removeReplicationPeerAsync(String peerId) throws IOException;
/**
* Restart the replication stream to the specified peer.
* @param peerId a short name that identifies the peer
@ -2498,6 +2541,18 @@ public interface Admin extends Abortable, Closeable {
*/
void enableReplicationPeer(String peerId) throws IOException;
/**
* Enable a replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> enableReplicationPeerAsync(String peerId) throws IOException;
/**
* Stop the replication stream to the specified peer.
* @param peerId a short name that identifies the peer
@ -2505,6 +2560,18 @@ public interface Admin extends Abortable, Closeable {
*/
void disableReplicationPeer(String peerId) throws IOException;
/**
* Disable a replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> disableReplicationPeerAsync(String peerId) throws IOException;
/**
* Returns the configured ReplicationPeerConfig for the specified peer.
* @param peerId a short name that identifies the peer
@ -2516,12 +2583,26 @@ public interface Admin extends Abortable, Closeable {
/**
* Update the peerConfig for the specified peer.
* @param peerId a short name that identifies the peer
* @param peerConfig new config for the peer
* @param peerConfig new config for the replication peer
* @throws IOException if a remote or network exception occurs
*/
void updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) throws IOException;
/**
* Update the peerConfig for the specified peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param peerConfig new config for the replication peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> updateReplicationPeerConfigAsync(String peerId, ReplicationPeerConfig peerConfig)
throws IOException;
/**
* Append the replicable table column family config from the specified peer.
* @param id a short that identifies the cluster

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -200,7 +201,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
@ -3772,6 +3778,25 @@ public class HBaseAdmin implements Admin {
}
}
@InterfaceAudience.Private
@InterfaceStability.Evolving
private static class ReplicationFuture extends ProcedureFuture<Void> {
private final String peerId;
private final Supplier<String> getOperation;
public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId,
Supplier<String> getOperation) {
super(admin, procId);
this.peerId = peerId;
this.getOperation = getOperation;
}
@Override
public String toString() {
return "Operation: " + getOperation.get() + ", peerId: " + peerId;
}
}
@Override
public List<SecurityCapability> getSecurityCapabilities() throws IOException {
try {
@ -3844,50 +3869,82 @@ public class HBaseAdmin implements Admin {
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
master.addReplicationPeer(getRpcController(),
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
return null;
}
});
get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout,
TimeUnit.MILLISECONDS);
}
@Override
public Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
boolean enabled) throws IOException {
AddReplicationPeerResponse response = executeCallable(
new MasterCallable<AddReplicationPeerResponse>(getConnection(), getRpcControllerFactory()) {
@Override
protected AddReplicationPeerResponse rpcCall() throws Exception {
return master.addReplicationPeer(getRpcController(),
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER");
}
@Override
public void removeReplicationPeer(String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
master.removeReplicationPeer(getRpcController(),
RequestConverter.buildRemoveReplicationPeerRequest(peerId));
return null;
}
});
get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
}
@Override
public Future<Void> removeReplicationPeerAsync(String peerId) throws IOException {
RemoveReplicationPeerResponse response =
executeCallable(new MasterCallable<RemoveReplicationPeerResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected RemoveReplicationPeerResponse rpcCall() throws Exception {
return master.removeReplicationPeer(getRpcController(),
RequestConverter.buildRemoveReplicationPeerRequest(peerId));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "REMOVE_REPLICATION_PEER");
}
@Override
public void enableReplicationPeer(final String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
master.enableReplicationPeer(getRpcController(),
RequestConverter.buildEnableReplicationPeerRequest(peerId));
return null;
}
});
get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
}
@Override
public Future<Void> enableReplicationPeerAsync(final String peerId) throws IOException {
EnableReplicationPeerResponse response =
executeCallable(new MasterCallable<EnableReplicationPeerResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected EnableReplicationPeerResponse rpcCall() throws Exception {
return master.enableReplicationPeer(getRpcController(),
RequestConverter.buildEnableReplicationPeerRequest(peerId));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "ENABLE_REPLICATION_PEER");
}
@Override
public void disableReplicationPeer(final String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
master.disableReplicationPeer(getRpcController(),
RequestConverter.buildDisableReplicationPeerRequest(peerId));
return null;
}
});
get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
}
@Override
public Future<Void> disableReplicationPeerAsync(final String peerId) throws IOException {
DisableReplicationPeerResponse response =
executeCallable(new MasterCallable<DisableReplicationPeerResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected DisableReplicationPeerResponse rpcCall() throws Exception {
return master.disableReplicationPeer(getRpcController(),
RequestConverter.buildDisableReplicationPeerRequest(peerId));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "DISABLE_REPLICATION_PEER");
}
@Override
@ -3906,14 +3963,24 @@ public class HBaseAdmin implements Admin {
@Override
public void updateReplicationPeerConfig(final String peerId,
final ReplicationPeerConfig peerConfig) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
master.updateReplicationPeerConfig(getRpcController(),
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
return null;
}
});
get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout,
TimeUnit.MILLISECONDS);
}
@Override
public Future<Void> updateReplicationPeerConfigAsync(final String peerId,
final ReplicationPeerConfig peerConfig) throws IOException {
UpdateReplicationPeerConfigResponse response =
executeCallable(new MasterCallable<UpdateReplicationPeerConfigResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception {
return master.updateReplicationPeerConfig(getRpcController(),
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "UPDATE_REPLICATION_PEER_CONFIG");
}
@Override

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -1525,47 +1526,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig, boolean enabled) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
(s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
}
@Override
public CompletableFuture<Void> removeReplicationPeer(String peerId) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
(s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
RequestConverter.buildRemoveReplicationPeerRequest(peerId),
(s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
}
@Override
public CompletableFuture<Void> enableReplicationPeer(String peerId) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
(s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
RequestConverter.buildEnableReplicationPeerRequest(peerId),
(s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
}
@Override
public CompletableFuture<Void> disableReplicationPeer(String peerId) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
.call();
return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
RequestConverter.buildDisableReplicationPeerRequest(peerId),
(s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
}
@Override
@ -1584,13 +1572,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
resp) -> null)).call();
.<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
(s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
(resp) -> resp.getProcId(),
new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
}
@Override
@ -2549,6 +2535,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
private final String peerId;
private final Supplier<String> getOperation;
ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
this.peerId = peerId;
this.getOperation = getOperation;
}
String getDescription() {
return "Operation: " + getOperation.get() + ", peerId: " + peerId;
}
@Override
void onFinished() {
LOG.info(getDescription() + " completed");
}
@Override
void onError(Throwable error) {
LOG.info(getDescription() + " failed with " + error.getMessage());
}
}
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
CompletableFuture<Void> future = new CompletableFuture<>();
procFuture.whenComplete((procId, error) -> {