HBASE-19536 Client side changes for moving peer modification from zk watcher to procedure

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2017-12-19 15:50:57 +08:00 committed by zhangduo
parent 5e410d8140
commit 750b7d8dbe
3 changed files with 238 additions and 80 deletions

View File

@ -2473,7 +2473,7 @@ public interface Admin extends Abortable, Closeable {
/** /**
* Add a new replication peer for replicating data to slave cluster. * Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster * @param peerConfig configuration for the replication peer
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
*/ */
default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
@ -2484,13 +2484,44 @@ public interface Admin extends Abortable, Closeable {
/** /**
* Add a new replication peer for replicating data to slave cluster. * Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster * @param peerConfig configuration for the replication peer
* @param enabled peer state, true if ENABLED and false if DISABLED * @param enabled peer state, true if ENABLED and false if DISABLED
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
*/ */
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException; throws IOException;
/**
* Add a new replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
default Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig)
throws IOException {
return addReplicationPeerAsync(peerId, peerConfig, true);
}
/**
* Add a new replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication peer
* @param enabled peer state, true if ENABLED and false if DISABLED
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
boolean enabled) throws IOException;
/** /**
* Remove a peer and stop the replication. * Remove a peer and stop the replication.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
@ -2498,6 +2529,18 @@ public interface Admin extends Abortable, Closeable {
*/ */
void removeReplicationPeer(String peerId) throws IOException; void removeReplicationPeer(String peerId) throws IOException;
/**
* Remove a replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> removeReplicationPeerAsync(String peerId) throws IOException;
/** /**
* Restart the replication stream to the specified peer. * Restart the replication stream to the specified peer.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
@ -2505,6 +2548,18 @@ public interface Admin extends Abortable, Closeable {
*/ */
void enableReplicationPeer(String peerId) throws IOException; void enableReplicationPeer(String peerId) throws IOException;
/**
* Enable a replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> enableReplicationPeerAsync(String peerId) throws IOException;
/** /**
* Stop the replication stream to the specified peer. * Stop the replication stream to the specified peer.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
@ -2512,6 +2567,18 @@ public interface Admin extends Abortable, Closeable {
*/ */
void disableReplicationPeer(String peerId) throws IOException; void disableReplicationPeer(String peerId) throws IOException;
/**
* Disable a replication peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> disableReplicationPeerAsync(String peerId) throws IOException;
/** /**
* Returns the configured ReplicationPeerConfig for the specified peer. * Returns the configured ReplicationPeerConfig for the specified peer.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
@ -2523,12 +2590,26 @@ public interface Admin extends Abortable, Closeable {
/** /**
* Update the peerConfig for the specified peer. * Update the peerConfig for the specified peer.
* @param peerId a short name that identifies the peer * @param peerId a short name that identifies the peer
* @param peerConfig new config for the peer * @param peerConfig new config for the replication peer
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
*/ */
void updateReplicationPeerConfig(String peerId, void updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) throws IOException; ReplicationPeerConfig peerConfig) throws IOException;
/**
* Update the peerConfig for the specified peer but does not block and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param peerConfig new config for the replication peer
* @return the result of the async operation
* @throws IOException IOException if a remote or network exception occurs
*/
Future<Void> updateReplicationPeerConfigAsync(String peerId, ReplicationPeerConfig peerConfig)
throws IOException;
/** /**
* Append the replicable table column family config from the specified peer. * Append the replicable table column family config from the specified peer.
* @param id a short that identifies the cluster * @param id a short that identifies the cluster

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -200,7 +201,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/** /**
@ -3779,6 +3785,25 @@ public class HBaseAdmin implements Admin {
} }
} }
@InterfaceAudience.Private
@InterfaceStability.Evolving
private static class ReplicationFuture extends ProcedureFuture<Void> {
private final String peerId;
private final Supplier<String> getOperation;
public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId,
Supplier<String> getOperation) {
super(admin, procId);
this.peerId = peerId;
this.getOperation = getOperation;
}
@Override
public String toString() {
return "Operation: " + getOperation.get() + ", peerId: " + peerId;
}
}
@Override @Override
public List<SecurityCapability> getSecurityCapabilities() throws IOException { public List<SecurityCapability> getSecurityCapabilities() throws IOException {
try { try {
@ -3851,50 +3876,82 @@ public class HBaseAdmin implements Admin {
@Override @Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException { throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout,
@Override TimeUnit.MILLISECONDS);
protected Void rpcCall() throws Exception { }
master.addReplicationPeer(getRpcController(),
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); @Override
return null; public Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
} boolean enabled) throws IOException {
}); AddReplicationPeerResponse response = executeCallable(
new MasterCallable<AddReplicationPeerResponse>(getConnection(), getRpcControllerFactory()) {
@Override
protected AddReplicationPeerResponse rpcCall() throws Exception {
return master.addReplicationPeer(getRpcController(),
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER");
} }
@Override @Override
public void removeReplicationPeer(String peerId) throws IOException { public void removeReplicationPeer(String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
@Override }
protected Void rpcCall() throws Exception {
master.removeReplicationPeer(getRpcController(), @Override
RequestConverter.buildRemoveReplicationPeerRequest(peerId)); public Future<Void> removeReplicationPeerAsync(String peerId) throws IOException {
return null; RemoveReplicationPeerResponse response =
} executeCallable(new MasterCallable<RemoveReplicationPeerResponse>(getConnection(),
}); getRpcControllerFactory()) {
@Override
protected RemoveReplicationPeerResponse rpcCall() throws Exception {
return master.removeReplicationPeer(getRpcController(),
RequestConverter.buildRemoveReplicationPeerRequest(peerId));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "REMOVE_REPLICATION_PEER");
} }
@Override @Override
public void enableReplicationPeer(final String peerId) throws IOException { public void enableReplicationPeer(final String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
@Override }
protected Void rpcCall() throws Exception {
master.enableReplicationPeer(getRpcController(), @Override
RequestConverter.buildEnableReplicationPeerRequest(peerId)); public Future<Void> enableReplicationPeerAsync(final String peerId) throws IOException {
return null; EnableReplicationPeerResponse response =
} executeCallable(new MasterCallable<EnableReplicationPeerResponse>(getConnection(),
}); getRpcControllerFactory()) {
@Override
protected EnableReplicationPeerResponse rpcCall() throws Exception {
return master.enableReplicationPeer(getRpcController(),
RequestConverter.buildEnableReplicationPeerRequest(peerId));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "ENABLE_REPLICATION_PEER");
} }
@Override @Override
public void disableReplicationPeer(final String peerId) throws IOException { public void disableReplicationPeer(final String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
@Override }
protected Void rpcCall() throws Exception {
master.disableReplicationPeer(getRpcController(), @Override
RequestConverter.buildDisableReplicationPeerRequest(peerId)); public Future<Void> disableReplicationPeerAsync(final String peerId) throws IOException {
return null; DisableReplicationPeerResponse response =
} executeCallable(new MasterCallable<DisableReplicationPeerResponse>(getConnection(),
}); getRpcControllerFactory()) {
@Override
protected DisableReplicationPeerResponse rpcCall() throws Exception {
return master.disableReplicationPeer(getRpcController(),
RequestConverter.buildDisableReplicationPeerRequest(peerId));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "DISABLE_REPLICATION_PEER");
} }
@Override @Override
@ -3913,14 +3970,24 @@ public class HBaseAdmin implements Admin {
@Override @Override
public void updateReplicationPeerConfig(final String peerId, public void updateReplicationPeerConfig(final String peerId,
final ReplicationPeerConfig peerConfig) throws IOException { final ReplicationPeerConfig peerConfig) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout,
@Override TimeUnit.MILLISECONDS);
protected Void rpcCall() throws Exception { }
master.updateReplicationPeerConfig(getRpcController(),
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig)); @Override
return null; public Future<Void> updateReplicationPeerConfigAsync(final String peerId,
} final ReplicationPeerConfig peerConfig) throws IOException {
}); UpdateReplicationPeerConfigResponse response =
executeCallable(new MasterCallable<UpdateReplicationPeerConfigResponse>(getConnection(),
getRpcControllerFactory()) {
@Override
protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception {
return master.updateReplicationPeerConfig(getRpcController(),
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "UPDATE_REPLICATION_PEER_CONFIG");
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -1558,47 +1559,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Void> addReplicationPeer(String peerId, public CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig, boolean enabled) { ReplicationPeerConfig peerConfig, boolean enabled) {
return this return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
.<Void> newMasterCaller() RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
.action( (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
(controller, stub) -> this new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
.<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
} }
@Override @Override
public CompletableFuture<Void> removeReplicationPeer(String peerId) { public CompletableFuture<Void> removeReplicationPeer(String peerId) {
return this return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
.<Void> newMasterCaller() RequestConverter.buildRemoveReplicationPeerRequest(peerId),
.action( (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
(controller, stub) -> this new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
(s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
} }
@Override @Override
public CompletableFuture<Void> enableReplicationPeer(String peerId) { public CompletableFuture<Void> enableReplicationPeer(String peerId) {
return this return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
.<Void> newMasterCaller() RequestConverter.buildEnableReplicationPeerRequest(peerId),
.action( (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
(controller, stub) -> this new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
.<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
(s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
} }
@Override @Override
public CompletableFuture<Void> disableReplicationPeer(String peerId) { public CompletableFuture<Void> disableReplicationPeer(String peerId) {
return this return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
.<Void> newMasterCaller() RequestConverter.buildDisableReplicationPeerRequest(peerId),
.action( (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
(controller, stub) -> this new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
.<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
.call();
} }
@Override @Override
@ -1617,13 +1605,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) { ReplicationPeerConfig peerConfig) {
return this return this
.<Void> newMasterCaller() .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
.action( RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
(controller, stub) -> this (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
.<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call( (resp) -> resp.getProcId(),
controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
resp) -> null)).call();
} }
@Override @Override
@ -2582,6 +2568,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
} }
private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
private final String peerId;
private final Supplier<String> getOperation;
ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
this.peerId = peerId;
this.getOperation = getOperation;
}
String getDescription() {
return "Operation: " + getOperation.get() + ", peerId: " + peerId;
}
@Override
void onFinished() {
LOG.info(getDescription() + " completed");
}
@Override
void onError(Throwable error) {
LOG.info(getDescription() + " failed with " + error.getMessage());
}
}
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
procFuture.whenComplete((procId, error) -> { procFuture.whenComplete((procId, error) -> {