HBASE-27783 Implement a shell command to disable all peer modification (#5182)

Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
Duo Zhang 2023-04-19 22:07:29 +08:00 committed by GitHub
parent 94a8f319bc
commit 398c5ef313
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 694 additions and 13 deletions

View File

@ -2186,6 +2186,34 @@ public interface Admin extends Abortable, Closeable {
*/
boolean isReplicationPeerEnabled(String peerId) throws IOException;
/**
 * Turn replication peer modification on or off without draining running procedures.
 * <p/>
 * This is a convenience overload that forwards to
 * {@link #replicationPeerModificationSwitch(boolean, boolean)} with {@code drainProcedures} set to
 * {@code false}. Switching modification off is especially useful when you want to change the
 * replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @return the previous enable/disable state
 * @throws IOException if the two-argument overload throws it
 */
default boolean replicationPeerModificationSwitch(boolean on) throws IOException {
  // the single-argument form never waits for in-flight peer modification procedures
  final boolean drainProcedures = false;
  return replicationPeerModificationSwitch(on, drainProcedures);
}
/**
 * Enable or disable replication peer modification.
 * <p/>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true}, will wait until all the running replication peer
 *          modification procedures finish. NOTE(review): the asynchronous client implementation
 *          only performs this wait when {@code on} is {@code false}; when enabling, the flag is
 *          ignored.
 * @return the previous enable/disable state
 * @throws IOException if the operation fails
 */
boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) throws IOException;
/**
 * Check whether replication peer modification is enabled.
 * <p/>
 * The state reported here is the one toggled through
 * {@link #replicationPeerModificationSwitch(boolean, boolean)}.
 * @return {@code true} if modification is enabled, otherwise {@code false}
 * @throws IOException if the operation fails
 */
boolean isReplicationPeerModificationEnabled() throws IOException;
/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be

View File

@ -891,6 +891,17 @@ class AdminOverAsyncAdmin implements Admin {
return get(admin.isReplicationPeerEnabled(peerId));
}
@Override
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
  throws IOException {
  // block on the async admin's future and hand back the previous switch state
  final boolean previousState = get(admin.replicationPeerModificationSwitch(on, drainProcedures));
  return previousState;
}
@Override
public boolean isReplicationPeerModificationEnabled() throws IOException {
  // block on the async admin's future and report the current switch state
  final boolean enabled = get(admin.isReplicationPeerModificationEnabled());
  return enabled;
}
@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {

View File

@ -815,10 +815,39 @@ public interface AsyncAdmin {
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
* {@link CompletableFuture}
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
/**
 * Turn replication peer modification on or off without draining running procedures.
 * <p/>
 * This is a convenience overload that forwards to
 * {@link #replicationPeerModificationSwitch(boolean, boolean)} with {@code drainProcedures} set to
 * {@code false}. Switching modification off is especially useful when you want to change the
 * replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @return the previous enable/disable state wrapped by a {@link CompletableFuture}
 */
default CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on) {
  // the single-argument form never waits for in-flight peer modification procedures
  final boolean drainProcedures = false;
  return replicationPeerModificationSwitch(on, drainProcedures);
}
/**
 * Enable or disable replication peer modification.
 * <p/>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true}, will wait until all the running replication peer
 *          modification procedures finish. NOTE(review): the raw async implementation only
 *          performs this wait when {@code on} is {@code false}; when enabling, the flag is
 *          ignored.
 * @return the previous enable/disable state wrapped by a {@link CompletableFuture}
 */
CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, boolean drainProcedures);
/**
 * Check whether replication peer modification is enabled.
 * <p/>
 * The state reported here is the one toggled through
 * {@link #replicationPeerModificationSwitch(boolean, boolean)}.
 * @return {@code true} if modification is enabled, otherwise {@code false}, wrapped by a
 *         {@link CompletableFuture}
 */
CompletableFuture<Boolean> isReplicationPeerModificationEnabled();
/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially

View File

@ -496,6 +496,17 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}
@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
  boolean drainProcedures) {
  // delegate to the raw admin, then pass its future through wrap(...) like the sibling methods
  CompletableFuture<Boolean> rawFuture =
    rawAdmin.replicationPeerModificationSwitch(on, drainProcedures);
  return wrap(rawFuture);
}
@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
  // delegate to the raw admin, then pass its future through wrap(...) like the sibling methods
  CompletableFuture<Boolean> rawFuture = rawAdmin.isReplicationPeerModificationEnabled();
  return wrap(rawFuture);
}
@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));

View File

@ -288,6 +288,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
@ -329,12 +330,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
@ -3774,6 +3781,74 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.call();
}
/**
 * Asks the master for its list of replication peer modification procedures and completes
 * {@code future} once that list comes back empty.
 * <p>
 * If the RPC fails, {@code future} is completed exceptionally with the same error. If the list is
 * not empty, another poll is scheduled on {@code retryTimer} after the pause returned by
 * {@link ConnectionUtils#getPauseTime(long, int)} for the current retry count.
 * @param future  the future to complete when no procedures are reported any more
 * @param prevOn  the value to complete {@code future} with
 * @param retries number of polls already performed, used to compute the next pause
 */
private void waitUntilAllReplicationPeerModificationProceduresDone(
  CompletableFuture<Boolean> future, boolean prevOn, int retries) {
  CompletableFuture<List<ProcedureProtos.Procedure>> pendingProcsFuture =
    this.<List<ProcedureProtos.Procedure>> newMasterCaller()
      .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
        GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
          controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
          (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
          GetReplicationPeerModificationProceduresResponse::getProcedureList))
      .call();
  addListener(pendingProcsFuture, (procs, error) -> {
    if (error != null) {
      future.completeExceptionally(error);
      return;
    }
    if (procs.isEmpty()) {
      // nothing is running any more, we are done
      future.complete(prevOn);
      return;
    }
    // some procedures are still reported, check again after a pause
    long delayNs = ConnectionUtils.getPauseTime(pauseNs, retries);
    retryTimer.newTimeout(
      t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
      delayNs, TimeUnit.NANOSECONDS);
  });
}
/**
 * Sends the switch request to the master and, when disabling with {@code drainProcedures} set,
 * additionally waits until the master reports no replication peer modification procedures.
 */
@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
  boolean drainProcedures) {
  ReplicationPeerModificationSwitchRequest switchRequest =
    ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
  CompletableFuture<Boolean> switchFuture = this.<Boolean> newMasterCaller()
    .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
      ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, switchRequest,
        (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
        ReplicationPeerModificationSwitchResponse::getPreviousValue))
    .call();
  // draining only makes sense when the caller asked for it and we are turning modification off;
  // in every other case the switch result itself is the answer
  boolean mustDrain = drainProcedures && !on;
  if (!mustDrain) {
    return switchFuture;
  }
  CompletableFuture<Boolean> drainedFuture = new CompletableFuture<>();
  addListener(switchFuture, (previousState, failure) -> {
    if (failure == null) {
      // wait even when the previous state was already disabled: another client may have switched
      // it off just before us while peer modification procedures are still running
      waitUntilAllReplicationPeerModificationProceduresDone(drainedFuture, previousState, 0);
    } else {
      drainedFuture.completeExceptionally(failure);
    }
  });
  return drainedFuture;
}
@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
  // the request carries no fields, so the shared default instance is sufficient
  IsReplicationPeerModificationEnabledRequest enabledRequest =
    IsReplicationPeerModificationEnabledRequest.getDefaultInstance();
  return this.<Boolean> newMasterCaller()
    .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
      IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, enabledRequest,
        (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
        IsReplicationPeerModificationEnabledResponse::getEnabled))
    .call();
}
@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();

View File

@ -1133,6 +1133,15 @@ service MasterService {
rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
returns(GetReplicationPeerStateResponse);
/** Turns replication peer modification on or off; the response carries the previous value. */
rpc ReplicationPeerModificationSwitch(ReplicationPeerModificationSwitchRequest)
returns(ReplicationPeerModificationSwitchResponse);
/** Returns the replication peer modification procedures that have not finished yet. */
rpc GetReplicationPeerModificationProcedures(GetReplicationPeerModificationProceduresRequest)
returns(GetReplicationPeerModificationProceduresResponse);
/** Returns whether replication peer modification is currently enabled. */
rpc IsReplicationPeerModificationEnabled(IsReplicationPeerModificationEnabledRequest)
returns(IsReplicationPeerModificationEnabledResponse);
/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);

View File

@ -26,6 +26,7 @@ option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
import "server/Procedure.proto";
message TableCF {
optional TableName table_name = 1;
@ -165,6 +166,33 @@ message TransitReplicationPeerSyncReplicationStateResponse {
/** Request for the state of one replication peer, identified by its peer id. */
message GetReplicationPeerStateRequest {
required string peer_id = 1;
}
/** Response reporting whether the requested replication peer is enabled. */
message GetReplicationPeerStateResponse {
required bool is_enabled = 1;
}
/** Request to switch replication peer modification; on = true enables it, false disables it. */
message ReplicationPeerModificationSwitchRequest {
required bool on = 1;
}
/** Response to a switch request; previous_value is the state before the switch was applied. */
message ReplicationPeerModificationSwitchResponse {
required bool previous_value = 1;
}
/**
 * Serialized form of the replication peer modification flag, as written and parsed by
 * ReplicationPeerModificationStateStore.
 */
message ReplicationPeerModificationState {
required bool on = 1;
}
/** Request for the list of replication peer modification procedures; carries no fields. */
message GetReplicationPeerModificationProceduresRequest {
}
/**
 * Response listing replication peer modification procedures. The client treats an empty list as
 * "all procedures are done".
 */
message GetReplicationPeerModificationProceduresResponse {
repeated Procedure procedure = 1;
}
/** Request asking whether replication peer modification is enabled; carries no fields. */
message IsReplicationPeerModificationEnabledRequest {
}
/** Response reporting whether replication peer modification is currently enabled. */
message IsReplicationPeerModificationEnabledResponse {
required bool enabled = 1;
}

View File

@ -52,11 +52,14 @@ public abstract class BooleanStateStore extends MasterStateStore {
* Set the flag on/off.
* @param on true if the flag should be on, false otherwise
* @throws IOException if the operation fails
* @return returns the previous state
*/
public synchronized void set(boolean on) throws IOException {
public synchronized boolean set(boolean on) throws IOException {
  // persist the new value first; the in-memory flag is only touched once setState has returned
  setState(toByteArray(on));
  final boolean previous = this.on;
  this.on = on;
  return previous;
}
protected abstract byte[] toByteArray(boolean on);

View File

@ -173,6 +173,7 @@ import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
@ -468,6 +469,11 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private TaskGroup startupTaskGroup;
/**
* Store whether we allow replication peer modification operations.
*/
private ReplicationPeerModificationStateStore replicationPeerModificationStateStore;
/**
* Initializes the HMaster. The steps are as follows:
* <p>
@ -785,6 +791,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
this.replicationPeerManager =
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
this.replicationPeerModificationStateStore =
new ReplicationPeerModificationStateStore(masterRegion);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
@ -3787,6 +3795,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
}
private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
if (!isReplicationPeerModificationEnabled()) {
throw new IOException("Replication peer modification disabled");
}
long procId = procedureExecutor.submitProcedure(procedure);
procedure.getLatch().await();
return procId;
@ -3866,6 +3877,16 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
}
@Override
public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
  // the state store records the new value and reports the one it replaced
  final boolean previousState = replicationPeerModificationStateStore.set(on);
  return previousState;
}
@Override
public boolean isReplicationPeerModificationEnabled() {
  // the flag lives in the replication peer modification state store
  final boolean enabled = replicationPeerModificationStateStore.get();
  return enabled;
}
/**
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.
@ -4290,5 +4311,4 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}
}

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.AbstractPeerNoLockProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
@ -421,12 +422,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
@ -2170,6 +2177,56 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
}
/**
 * RPC entry point for switching replication peer modification on or off. Any
 * {@link IOException} from the initialization check or the switch itself is rethrown wrapped in a
 * {@link ServiceException}.
 */
@Override
public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
  RpcController controller, ReplicationPeerModificationSwitchRequest request)
  throws ServiceException {
  boolean previousValue;
  try {
    server.checkInitialized();
    previousValue = server.replicationPeerModificationSwitch(request.getOn());
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  ReplicationPeerModificationSwitchResponse.Builder response =
    ReplicationPeerModificationSwitchResponse.newBuilder();
  response.setPreviousValue(previousValue);
  return response.build();
}
/**
 * RPC entry point that reports every procedure known to the master which is not finished and is
 * an {@link AbstractPeerNoLockProcedure}. Any {@link IOException} is rethrown wrapped in a
 * {@link ServiceException}.
 */
@Override
public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures(
  RpcController controller, GetReplicationPeerModificationProceduresRequest request)
  throws ServiceException {
  try {
    server.checkInitialized();
    GetReplicationPeerModificationProceduresResponse.Builder response =
      GetReplicationPeerModificationProceduresResponse.newBuilder();
    for (Procedure<?> candidate : server.getProcedures()) {
      // only unfinished peer procedures are of interest to the caller
      boolean pendingPeerProcedure =
        !candidate.isFinished() && candidate instanceof AbstractPeerNoLockProcedure;
      if (pendingPeerProcedure) {
        response.addProcedure(ProcedureUtil.convertToProtoProcedure(candidate));
      }
    }
    return response.build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * RPC entry point reporting whether replication peer modification is enabled. An
 * {@link IOException} from the initialization check is rethrown wrapped in a
 * {@link ServiceException}.
 */
@Override
public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
  RpcController controller, IsReplicationPeerModificationEnabledRequest request)
  throws ServiceException {
  boolean enabled;
  try {
    server.checkInitialized();
    enabled = server.isReplicationPeerModificationEnabled();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  IsReplicationPeerModificationEnabledResponse.Builder response =
    IsReplicationPeerModificationEnabledResponse.newBuilder();
  response.setEnabled(enabled);
  return response.build();
}
@Override
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
RpcController controller, ListDecommissionedRegionServersRequest request)

View File

@ -390,6 +390,10 @@ public interface MasterServices extends Server {
long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
throws ReplicationException, IOException;
/**
 * Enable or disable replication peer modification.
 * @param on {@code true} to enable, {@code false} to disable
 * @return the previous enable/disable state
 * @throws IOException if the new state can not be recorded
 */
boolean replicationPeerModificationSwitch(boolean on) throws IOException;
/** Returns {@code true} if replication peer modification is currently enabled. */
boolean isReplicationPeerModificationEnabled();
/** Returns {@link LockManager} to lock namespaces/tables/regions. */
LockManager getLockManager();

View File

@ -86,6 +86,10 @@ public abstract class MasterStateStore {
}
private void tryMigrate(ZKWatcher watcher, String zkPath) throws IOException, KeeperException {
if (zkPath == null) {
// this means we do not store this state in zk, skip migrating
return;
}
Result result = get();
if (result.isEmpty()) {
// migrate

View File

@ -164,4 +164,10 @@ public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockPr
queueStorage);
}
}
/**
 * Fails the caller when the master currently does not allow replication peer modification.
 * @param env the procedure environment used to reach the master services
 * @throws IOException if replication peer modification is disabled
 */
protected final void checkPeerModificationEnabled(MasterProcedureEnv env) throws IOException {
  boolean modificationEnabled = env.getMasterServices().isReplicationPeerModificationEnabled();
  if (modificationEnabled) {
    return;
  }
  throw new IOException("Replication peer modification disabled");
}
}

View File

@ -158,6 +158,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
switch (state) {
case PRE_PEER_MODIFICATION:
try {
checkPeerModificationEnabled(env);
prePeerModification(env);
} catch (IOException e) {
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.BooleanStateStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
 * Keeps the on/off flag that says whether replication peer modification is allowed.
 * <p>
 * The flag is serialized as a PB-magic-prefixed {@code ReplicationPeerModificationState} message.
 * The two trailing constructor arguments passed to the superclass are {@code null};
 * NOTE(review): these appear to be the ZooKeeper watcher and path, so no ZooKeeper migration is
 * expected for this state. Confirm against {@link BooleanStateStore}.
 */
@InterfaceAudience.Private
public class ReplicationPeerModificationStateStore extends BooleanStateStore {

  public static final String STATE_NAME = "replication_peer_modification_on";

  public ReplicationPeerModificationStateStore(MasterRegion masterRegion)
    throws DeserializationException, IOException, KeeperException {
    super(masterRegion, STATE_NAME, null, null);
  }

  /** Encodes the flag as a PB-magic-prefixed {@code ReplicationPeerModificationState} message. */
  @Override
  protected byte[] toByteArray(boolean on) {
    byte[] payload = ReplicationProtos.ReplicationPeerModificationState.newBuilder().setOn(on)
      .build().toByteArray();
    return ProtobufUtil.prependPBMagic(payload);
  }

  /**
   * Decodes bytes produced by {@link #toByteArray(boolean)}.
   * @throws DeserializationException if the PB magic prefix is missing or the message can not be
   *                                  parsed
   */
  @Override
  protected boolean parseFrom(byte[] bytes) throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    // the actual message starts right after the magic prefix
    final int offset = ProtobufUtil.lengthOfPBMagic();
    ReplicationProtos.ReplicationPeerModificationState.Builder parsed =
      ReplicationProtos.ReplicationPeerModificationState.newBuilder();
    try {
      ProtobufUtil.mergeFrom(parsed, bytes, offset, bytes.length - offset);
    } catch (IOException e) {
      throw new DeserializationException(e);
    }
    return parsed.build().getOn();
  }
}

View File

@ -236,6 +236,7 @@ public class TransitPeerSyncReplicationStateProcedure
switch (state) {
case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
checkPeerModificationEnabled(env);
preTransit(env);
} catch (IOException e) {
LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "

View File

@ -17,12 +17,20 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -61,4 +69,24 @@ public class TestAdmin4 extends TestAdminBase {
assertEquals(-1, ZKUtil.checkExists(zkw,
ZNodePaths.joinZNode(zkw.getZNodePaths().drainingZNode, serverName.getServerName())));
}
/**
 * Verifies that the switch reports the previous state and that adding a peer is rejected while
 * replication peer modification is disabled.
 */
@Test
public void testReplicationPeerModificationSwitch() throws Exception {
  assertTrue(ADMIN.isReplicationPeerModificationEnabled());
  try {
    // disabling must report "enabled" as the previous state: it is enabled by default and the
    // assertion above has just confirmed that
    boolean stateBeforeDisable = ADMIN.replicationPeerModificationSwitch(false);
    assertTrue(stateBeforeDisable);
    IOException failure = assertThrows(IOException.class,
      () -> ADMIN.addReplicationPeer("peer", ReplicationPeerConfig.newBuilder()
        .setClusterKey(TEST_UTIL.getClusterKey() + "-test").build()));
    String causeMessage = failure.getCause().getMessage();
    assertThat(causeMessage, containsString("Replication peer modification disabled"));
    // enabling again must report "disabled" as the previous state
    boolean stateBeforeEnable = ADMIN.replicationPeerModificationSwitch(true);
    assertFalse(stateBeforeEnable);
  } finally {
    // always switch back on so that other tests are not affected
    ADMIN.replicationPeerModificationSwitch(true);
  }
}
}

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -106,6 +108,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
queueStorage.removeQueue(serverName, queue);
}
}
admin.replicationPeerModificationSwitch(true).join();
}
@Test
@ -509,7 +512,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
}
}
/*
/**
* Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
*/
@Test
@ -522,4 +525,19 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
}
}
/**
 * Verifies that the switch reports the previous state and that adding a peer is rejected while
 * replication peer modification is disabled.
 */
@Test
public void testReplicationPeerModificationSwitch() throws Exception {
  assertTrue(admin.isReplicationPeerModificationEnabled().get());
  // disabling must report "enabled" as the previous state: it is enabled by default and the
  // assertion above has just confirmed that
  boolean stateBeforeDisable = admin.replicationPeerModificationSwitch(false).get();
  assertTrue(stateBeforeDisable);
  ExecutionException failure = assertThrows(ExecutionException.class,
    () -> admin
      .addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build())
      .get());
  String causeMessage = failure.getCause().getMessage();
  assertThat(causeMessage, containsString("Replication peer modification disabled"));
  // enabling again must report "disabled" as the previous state
  boolean stateBeforeEnable = admin.replicationPeerModificationSwitch(true).get();
  assertFalse(stateBeforeEnable);
}
}

View File

@ -518,4 +518,14 @@ public class MockNoopMasterServices implements MasterServices {
long nonceGroup, long nonce) throws IOException {
return -1;
}
/** No-op stub: ignores {@code on} and always reports {@code false} as the previous state. */
@Override
public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
return false;
}
/**
 * No-op stub: always reports replication peer modification as disabled.
 * NOTE(review): code that consults this flag through the mock will see modification as
 * disabled; confirm that is what tests using this mock expect.
 */
@Override
public boolean isReplicationPeerModificationEnabled() {
return false;
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.FSReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, LargeTests.class })
public class TestDisablePeerModification {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDisablePeerModification.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static CountDownLatch ARRIVE = new CountDownLatch(1);
private static CountDownLatch RESUME = new CountDownLatch(1);
/**
 * Peer storage used by this test to hold an add-peer call in flight: {@code addPeer} counts down
 * {@code ARRIVE} so the test knows the call has been reached, then blocks on {@code RESUME}
 * before delegating to the real file system based storage.
 */
public static final class MockPeerStorage extends FSReplicationPeerStorage {

  public MockPeerStorage(FileSystem fs, Configuration conf) throws IOException {
    super(fs, conf);
  }

  /**
   * Signals arrival, waits until the test releases {@code RESUME}, then adds the peer.
   * @throws ReplicationException if the waiting thread is interrupted, or if the underlying
   *                              storage fails
   */
  @Override
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
    SyncReplicationState syncReplicationState) throws ReplicationException {
    ARRIVE.countDown();
    try {
      RESUME.await();
    } catch (InterruptedException e) {
      // await() clears the interrupt status when it throws; restore it so that code further up
      // the stack can still observe that this thread was interrupted
      Thread.currentThread().interrupt();
      throw new ReplicationException(e);
    }
    super.addPeer(peerId, peerConfig, enabled, syncReplicationState);
  }
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
MockPeerStorage.class, ReplicationPeerStorage.class);
UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDown() throws IOException {
UTIL.shutdownMiniCluster();
}
@Test
public void testDrainProcs() throws Exception {
AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer", rpc);
ARRIVE.await();
// we have a pending add peer procedure which has already passed the first state, let's issue a
// peer modification switch request to disable peer modification and set drainProcs to true
CompletableFuture<Boolean> switchFuture = admin.replicationPeerModificationSwitch(false, true);
// sleep a while, the switchFuture should not finish yet
// the sleep is necessary as we can not join on the switchFuture, so there is no stable way to
// make sure we have already changed the flag at master side, sleep a while is the most suitable
// way here
Thread.sleep(5000);
assertFalse(switchFuture.isDone());
// also verify that we can not schedule a new peer modification procedure
AddPeerProcedure proc = new AddPeerProcedure("failure", rpc, true);
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc);
UTIL.waitFor(15000, () -> proc.isFinished());
// make sure the procedure is failed because of peer modification disabled
assertTrue(proc.isFailed());
assertThat(proc.getException().getCause().getMessage(),
containsString("Replication peer modification disabled"));
// sleep a while and check again, make sure the switchFuture is still not done
Thread.sleep(5000);
assertFalse(switchFuture.isDone());
// resume the add peer procedure and wait it done
RESUME.countDown();
addFuture.get();
// this time the switchFuture should be able to finish
assertTrue(switchFuture.get());
}
}

View File

@ -954,4 +954,15 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
public void flushMasterStore() throws IOException {
admin.flushMasterStore();
}
@Override
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
  throws IOException {
  // Pure pass-through to the wrapped admin; the value handed back is whatever it reports.
  final boolean previousState = admin.replicationPeerModificationSwitch(on, drainProcedures);
  return previousState;
}
@Override
public boolean isReplicationPeerModificationEnabled() throws IOException {
  // Pure pass-through to the wrapped admin.
  final boolean enabled = admin.isReplicationPeerModificationEnabled();
  return enabled;
}
}

View File

@ -479,5 +479,21 @@ module Hbase
@admin.updateReplicationPeerConfig(id, builder.build)
end
#----------------------------------------------------------------------------------------------
# Turn replication peer modification on or off.
# Both arguments are converted with java.lang.Boolean.valueOf before being
# handed to the admin. Returns the value the admin call returns (the previous
# switch setting).
def peer_modification_switch(enable_or_disable, drain_procs)
  enable_flag = java.lang.Boolean.valueOf(enable_or_disable)
  drain_flag = java.lang.Boolean.valueOf(drain_procs)
  @admin.replicationPeerModificationSwitch(enable_flag, drain_flag)
end
#----------------------------------------------------------------------------------------------
# Ask the admin whether replication peer modification is currently enabled.
# Returns the admin's answer unchanged (true means enabled).
def peer_modification_enabled?
  enabled = @admin.isReplicationPeerModificationEnabled
  enabled
end
end
end

View File

@ -526,6 +526,8 @@ Shell.load_command_group(
list_peer_configs
update_peer_config
transit_peer_sync_replication_state
peer_modification_enabled
peer_modification_switch
]
)

View File

@ -0,0 +1,40 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# frozen_string_literal: true
module Shell
  module Commands
    # Shell command that reports whether peer modification operations are enabled.
    class PeerModificationEnabled < Command
      # Returns the usage text for this command.
      def help
        <<~HELP
          Query whether peer modification operations are enabled
          Examples:
          hbase> peer_modification_enabled
        HELP
      end

      # Queries the replication admin for the current switch state, prints it as
      # a single formatter row and returns it to the caller.
      def command
        enabled = replication_admin.peer_modification_enabled?
        formatter.row([enabled.to_s])
        enabled
      end
    end
  end
end

View File

@ -0,0 +1,46 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# frozen_string_literal: true
module Shell
  module Commands
    # Shell command that enables or disables peer modification operations.
    class PeerModificationSwitch < Command
      # Returns the usage text for this command.
      def help
        <<~HELP
          Enable/Disable peer modification. Returns previous state.
          Examples:
          hbase> peer_modification_switch true
          hbase> peer_modification_switch false, true
          The second boolean parameter means whether you want to wait until all remaining peer modification
          finished, before the command returns.
        HELP
      end

      # Flips the switch through the replication admin, prints the previous
      # state as a formatter row and returns it as a strict Ruby boolean.
      def command(enable_or_disable, drain_procs = false)
        result = replication_admin.peer_modification_switch(enable_or_disable, drain_procs)
        # Collapse whatever the admin returned (including nil) to true/false.
        previous = result ? true : false
        formatter.row(["Previous peer modification state : #{previous}"])
        previous
      end
    end
  end
end

View File

@ -606,7 +606,7 @@ module Hbase
assert_equal(0, command(:list_peers).length)
end
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
define_test 'set_peer_bandwidth: works with peer bandwidth upper limit' do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@ -621,7 +621,7 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "transit_peer_sync_replication_state: test" do
define_test 'transit_peer_sync_replication_state: test' do
cluster_key = "server1.cie.com:2181:/hbase"
remote_wal_dir = "hdfs://srv1:9999/hbase"
table_cfs = { "ns3:table1" => [], "ns3:table2" => [],
@ -652,7 +652,7 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "get_peer_config: works with simple clusterKey peer" do
define_test 'get_peer_config: works with simple clusterKey peer' do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@ -696,7 +696,7 @@ module Hbase
command(:remove_peer, peer_id_second)
end
define_test "update_peer_config: can update peer config and data" do
define_test 'update_peer_config: can update peer config and data' do
config_params = { "config1" => "value1", "config2" => "value2" }
data_params = {"data1" => "value1", "data2" => "value2"}
args = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params, DATA => data_params}
@ -717,7 +717,7 @@ module Hbase
assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
end
define_test "append_peer_exclude_namespaces: works with namespaces array" do
define_test 'append_peer_exclude_namespaces: works with namespaces array' do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@ -753,7 +753,7 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "remove_peer_exclude_namespaces: works with namespaces array" do
define_test 'remove_peer_exclude_namespaces: works with namespaces array' do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
@ -791,6 +791,20 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test 'peer_modification_switch' do
  # Start from a known state: peer modification switched on.
  command(:peer_modification_switch, true)
  enabled_before = capture_stdout { command(:peer_modification_enabled) }
  assert(enabled_before.include?('true'))

  # Switch off with drain_procs set; the printed previous state must be true.
  switch_off_output = capture_stdout { command(:peer_modification_switch, false, true) }
  assert(switch_off_output.include?('true'))

  # The query command must now report false.
  enabled_after = capture_stdout { command(:peer_modification_enabled) }
  assert(enabled_after.include?('false'))

  # Switch back on; the printed previous state must be false.
  switch_on_output = capture_stdout { command(:peer_modification_switch, true) }
  assert(switch_on_output.include?('false'))
end
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
# define_test "add_peer: adding a second peer with same id should error" do

View File

@ -1329,4 +1329,17 @@ public class ThriftAdmin implements Admin {
public void flushMasterStore() throws IOException {
throw new NotImplementedException("flushMasterStore not supported in ThriftAdmin");
}
@Override
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
  throws IOException {
  // This admin implementation does not offer the operation; always reject the call.
  final String reason = "replicationPeerModificationSwitch not supported in ThriftAdmin";
  throw new NotImplementedException(reason);
}
@Override
public boolean isReplicationPeerModificationEnabled() throws IOException {
  // This admin implementation does not offer the operation; always reject the call.
  final String reason = "isReplicationPeerModificationEnabled not supported in ThriftAdmin";
  throw new NotImplementedException(reason);
}
}