Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
parent
2ad3a804c8
commit
92a2868f37
|
@ -2975,6 +2975,34 @@ public interface Admin extends Abortable, Closeable {
|
|||
*/
|
||||
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;
|
||||
|
||||
/**
|
||||
* Enable or disable replication peer modification.
|
||||
* <p/>
|
||||
* This is especially useful when you want to change the replication peer storage.
|
||||
* @param on {@code true} means enable, otherwise disable
|
||||
* @return the previous enable/disable state
|
||||
*/
|
||||
default boolean replicationPeerModificationSwitch(boolean on) throws IOException {
|
||||
return replicationPeerModificationSwitch(on, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable or disable replication peer modification.
|
||||
* <p/>
|
||||
* This is especially useful when you want to change the replication peer storage.
|
||||
* @param on {@code true} means enable, otherwise disable
|
||||
* @param drainProcedures if {@code true}, will wait until all the running replication peer
|
||||
* modification procedures finish
|
||||
* @return the previous enable/disable state
|
||||
*/
|
||||
boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) throws IOException;
|
||||
|
||||
/**
|
||||
* Check whether replication peer modification is enabled.
|
||||
* @return {@code true} if modification is enabled, otherwise {@code false}
|
||||
*/
|
||||
boolean isReplicationPeerModificationEnabled() throws IOException;
|
||||
|
||||
/**
|
||||
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
|
||||
* them. Optionally unload the regions on the servers. If there are multiple servers to be
|
||||
|
|
|
@ -793,10 +793,39 @@ public interface AsyncAdmin {
|
|||
* Check if a replication peer is enabled.
|
||||
* @param peerId id of replication peer to check
|
||||
* @return true if replication peer is enabled. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
* {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
|
||||
|
||||
/**
|
||||
* Enable or disable replication peer modification.
|
||||
* <p/>
|
||||
* This is especially useful when you want to change the replication peer storage.
|
||||
* @param on {@code true} means enable, otherwise disable
|
||||
* @return the previous enable/disable state wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
default CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on) {
|
||||
return replicationPeerModificationSwitch(on, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable or disable replication peer modification.
|
||||
* <p/>
|
||||
* This is especially useful when you want to change the replication peer storage.
|
||||
* @param on {@code true} means enable, otherwise disable
|
||||
* @param drainProcedures if {@code true}, will wait until all the running replication peer
|
||||
* modification procedures finish
|
||||
* @return the previous enable/disable state wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, boolean drainProcedures);
|
||||
|
||||
/**
|
||||
* Check whether replication peer modification is enabled.
|
||||
* @return {@code true} if modification is enabled, otherwise {@code false}, wrapped by a
|
||||
* {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Boolean> isReplicationPeerModificationEnabled();
|
||||
|
||||
/**
|
||||
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
||||
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
|
||||
|
|
|
@ -490,6 +490,17 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
|
||||
boolean drainProcedures) {
|
||||
return wrap(rawAdmin.replicationPeerModificationSwitch(on, drainProcedures));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
|
||||
return wrap(rawAdmin.isReplicationPeerModificationEnabled());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
|
||||
return wrap(rawAdmin.snapshot(snapshot));
|
||||
|
|
|
@ -161,12 +161,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
|
||||
|
@ -2035,6 +2041,27 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
FlushMasterStoreRequest request) throws ServiceException {
|
||||
return stub.flushMasterStore(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
|
||||
RpcController controller, ReplicationPeerModificationSwitchRequest request)
|
||||
throws ServiceException {
|
||||
return stub.replicationPeerModificationSwitch(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetReplicationPeerModificationProceduresResponse
|
||||
getReplicationPeerModificationProcedures(RpcController controller,
|
||||
GetReplicationPeerModificationProceduresRequest request) throws ServiceException {
|
||||
return stub.getReplicationPeerModificationProcedures(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
|
||||
RpcController controller, IsReplicationPeerModificationEnabledRequest request)
|
||||
throws ServiceException {
|
||||
return stub.isReplicationPeerModificationEnabled(controller, request);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -225,6 +225,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMaster
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
|
||||
|
@ -235,9 +236,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddRe
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
|
||||
|
@ -4470,4 +4474,57 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
|
||||
throws IOException {
|
||||
ReplicationPeerModificationSwitchRequest request =
|
||||
ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
|
||||
boolean prevOn =
|
||||
executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
return master.replicationPeerModificationSwitch(getRpcController(), request)
|
||||
.getPreviousValue();
|
||||
}
|
||||
});
|
||||
// if we do not need to wait all previous peer modification procedure done, or we are enabling
|
||||
// peer modification, just return here.
|
||||
if (!drainProcedures || on) {
|
||||
return prevOn;
|
||||
}
|
||||
// otherwise we need to wait until all previous peer modification procedure done
|
||||
for (int retry = 0;; retry++) {
|
||||
List<ProcedureProtos.Procedure> procs =
|
||||
executeCallable(new MasterCallable<List<ProcedureProtos.Procedure>>(getConnection(),
|
||||
getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected List<ProcedureProtos.Procedure> rpcCall() throws Exception {
|
||||
return master
|
||||
.getReplicationPeerModificationProcedures(getRpcController(),
|
||||
GetReplicationPeerModificationProceduresRequest.getDefaultInstance())
|
||||
.getProcedureList();
|
||||
}
|
||||
});
|
||||
if (procs.isEmpty()) {
|
||||
return prevOn;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(ConnectionUtils.getPauseTime(pause, retry));
|
||||
} catch (InterruptedException e) {
|
||||
throw (IOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerModificationEnabled() throws IOException {
|
||||
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
return master.isReplicationPeerModificationEnabled(getRpcController(),
|
||||
IsReplicationPeerModificationEnabledRequest.getDefaultInstance()).getEnabled();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -284,6 +284,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
||||
|
@ -299,12 +300,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
|
@ -3708,6 +3715,74 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return future;
|
||||
}
|
||||
|
||||
private void waitUntilAllReplicationPeerModificationProceduresDone(
|
||||
CompletableFuture<Boolean> future, boolean prevOn, int retries) {
|
||||
CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
|
||||
this.<List<ProcedureProtos.Procedure>> newMasterCaller()
|
||||
.action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
|
||||
GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
|
||||
controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
|
||||
(s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
|
||||
resp -> resp.getProcedureList()))
|
||||
.call();
|
||||
addListener(callFuture, (r, e) -> {
|
||||
if (e != null) {
|
||||
future.completeExceptionally(e);
|
||||
} else if (r.isEmpty()) {
|
||||
// we are done
|
||||
future.complete(prevOn);
|
||||
} else {
|
||||
// retry later to see if the procedures are done
|
||||
retryTimer.newTimeout(
|
||||
t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
|
||||
ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
|
||||
boolean drainProcedures) {
|
||||
ReplicationPeerModificationSwitchRequest request =
|
||||
ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
|
||||
CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
|
||||
.action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
|
||||
ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
|
||||
(s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
|
||||
resp -> resp.getPreviousValue()))
|
||||
.call();
|
||||
// if we do not need to wait all previous peer modification procedure done, or we are enabling
|
||||
// peer modification, just return here.
|
||||
if (!drainProcedures || on) {
|
||||
return callFuture;
|
||||
}
|
||||
// otherwise we need to wait until all previous peer modification procedure done
|
||||
CompletableFuture<Boolean> future = new CompletableFuture<>();
|
||||
addListener(callFuture, (prevOn, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
// even if the previous state is disabled, we still need to wait here, as there could be
|
||||
// another client thread which called this method just before us and have already changed the
|
||||
// state to off, but there are still peer modification procedures not finished, so we should
|
||||
// also wait here.
|
||||
waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
|
||||
return this.<Boolean> newMasterCaller()
|
||||
.action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
|
||||
IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
|
||||
IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
|
||||
(s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
|
||||
(resp) -> resp.getEnabled()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
|
||||
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
|
||||
|
|
|
@ -195,12 +195,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
|
||||
|
@ -776,4 +782,25 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
|||
FlushMasterStoreRequest request) throws ServiceException {
|
||||
return stub.flushMasterStore(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
|
||||
RpcController controller, ReplicationPeerModificationSwitchRequest request)
|
||||
throws ServiceException {
|
||||
return stub.replicationPeerModificationSwitch(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures(
|
||||
RpcController controller, GetReplicationPeerModificationProceduresRequest request)
|
||||
throws ServiceException {
|
||||
return stub.getReplicationPeerModificationProcedures(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
|
||||
RpcController controller, IsReplicationPeerModificationEnabledRequest request)
|
||||
throws ServiceException {
|
||||
return stub.isReplicationPeerModificationEnabled(controller, request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1129,6 +1129,15 @@ service MasterService {
|
|||
rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
|
||||
returns(GetReplicationPeerStateResponse);
|
||||
|
||||
rpc ReplicationPeerModificationSwitch(ReplicationPeerModificationSwitchRequest)
|
||||
returns(ReplicationPeerModificationSwitchResponse);
|
||||
|
||||
rpc GetReplicationPeerModificationProcedures(GetReplicationPeerModificationProceduresRequest)
|
||||
returns(GetReplicationPeerModificationProceduresResponse);
|
||||
|
||||
rpc IsReplicationPeerModificationEnabled(IsReplicationPeerModificationEnabledRequest)
|
||||
returns(IsReplicationPeerModificationEnabledResponse);
|
||||
|
||||
/** Returns a list of ServerNames marked as decommissioned. */
|
||||
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
|
||||
returns(ListDecommissionedRegionServersResponse);
|
||||
|
|
|
@ -26,6 +26,7 @@ option java_generate_equals_and_hash = true;
|
|||
option optimize_for = SPEED;
|
||||
|
||||
import "HBase.proto";
|
||||
import "Procedure.proto";
|
||||
|
||||
message TableCF {
|
||||
optional TableName table_name = 1;
|
||||
|
@ -145,3 +146,29 @@ message GetReplicationPeerStateRequest {
|
|||
message GetReplicationPeerStateResponse {
|
||||
required bool is_enabled = 1;
|
||||
}
|
||||
|
||||
message ReplicationPeerModificationSwitchRequest {
|
||||
required bool on = 1;
|
||||
}
|
||||
|
||||
message ReplicationPeerModificationSwitchResponse {
|
||||
required bool previous_value = 1;
|
||||
}
|
||||
|
||||
message ReplicationPeerModificationState {
|
||||
required bool on = 1;
|
||||
}
|
||||
|
||||
message GetReplicationPeerModificationProceduresRequest {
|
||||
}
|
||||
|
||||
message GetReplicationPeerModificationProceduresResponse {
|
||||
repeated Procedure procedure = 1;
|
||||
}
|
||||
|
||||
message IsReplicationPeerModificationEnabledRequest {
|
||||
}
|
||||
|
||||
message IsReplicationPeerModificationEnabledResponse {
|
||||
required bool enabled = 1;
|
||||
}
|
||||
|
|
|
@ -52,11 +52,14 @@ public abstract class BooleanStateStore extends MasterStateStore {
|
|||
* Set the flag on/off.
|
||||
* @param on true if the flag should be on, false otherwise
|
||||
* @throws IOException if the operation fails
|
||||
* @return returns the previous state
|
||||
*/
|
||||
public synchronized void set(boolean on) throws IOException {
|
||||
public synchronized boolean set(boolean on) throws IOException {
|
||||
byte[] state = toByteArray(on);
|
||||
setState(state);
|
||||
boolean prevOn = this.on;
|
||||
this.on = on;
|
||||
return prevOn;
|
||||
}
|
||||
|
||||
protected abstract byte[] toByteArray(boolean on);
|
||||
|
|
|
@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
|
|||
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore;
|
||||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotCleanupStateStore;
|
||||
|
@ -450,6 +451,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
private TaskGroup startupTaskGroup;
|
||||
|
||||
/**
|
||||
* Store whether we allow replication peer modification operations.
|
||||
*/
|
||||
private ReplicationPeerModificationStateStore replicationPeerModificationStateStore;
|
||||
|
||||
/**
|
||||
* Initializes the HMaster. The steps are as follows:
|
||||
* <p>
|
||||
|
@ -764,6 +770,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
this.replicationPeerManager =
|
||||
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
|
||||
this.replicationPeerModificationStateStore =
|
||||
new ReplicationPeerModificationStateStore(masterRegion);
|
||||
|
||||
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
|
||||
this.drainingServerTracker.start();
|
||||
|
@ -3785,6 +3793,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
|
||||
if (!isReplicationPeerModificationEnabled()) {
|
||||
throw new IOException("Replication peer modification disabled");
|
||||
}
|
||||
long procId = procedureExecutor.submitProcedure(procedure);
|
||||
procedure.getLatch().await();
|
||||
return procId;
|
||||
|
@ -3854,6 +3865,16 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return peers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
|
||||
return replicationPeerModificationStateStore.set(on);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerModificationEnabled() {
|
||||
return replicationPeerModificationStateStore.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
|
||||
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
|
||||
|
@ -4232,5 +4253,4 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// initialize master side coprocessors before we start handling requests
|
||||
this.cpHost = new MasterCoprocessorHost(this, conf);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
|||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
|
||||
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
|
||||
|
@ -363,12 +364,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
|
@ -2173,6 +2180,56 @@ public class MasterRpcServices extends RSRpcServices
|
|||
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
|
||||
RpcController controller, ReplicationPeerModificationSwitchRequest request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
boolean prevValue = master.replicationPeerModificationSwitch(request.getOn());
|
||||
return ReplicationPeerModificationSwitchResponse.newBuilder().setPreviousValue(prevValue)
|
||||
.build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures(
|
||||
RpcController controller, GetReplicationPeerModificationProceduresRequest request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
GetReplicationPeerModificationProceduresResponse.Builder builder =
|
||||
GetReplicationPeerModificationProceduresResponse.newBuilder();
|
||||
for (Procedure<?> proc : master.getProcedures()) {
|
||||
if (proc.isFinished()) {
|
||||
continue;
|
||||
}
|
||||
if (!(proc instanceof AbstractPeerProcedure)) {
|
||||
continue;
|
||||
}
|
||||
builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
|
||||
RpcController controller, IsReplicationPeerModificationEnabledRequest request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
return IsReplicationPeerModificationEnabledResponse.newBuilder()
|
||||
.setEnabled(master.isReplicationPeerModificationEnabled()).build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
|
||||
RpcController controller, ListDecommissionedRegionServersRequest request)
|
||||
|
|
|
@ -373,6 +373,10 @@ public interface MasterServices extends Server {
|
|||
List<ReplicationPeerDescription> listReplicationPeers(String regex)
|
||||
throws ReplicationException, IOException;
|
||||
|
||||
boolean replicationPeerModificationSwitch(boolean on) throws IOException;
|
||||
|
||||
boolean isReplicationPeerModificationEnabled();
|
||||
|
||||
/** Returns {@link LockManager} to lock namespaces/tables/regions. */
|
||||
LockManager getLockManager();
|
||||
|
||||
|
|
|
@ -86,6 +86,10 @@ public abstract class MasterStateStore {
|
|||
}
|
||||
|
||||
private void tryMigrate(ZKWatcher watcher, String zkPath) throws IOException, KeeperException {
|
||||
if (zkPath == null) {
|
||||
// this means we do not store this state in zk, skip migrating
|
||||
return;
|
||||
}
|
||||
Result result = get();
|
||||
if (result.isEmpty()) {
|
||||
// migrate
|
||||
|
|
|
@ -90,4 +90,10 @@ public abstract class AbstractPeerProcedure<TState>
|
|||
super.deserializeStateData(serializer);
|
||||
peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId();
|
||||
}
|
||||
|
||||
protected final void checkPeerModificationEnabled(MasterProcedureEnv env) throws IOException {
|
||||
if (!env.getMasterServices().isReplicationPeerModificationEnabled()) {
|
||||
throw new IOException("Replication peer modification disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -276,6 +276,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
switch (state) {
|
||||
case PRE_PEER_MODIFICATION:
|
||||
try {
|
||||
checkPeerModificationEnabled(env);
|
||||
prePeerModification(env);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.master.BooleanStateStore;
|
||||
import org.apache.hadoop.hbase.master.region.MasterRegion;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
/**
|
||||
* Store the peer modification state.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationPeerModificationStateStore extends BooleanStateStore {
|
||||
|
||||
public static final String STATE_NAME = "replication_peer_modification_on";
|
||||
|
||||
public ReplicationPeerModificationStateStore(MasterRegion masterRegion)
|
||||
throws DeserializationException, IOException, KeeperException {
|
||||
super(masterRegion, STATE_NAME, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] toByteArray(boolean on) {
|
||||
ReplicationProtos.ReplicationPeerModificationState.Builder builder =
|
||||
ReplicationProtos.ReplicationPeerModificationState.newBuilder();
|
||||
builder.setOn(on);
|
||||
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean parseFrom(byte[] bytes) throws DeserializationException {
|
||||
ProtobufUtil.expectPBMagicPrefix(bytes);
|
||||
ReplicationProtos.ReplicationPeerModificationState.Builder builder =
|
||||
ReplicationProtos.ReplicationPeerModificationState.newBuilder();
|
||||
try {
|
||||
int magicLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ProtobufUtil.mergeFrom(builder, bytes, magicLen, bytes.length - magicLen);
|
||||
} catch (IOException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
return builder.build().getOn();
|
||||
}
|
||||
}
|
|
@ -17,12 +17,20 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
|
@ -61,4 +69,24 @@ public class TestAdmin4 extends TestAdminBase {
|
|||
assertEquals(-1, ZKUtil.checkExists(zkw,
|
||||
ZNodePaths.joinZNode(zkw.getZNodePaths().drainingZNode, serverName.getServerName())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationPeerModificationSwitch() throws Exception {
|
||||
assertTrue(ADMIN.isReplicationPeerModificationEnabled());
|
||||
try {
|
||||
// disable modification, should returns true as it is enabled by default and the above
|
||||
// assertion has confirmed it
|
||||
assertTrue(ADMIN.replicationPeerModificationSwitch(false));
|
||||
IOException error =
|
||||
assertThrows(IOException.class, () -> ADMIN.addReplicationPeer("peer", ReplicationPeerConfig
|
||||
.newBuilder().setClusterKey(TEST_UTIL.getClusterKey() + "-test").build()));
|
||||
assertThat(error.getCause().getMessage(),
|
||||
containsString("Replication peer modification disabled"));
|
||||
// enable again, and the previous value should be false
|
||||
assertFalse(ADMIN.replicationPeerModificationSwitch(true));
|
||||
} finally {
|
||||
// always reset to avoid mess up other tests
|
||||
ADMIN.replicationPeerModificationSwitch(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,15 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.startsWith;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -105,6 +107,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
queueStorage.removeQueue(serverName, queue);
|
||||
}
|
||||
}
|
||||
admin.replicationPeerModificationSwitch(true).join();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -519,7 +522,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
|
||||
*/
|
||||
@Test
|
||||
|
@ -532,4 +535,19 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationPeerModificationSwitch() throws Exception {
|
||||
assertTrue(admin.isReplicationPeerModificationEnabled().get());
|
||||
// disable modification, should returns true as it is enabled by default and the above
|
||||
// assertion has confirmed it
|
||||
assertTrue(admin.replicationPeerModificationSwitch(false).get());
|
||||
ExecutionException error = assertThrows(ExecutionException.class, () -> admin
|
||||
.addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build())
|
||||
.get());
|
||||
assertThat(error.getCause().getMessage(),
|
||||
containsString("Replication peer modification disabled"));
|
||||
// enable again, and the previous value should be false
|
||||
assertFalse(admin.replicationPeerModificationSwitch(true).get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -495,4 +495,14 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
long nonceGroup, long nonce) throws IOException {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerModificationEnabled() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.FSReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestDisablePeerModification {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestDisablePeerModification.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static volatile CountDownLatch ARRIVE;
|
||||
|
||||
private static volatile CountDownLatch RESUME;
|
||||
|
||||
public static final class MockPeerStorage extends FSReplicationPeerStorage {
|
||||
|
||||
public MockPeerStorage(FileSystem fs, Configuration conf) throws IOException {
|
||||
super(fs, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
ARRIVE.countDown();
|
||||
try {
|
||||
RESUME.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new ReplicationException(e);
|
||||
}
|
||||
super.addPeer(peerId, peerConfig, enabled);
|
||||
}
|
||||
}
|
||||
|
||||
private static AsyncConnection CONN;
|
||||
|
||||
@Parameter
|
||||
public boolean async;
|
||||
|
||||
@Parameters(name = "{index}: async={0}")
|
||||
public static List<Object[]> params() {
|
||||
return Arrays.asList(new Object[] { true }, new Object[] { false });
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
|
||||
MockPeerStorage.class, ReplicationPeerStorage.class);
|
||||
UTIL.startMiniCluster(1);
|
||||
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
Closeables.close(CONN, true);
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpBeforeTest() throws IOException {
|
||||
UTIL.getAdmin().replicationPeerModificationSwitch(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDrainProcs() throws Exception {
|
||||
ARRIVE = new CountDownLatch(1);
|
||||
RESUME = new CountDownLatch(1);
|
||||
AsyncAdmin admin = CONN.getAdmin();
|
||||
ReplicationPeerConfig rpc =
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
|
||||
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
|
||||
CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer_" + async, rpc);
|
||||
ARRIVE.await();
|
||||
|
||||
// we have a pending add peer procedure which has already passed the first state, let's issue a
|
||||
// peer modification switch request to disable peer modification and set drainProcs to true
|
||||
CompletableFuture<Boolean> switchFuture;
|
||||
if (async) {
|
||||
switchFuture = admin.replicationPeerModificationSwitch(false, true);
|
||||
} else {
|
||||
switchFuture = new CompletableFuture<>();
|
||||
ForkJoinPool.commonPool().submit(() -> {
|
||||
try {
|
||||
switchFuture.complete(UTIL.getAdmin().replicationPeerModificationSwitch(false, true));
|
||||
} catch (IOException e) {
|
||||
switchFuture.completeExceptionally(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// sleep a while, the switchFuture should not finish yet
|
||||
// the sleep is necessary as we can not join on the switchFuture, so there is no stable way to
|
||||
// make sure we have already changed the flag at master side, sleep a while is the most suitable
|
||||
// way here
|
||||
Thread.sleep(5000);
|
||||
assertFalse(switchFuture.isDone());
|
||||
|
||||
// also verify that we can not schedule a new peer modification procedure
|
||||
AddPeerProcedure proc = new AddPeerProcedure("failure", rpc, true);
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc);
|
||||
UTIL.waitFor(15000, () -> proc.isFinished());
|
||||
// make sure the procedure is failed because of peer modification disabled
|
||||
assertTrue(proc.isFailed());
|
||||
assertThat(proc.getException().getCause().getMessage(),
|
||||
containsString("Replication peer modification disabled"));
|
||||
|
||||
// sleep a while and check again, make sure the switchFuture is still not done
|
||||
Thread.sleep(5000);
|
||||
assertFalse(switchFuture.isDone());
|
||||
|
||||
// resume the add peer procedure and wait it done
|
||||
RESUME.countDown();
|
||||
addFuture.get();
|
||||
|
||||
// this time the switchFuture should be able to finish
|
||||
assertTrue(switchFuture.get());
|
||||
}
|
||||
}
|
|
@ -461,5 +461,21 @@ module Hbase
|
|||
|
||||
@admin.updateReplicationPeerConfig(id, builder.build)
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Enable/disable replication peer modification
|
||||
# Returns previous switch setting.
|
||||
def peer_modification_switch(enable_or_disable, drain_procs)
|
||||
@admin.replicationPeerModificationSwitch(
|
||||
java.lang.Boolean.valueOf(enable_or_disable), java.lang.Boolean.valueOf(drain_procs)
|
||||
)
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Query whether replication peer modification is enabled.
|
||||
# Returns whether replication peer modification is enabled (true is enabled).
|
||||
def peer_modification_enabled?
|
||||
@admin.isReplicationPeerModificationEnabled
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -525,6 +525,8 @@ Shell.load_command_group(
|
|||
get_peer_config
|
||||
list_peer_configs
|
||||
update_peer_config
|
||||
peer_modification_enabled
|
||||
peer_modification_switch
|
||||
]
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with this
|
||||
# work for additional information regarding copyright ownership. The ASF
|
||||
# licenses this file to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
# Prints whether peer modification operations are enabled
|
||||
class PeerModificationEnabled < Command
|
||||
def help
|
||||
<<~EOF
|
||||
Query whether peer modification operations are enabled
|
||||
Examples:
|
||||
|
||||
hbase> peer_modification_enabled
|
||||
EOF
|
||||
end
|
||||
|
||||
def command
|
||||
state = replication_admin.peer_modification_enabled?
|
||||
formatter.row([state.to_s])
|
||||
state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,46 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
# Enable or disable peer modification operations
|
||||
class PeerModificationSwitch < Command
|
||||
def help
|
||||
<<~EOF
|
||||
Enable/Disable peer modification. Returns previous state.
|
||||
Examples:
|
||||
|
||||
hbase> peer_modification_switch true
|
||||
hbase> peer_modification_switch false, true
|
||||
|
||||
The second boolean parameter means whether you want to wait until all remaining peer modification
|
||||
finished, before the command returns.
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(enable_or_disable, drain_procs = false)
|
||||
prev_state = !!replication_admin.peer_modification_switch(enable_or_disable, drain_procs)
|
||||
formatter.row(["Previous peer modification state : #{prev_state}"])
|
||||
prev_state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -584,7 +584,7 @@ module Hbase
|
|||
assert_equal(0, command(:list_peers).length)
|
||||
end
|
||||
|
||||
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
|
||||
define_test 'set_peer_bandwidth: works with peer bandwidth upper limit' do
|
||||
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
|
||||
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
|
||||
command(:add_peer, @peer_id, args)
|
||||
|
@ -599,7 +599,7 @@ module Hbase
|
|||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test "get_peer_config: works with simple clusterKey peer" do
|
||||
define_test 'get_peer_config: works with simple clusterKey peer' do
|
||||
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
|
||||
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
|
||||
command(:add_peer, @peer_id, args)
|
||||
|
@ -643,7 +643,7 @@ module Hbase
|
|||
command(:remove_peer, peer_id_second)
|
||||
end
|
||||
|
||||
define_test "update_peer_config: can update peer config and data" do
|
||||
define_test 'update_peer_config: can update peer config and data' do
|
||||
config_params = { "config1" => "value1", "config2" => "value2" }
|
||||
data_params = {"data1" => "value1", "data2" => "value2"}
|
||||
args = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params, DATA => data_params}
|
||||
|
@ -664,7 +664,7 @@ module Hbase
|
|||
assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
|
||||
end
|
||||
|
||||
define_test "append_peer_exclude_namespaces: works with namespaces array" do
|
||||
define_test 'append_peer_exclude_namespaces: works with namespaces array' do
|
||||
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
|
||||
command(:add_peer, @peer_id, args)
|
||||
|
@ -700,7 +700,7 @@ module Hbase
|
|||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test "remove_peer_exclude_namespaces: works with namespaces array" do
|
||||
define_test 'remove_peer_exclude_namespaces: works with namespaces array' do
|
||||
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
|
||||
command(:add_peer, @peer_id, args)
|
||||
|
@ -738,6 +738,20 @@ module Hbase
|
|||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test 'peer_modification_switch' do
|
||||
command(:peer_modification_switch, true)
|
||||
output = capture_stdout { command(:peer_modification_enabled) }
|
||||
assert(output.include?('true'))
|
||||
|
||||
output = capture_stdout { command(:peer_modification_switch, false, true) }
|
||||
assert(output.include?('true'))
|
||||
output = capture_stdout { command(:peer_modification_enabled) }
|
||||
assert(output.include?('false'))
|
||||
|
||||
output = capture_stdout { command(:peer_modification_switch, true) }
|
||||
assert(output.include?('false'))
|
||||
end
|
||||
|
||||
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
|
||||
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
|
||||
# define_test "add_peer: adding a second peer with same id should error" do
|
||||
|
|
|
@ -1481,4 +1481,17 @@ public class ThriftAdmin implements Admin {
|
|||
throw new NotImplementedException(
|
||||
"modifyTableStoreFileTrackerAsync not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
|
||||
throws IOException {
|
||||
throw new NotImplementedException(
|
||||
"replicationPeerModificationSwitch not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerModificationEnabled() throws IOException {
|
||||
throw new NotImplementedException(
|
||||
"isReplicationPeerModificationEnabled not supported in ThriftAdmin");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue