HBASE-19781 Add a new cluster state flag for synchronous replication

This commit is contained in:
Guanghao Zhang 2018-01-22 11:44:49 +08:00 committed by zhangduo
parent 274b813e12
commit 2acebac00e
40 changed files with 816 additions and 51 deletions

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@ -2656,6 +2657,44 @@ public interface Admin extends Abortable, Closeable {
*/
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;
/**
* Transit current cluster to a new state in a synchronous replication peer.
* @param peerId a short name that identifies the peer
* @param state a new state of current cluster
* @throws IOException if a remote or network exception occurs
*/
void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws IOException;
/**
* Transit current cluster to a new state in a synchronous replication peer. But does not block
* and wait for it.
* <p>
* You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
* ExecutionException if there was an error while executing the operation or TimeoutException in
* case the wait timeout was not long enough to allow the operation to complete.
* @param peerId a short name that identifies the peer
* @param state a new state of current cluster
* @throws IOException if a remote or network exception occurs
*/
Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId,
SyncReplicationState state) throws IOException;
/**
* Get the current cluster state in a synchronous replication peer.
* @param peerId a short name that identifies the peer
* @return the current cluster state
* @throws IOException if a remote or network exception occurs
*/
default SyncReplicationState getReplicationPeerSyncReplicationState(String peerId)
throws IOException {
List<ReplicationPeerDescription> peers = listReplicationPeers(Pattern.compile(peerId));
if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) {
throw new IOException("Replication peer " + peerId + " does not exist");
}
return peers.get(0).getSyncReplicationState();
}
/**
* Mark region server(s) as decommissioned to prevent additional regions from getting
* assigned to them. Optionally unload the regions on the servers. If there are multiple servers

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -599,6 +601,35 @@ public interface AsyncAdmin {
CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig);
/**
* Transit current cluster to a new state in a synchronous replication peer.
* @param peerId a short name that identifies the peer
* @param state a new state of current cluster
*/
CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState state);
/**
* Get the current cluster state in a synchronous replication peer.
* @param peerId a short name that identifies the peer
* @return the current cluster state wrapped by a {@link CompletableFuture}.
*/
default CompletableFuture<SyncReplicationState>
getReplicationPeerSyncReplicationState(String peerId) {
CompletableFuture<SyncReplicationState> future = new CompletableFuture<>();
listReplicationPeers(Pattern.compile(peerId)).whenComplete((peers, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) {
future.completeExceptionally(
new IOException("Replication peer " + peerId + " does not exist"));
} else {
future.complete(peers.get(0).getSyncReplicationState());
}
});
return future;
}
/**
* Append the replicable table-cf config of the specified peer
* @param peerId a short that identifies the cluster

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -413,6 +414,12 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.updateReplicationPeerConfig(peerId, peerConfig));
}
@Override
public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState clusterState) {
return wrap(rawAdmin.transitReplicationPeerSyncReplicationState(peerId, clusterState));
}
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, List<String>> tableCfs) {

View File

@ -124,6 +124,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
@ -1717,6 +1719,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
MasterProtos.ClearDeadServersRequest request) throws ServiceException {
return stub.clearDeadServers(controller, request);
}
@Override
public TransitReplicationPeerSyncReplicationStateResponse
transitReplicationPeerSyncReplicationState(RpcController controller,
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
return stub.transitReplicationPeerSyncReplicationState(controller, request);
}
};
}

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
@ -206,6 +207,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Disab
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@ -4013,6 +4015,30 @@ public class HBaseAdmin implements Admin {
() -> "UPDATE_REPLICATION_PEER_CONFIG");
}
@Override
public void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws IOException {
get(transitReplicationPeerSyncReplicationStateAsync(peerId, state), this.syncWaitTimeout,
TimeUnit.MILLISECONDS);
}
@Override
public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId,
SyncReplicationState state) throws IOException {
TransitReplicationPeerSyncReplicationStateResponse response =
executeCallable(new MasterCallable<TransitReplicationPeerSyncReplicationStateResponse>(
getConnection(), getRpcControllerFactory()) {
@Override
protected TransitReplicationPeerSyncReplicationStateResponse rpcCall() throws Exception {
return master.transitReplicationPeerSyncReplicationState(getRpcController(),
RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
state));
}
});
return new ReplicationFuture(this, peerId, response.getProcId(),
() -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE");
}
@Override
public void appendReplicationPeerTableCFs(String id,
Map<TableName, List<String>> tableCfs)

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@ -255,6 +256,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@ -1612,6 +1615,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
}
@Override
public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState clusterState) {
return this
.<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
clusterState),
(s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
(resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
() -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
}
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
Map<TableName, List<String>> tableCfs) {

View File

@ -166,6 +166,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
@ -638,4 +640,11 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
throws ServiceException {
return stub.splitRegion(controller, request);
}
@Override
public TransitReplicationPeerSyncReplicationStateResponse
transitReplicationPeerSyncReplicationState(RpcController controller,
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
return stub.transitReplicationPeerSyncReplicationState(controller, request);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@ -396,25 +397,28 @@ public final class ReplicationPeerConfigUtil {
return ProtobufUtil.prependPBMagic(bytes);
}
public static ReplicationPeerDescription toReplicationPeerDescription(
ReplicationProtos.ReplicationPeerDescription desc) {
boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
.getState();
public static ReplicationPeerDescription
toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
boolean enabled =
ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
ReplicationPeerConfig config = convert(desc.getConfig());
return new ReplicationPeerDescription(desc.getId(), enabled, config);
return new ReplicationPeerDescription(desc.getId(), enabled, config,
SyncReplicationState.valueOf(desc.getSyncReplicationState().getNumber()));
}
public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
ReplicationPeerDescription desc) {
public static ReplicationProtos.ReplicationPeerDescription
toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
ReplicationProtos.ReplicationPeerDescription.Builder builder =
ReplicationProtos.ReplicationPeerDescription.newBuilder();
builder.setId(desc.getPeerId());
ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
.newBuilder();
stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
: ReplicationProtos.ReplicationState.State.DISABLED);
ReplicationProtos.ReplicationState.Builder stateBuilder =
ReplicationProtos.ReplicationState.newBuilder();
stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED :
ReplicationProtos.ReplicationState.State.DISABLED);
builder.setState(stateBuilder.build());
builder.setConfig(convert(desc.getPeerConfig()));
builder.setSyncReplicationState(
ReplicationProtos.SyncReplicationState.forNumber(desc.getSyncReplicationState().ordinal()));
return builder.build();
}

View File

@ -28,11 +28,14 @@ public class ReplicationPeerDescription {
private final String id;
private final boolean enabled;
private final ReplicationPeerConfig config;
private final SyncReplicationState syncReplicationState;
public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config) {
public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config,
SyncReplicationState syncReplicationState) {
this.id = id;
this.enabled = enabled;
this.config = config;
this.syncReplicationState = syncReplicationState;
}
public String getPeerId() {
@ -47,11 +50,16 @@ public class ReplicationPeerDescription {
return this.config;
}
public SyncReplicationState getSyncReplicationState() {
return this.syncReplicationState;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("id : ").append(id);
builder.append(", enabled : " + enabled);
builder.append(", config : " + config);
builder.append(", syncReplicationState : " + syncReplicationState);
return builder.toString();
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used by synchronous replication. Indicate the state of the current cluster in a synchronous
* replication peer. The state may be one of {@link SyncReplicationState#ACTIVE},
* {@link SyncReplicationState#DOWNGRADE_ACTIVE} or
* {@link SyncReplicationState#STANDBY}.
* <p>
* For asynchronous replication, the state is {@link SyncReplicationState#NONE}.
*/
@InterfaceAudience.Public
public enum SyncReplicationState {
NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY;
public static SyncReplicationState valueOf(int value) {
switch (value) {
case 0:
return NONE;
case 1:
return ACTIVE;
case 2:
return DOWNGRADE_ACTIVE;
case 3:
return STANDBY;
default:
throw new IllegalArgumentException("Unknown synchronous replication state " + value);
}
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@ -147,6 +148,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
/**
@ -1867,4 +1869,12 @@ public final class RequestConverter {
}
return pbServers;
}
public static TransitReplicationPeerSyncReplicationStateRequest
buildTransitReplicationPeerSyncReplicationStateRequest(String peerId,
SyncReplicationState state) {
return TransitReplicationPeerSyncReplicationStateRequest.newBuilder().setPeerId(peerId)
.setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal()))
.build();
}
}

View File

@ -962,6 +962,10 @@ service MasterService {
rpc ListReplicationPeers(ListReplicationPeersRequest)
returns(ListReplicationPeersResponse);
/** Transit the state of current cluster in a synchronous replication peer */
rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest)
returns(TransitReplicationPeerSyncReplicationStateResponse);
/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);

View File

@ -457,3 +457,7 @@ enum InitMetaState {
message InitMetaStateData {
}
message TransitPeerSyncReplicationStateStateData {
required SyncReplicationState syncReplicationState = 1;
}

View File

@ -63,6 +63,16 @@ message ReplicationState {
required State state = 1;
}
/**
* Indicate the state of the current cluster in a synchronous replication peer.
*/
enum SyncReplicationState {
NONE = 0;
ACTIVE = 1;
DOWNGRADE_ACTIVE = 2;
STANDBY = 3;
}
/**
* Used by replication. Description of the replication peer.
*/
@ -70,6 +80,7 @@ message ReplicationPeerDescription {
required string id = 1;
required ReplicationState state = 2;
required ReplicationPeer config = 3;
optional SyncReplicationState syncReplicationState = 4;
}
/**
@ -138,3 +149,12 @@ message ListReplicationPeersRequest {
message ListReplicationPeersResponse {
repeated ReplicationPeerDescription peer_desc = 1;
}
message TransitReplicationPeerSyncReplicationStateRequest {
required string peer_id = 1;
required SyncReplicationState syncReplicationState = 2;
}
message TransitReplicationPeerSyncReplicationStateResponse {
required uint64 proc_id = 1;
}

View File

@ -31,8 +31,8 @@ public interface ReplicationPeerStorage {
* Add a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException;
void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
SyncReplicationState syncReplicationState) throws ReplicationException;
/**
* Remove a replication peer.
@ -70,4 +70,18 @@ public interface ReplicationPeerStorage {
* @throws ReplicationException if there are errors accessing the storage service.
*/
ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
/**
* Set the state of current cluster in a synchronous replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException;
/**
* Get the state of current cluster in a synchronous replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
SyncReplicationState getPeerSyncReplicationState(String peerId)
throws ReplicationException;
}

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;

View File

@ -23,12 +23,14 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@ -51,6 +53,8 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state";
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
* cluster.
@ -79,21 +83,29 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
return ZNodePaths.joinZNode(peersZNode, peerId);
}
@VisibleForTesting
public String getSyncReplicationStateNode(String peerId) {
return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE);
}
@Override
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
SyncReplicationState syncReplicationState) throws ReplicationException {
List<ZKUtilOp> multiOps = Arrays.asList(
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
Bytes.toBytes(syncReplicationState.ordinal())));
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
ZKUtil.multiOrSequential(zookeeper,
Arrays.asList(
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
false);
ZKUtil.multiOrSequential(zookeeper, multiOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
throw new ReplicationException(
"Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" +
(enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
e);
}
}
@ -167,4 +179,31 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
"Failed to parse replication peer config for peer with id=" + peerId, e);
}
}
@Override
public void setPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
throws ReplicationException {
byte[] clusterStateBytes = Bytes.toBytes(clusterState.ordinal());
try {
ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), clusterStateBytes);
} catch (KeeperException e) {
throw new ReplicationException(
"Unable to change the cluster state for the synchronous replication peer with id=" +
peerId,
e);
}
}
@Override
public SyncReplicationState getPeerSyncReplicationState(String peerId)
throws ReplicationException {
byte[] data;
try {
data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException(
"Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
}
return SyncReplicationState.valueOf(Bytes.toInt(data));
}
}

View File

@ -166,7 +166,8 @@ public abstract class TestReplicationStateBasic {
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
rp.getPeerStorage().addPeer(ID_ONE,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
SyncReplicationState.NONE);
rqs.addPeerToHFileRefs(ID_ONE);
rqs.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
@ -189,10 +190,12 @@ public abstract class TestReplicationStateBasic {
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
rp.init();
rp.getPeerStorage().addPeer(ID_ONE,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
SyncReplicationState.NONE);
rqs.addPeerToHFileRefs(ID_ONE);
rp.getPeerStorage().addPeer(ID_TWO,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
SyncReplicationState.NONE);
rqs.addPeerToHFileRefs(ID_TWO);
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
@ -241,9 +244,13 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(0);
// Add some peers
rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
rp.getPeerStorage().addPeer(ID_ONE,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
SyncReplicationState.NONE);
assertNumberOfPeers(1);
rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
rp.getPeerStorage().addPeer(ID_TWO,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
SyncReplicationState.NONE);
assertNumberOfPeers(2);
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
@ -253,7 +260,9 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(1);
// Add one peer
rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
rp.getPeerStorage().addPeer(ID_ONE,
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
SyncReplicationState.NONE);
rp.addPeer(ID_ONE);
assertNumberOfPeers(2);
assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
@ -365,7 +374,7 @@ public abstract class TestReplicationStateBasic {
// Add peers for the corresponding queues so they are not orphans
rp.getPeerStorage().addPeer("qId" + i,
ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
true);
true, SyncReplicationState.NONE);
}
}
}

View File

@ -87,8 +87,9 @@ public class TestZKReplicationPeerStorage {
Random rand = new Random(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
.setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
.setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
.setRemoteWALDir(Long.toHexString(rand.nextLong())).setNamespaces(randNamespaces(rand))
.setExcludeNamespaces(randNamespaces(rand)).setTableCFsMap(randTableCFs(rand))
.setExcludeTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
.setBandwidth(rand.nextInt(1000)).build();
}
@ -139,7 +140,8 @@ public class TestZKReplicationPeerStorage {
public void test() throws ReplicationException {
int peerCount = 10;
for (int i = 0; i < peerCount; i++) {
STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
SyncReplicationState.valueOf(i % 4));
}
List<String> peerIds = STORAGE.listPeerIds();
assertEquals(peerCount, peerIds.size());
@ -163,6 +165,10 @@ public class TestZKReplicationPeerStorage {
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
for (int i = 0; i < peerCount; i++) {
assertEquals(SyncReplicationState.valueOf(i % 4),
STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
}
String toRemove = Integer.toString(peerCount / 2);
STORAGE.removePeer(toRemove);
peerIds = STORAGE.listPeerIds();

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@ -1390,6 +1391,28 @@ public interface MasterObserver {
default void postListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String regex) throws IOException {}
/**
* Called before transit current cluster state for the specified synchronous replication peer
* @param ctx the environment to interact with the framework and master
* @param peerId a short name that identifies the peer
* @param state a new state
*/
default void preTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
SyncReplicationState state) throws IOException {
}
/**
* Called after transit current cluster state for the specified synchronous replication peer
* @param ctx the environment to interact with the framework and master
* @param peerId a short name that identifies the peer
* @param state a new state
*/
default void postTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
SyncReplicationState state) throws IOException {
}
/**
* Called before new LockProcedure is queued.
* @param ctx the environment to interact with the framework and master

View File

@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
@ -176,6 +177,7 @@ import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
@ -3562,6 +3564,16 @@ public class HMaster extends HRegionServer implements MasterServices {
return peers;
}
@Override
public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException, IOException {
LOG.info(
getClientIdAuditPrefix() +
" transit current cluster state to {} in a synchronous replication peer id={}",
state, peerId);
return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
}
/**
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -1606,6 +1607,26 @@ public class MasterCoprocessorHost
});
}
public void preTransitReplicationPeerSyncReplicationState(final String peerId,
final SyncReplicationState clusterState) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
}
});
}
public void postTransitReplicationPeerSyncReplicationState(final String peerId,
final SyncReplicationState clusterState) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
}
});
}
public void preRequestLock(String namespace, TableName tableName, RegionInfo[] regionInfos,
LockType type, String description) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.AccessController;
@ -291,6 +292,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@ -1947,6 +1950,20 @@ public class MasterRpcServices extends RSRpcServices
}
}
@Override
public TransitReplicationPeerSyncReplicationStateResponse
transitReplicationPeerSyncReplicationState(RpcController controller,
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
try {
long procId = master.transitReplicationPeerSyncReplicationState(request.getPeerId(),
SyncReplicationState.valueOf(request.getSyncReplicationState().getNumber()));
return TransitReplicationPeerSyncReplicationStateResponse.newBuilder().setProcId(procId)
.build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
}
}
@Override
public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
ListReplicationPeersRequest request) throws ServiceException {

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -475,6 +476,14 @@ public interface MasterServices extends Server {
List<ReplicationPeerDescription> listReplicationPeers(String regex) throws ReplicationException,
IOException;
/**
* Set current cluster state for a synchronous replication peer.
* @param peerId a short name that identifies the peer
* @param clusterState state of current cluster
*/
long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
throws ReplicationException, IOException;
/**
* @return {@link LockManager} to lock namespaces/tables/regions.
*/

View File

@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface PeerProcedureInterface {
enum PeerOperationType {
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE
}
String getPeerId();

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -61,6 +64,16 @@ public class ReplicationPeerManager {
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition =
new EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>>(SyncReplicationState.class) {
{
put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
put(SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE));
}
};
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) {
this.peerStorage = peerStorage;
@ -167,6 +180,17 @@ public class ReplicationPeerManager {
return desc;
}
public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState();
EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
if (allowedToStates == null || !allowedToStates.contains(state)) {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
" to " + state + " for peer id=" + peerId);
}
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
if (peers.containsKey(peerId)) {
@ -174,8 +198,12 @@ public class ReplicationPeerManager {
return;
}
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
SyncReplicationState syncReplicationState =
StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE
: SyncReplicationState.DOWNGRADE_ACTIVE;
peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
peers.put(peerId,
new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
}
public void removePeer(String peerId) throws ReplicationException {
@ -194,7 +222,8 @@ public class ReplicationPeerManager {
return;
}
peerStorage.setPeerState(peerId, enabled);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
desc.getSyncReplicationState()));
}
public void enablePeer(String peerId) throws ReplicationException {
@ -219,7 +248,8 @@ public class ReplicationPeerManager {
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
peerStorage.updatePeerConfig(peerId, newPeerConfig);
peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
desc.getSyncReplicationState()));
}
public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
@ -239,7 +269,15 @@ public class ReplicationPeerManager {
queueStorage.removeLastSequenceIds(peerId);
}
void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException {
ReplicationPeerDescription desc = peers.get(peerId);
peerStorage.setPeerSyncReplicationState(peerId, state);
peers.put(peerId,
new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state));
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
// scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@ -368,7 +406,8 @@ public class ReplicationPeerManager {
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* The procedure for transit current cluster state for a synchronous replication peer.
*/
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure {
private static final Logger LOG =
LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
private SyncReplicationState state;
public TransitPeerSyncReplicationStateProcedure() {
}
public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
super(peerId);
this.state = state;
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
}
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state);
}
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state);
}
@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}",
state, peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder()
.setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal()))
.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
TransitPeerSyncReplicationStateStateData data =
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
state = SyncReplicationState.valueOf(data.getSyncReplicationState().getNumber());
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case PRE_PEER_MODIFICATION:
try {
prePeerModification(env);
} catch (IOException e) {
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
"mark the procedure as failure and give up", getClass().getName(), peerId, e);
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
// TODO: Need add child procedure for every RegionServer
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case POST_PEER_MODIFICATION:
try {
postPeerModification(env);
} catch (ReplicationException e) {
LOG.warn("{} failed to call postPeerModification for peer {}, retry",
getClass().getName(), peerId, e);
throw new ProcedureYieldException();
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);
}
releaseLatch();
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
private void releaseLatch() {
ProcedurePrepareLatch.releaseLatch(latch, this);
}
}

View File

@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@ -2502,6 +2503,13 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
}
@Override
public void preTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
SyncReplicationState clusterState) throws IOException {
requirePermission(ctx, "transitSyncReplicationPeerState", Action.ADMIN);
}
@Override
public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String regex) throws IOException {

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -983,4 +984,65 @@ public class TestReplicationAdmin {
// OK
}
}
@Test
public void testTransitSyncReplicationPeerState() throws Exception {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
assertEquals(SyncReplicationState.NONE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
SyncReplicationState.DOWNGRADE_ACTIVE);
fail("Can't transit cluster state if replication peer don't config remote wal dir");
} catch (Exception e) {
// OK
}
String rootDir = "hdfs://srv1:9999/hbase";
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
// Disable and enable peer don't affect SyncReplicationState
hbaseAdmin.disableReplicationPeer(ID_SECOND);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.enableReplicationPeer(ID_SECOND);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.STANDBY);
fail("Can't transit cluster state from ACTIVE to STANDBY");
} catch (Exception e) {
// OK
}
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
assertEquals(SyncReplicationState.STANDBY,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
fail("Can't transit cluster state from STANDBY to ACTIVE");
} catch (Exception e) {
// OK
}
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -467,4 +468,9 @@ public class MockNoopMasterServices implements MasterServices {
public boolean isClusterUp() {
return true;
}
public long transitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState clusterState) throws ReplicationException, IOException {
return 0;
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -109,7 +110,8 @@ public class TestReplicationHFileCleaner {
public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
rp.getPeerStorage().addPeer(peerId,
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true,
SyncReplicationState.NONE);
rq.addPeerToHFileRefs(peerId);
}

View File

@ -155,11 +155,13 @@ public class TestReplicationTrackerZKImpl {
public void testPeerNameControl() throws Exception {
int exists = 0;
rp.getPeerStorage().addPeer("6",
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true,
SyncReplicationState.NONE);
try {
rp.getPeerStorage().addPeer("6",
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true,
SyncReplicationState.NONE);
} catch (ReplicationException e) {
if (e.getCause() instanceof KeeperException.NodeExistsException) {
exists++;

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -585,7 +586,7 @@ public abstract class TestReplicationSourceManager {
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.getPeerStorage().addPeer(peerId, peerConfig, true);
rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE);
try {
manager.addPeer(peerId);
} catch (Exception e) {

View File

@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
@ -2940,6 +2941,21 @@ public class TestAccessController extends SecureTestUtil {
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
@Test
public void testTransitSyncReplicationPeerState() throws Exception {
AccessTestAction action = new AccessTestAction() {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preTransitReplicationPeerSyncReplicationState(
ObserverContextImpl.createAndPrepare(CP_ENV), "test", SyncReplicationState.NONE);
return null;
}
};
verifyAllowed(action, SUPERUSER, USER_ADMIN);
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
@Test
public void testListReplicationPeers() throws Exception {
AccessTestAction action = new AccessTestAction() {

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
@ -67,9 +68,9 @@ public class TestHBaseFsckReplication {
String peerId1 = "1";
String peerId2 = "2";
peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
true);
true, SyncReplicationState.NONE);
peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
true);
true, SyncReplicationState.NONE);
for (int i = 0; i < 10; i++) {
queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1,
"file-" + i);

View File

@ -20,6 +20,7 @@
include Java
java_import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil
java_import org.apache.hadoop.hbase.replication.SyncReplicationState
java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
java_import org.apache.hadoop.hbase.util.Bytes
java_import org.apache.hadoop.hbase.zookeeper.ZKConfig
@ -338,6 +339,20 @@ module Hbase
'!' + ReplicationPeerConfigUtil.convertToString(tableCFs)
end
# Transit current cluster to a new state in the specified synchronous
# replication peer
def transit_peer_sync_replication_state(id, state)
if 'ACTIVE'.eql?(state)
@admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::ACTIVE)
elsif 'DOWNGRADE_ACTIVE'.eql?(state)
@admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::DOWNGRADE_ACTIVE)
elsif 'STANDBY'.eql?(state)
@admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::STANDBY)
else
raise(ArgumentError, 'synchronous replication state must be ACTIVE, DOWNGRADE_ACTIVE or STANDBY')
end
end
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)

View File

@ -393,6 +393,7 @@ Shell.load_command_group(
get_peer_config
list_peer_configs
update_peer_config
transit_peer_sync_replication_state
]
)

View File

@ -39,8 +39,8 @@ EOF
peers = replication_admin.list_peers
formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
REMOTE_ROOT_DIR STATE REPLICATE_ALL
NAMESPACES TABLE_CFS BANDWIDTH
REMOTE_ROOT_DIR SYNC_REPLICATION_STATE STATE
REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH
SERIAL])
peers.each do |peer|
@ -67,7 +67,7 @@ EOF
remote_root_dir = config.getRemoteWALDir
end
formatter.row([id, cluster_key, endpoint_classname,
remote_root_dir, state,
remote_root_dir, peer.getSyncReplicationState, state,
config.replicateAllUserTables, namespaces, tableCFs,
config.getBandwidth, config.isSerial])
end

View File

@ -0,0 +1,44 @@
#
# Copyright The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class TransitPeerSyncReplicationState < Command
def help
<<-EOF
Transit current cluster to new state in the specified synchronous replication peer.
Examples:
# Transit cluster state to DOWNGRADE_ACTIVE in a synchronous replication peer
hbase> transit_peer_sync_replication_state '1', 'DOWNGRADE_ACTIVE'
# Transit cluster state to ACTIVE in a synchronous replication peer
hbase> transit_peer_sync_replication_state '1', 'ACTIVE'
# Transit cluster state to STANDBY in a synchronous replication peer
hbase> transit_peer_sync_replication_state '1', 'STANDBY'
EOF
end
def command(id, state)
replication_admin.transit_peer_sync_replication_state(id, state)
end
end
end
end

View File

@ -23,6 +23,9 @@ require 'hbase/hbase'
require 'hbase/table'
include HBaseConstants
include Java
java_import org.apache.hadoop.hbase.replication.SyncReplicationState
module Hbase
class ReplicationAdminTest < Test::Unit::TestCase
@ -513,6 +516,27 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "transit_peer_sync_replication_state: test" do
cluster_key = "server1.cie.com:2181:/hbase"
remote_wal_dir = "hdfs://srv1:9999/hbase"
args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir }
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0)
assert_equal(@peer_id, peer.getPeerId)
assert_equal(SyncReplicationState::DOWNGRADE_ACTIVE, peer.getSyncReplicationState)
command(:transit_peer_sync_replication_state, @peer_id, 'ACTIVE')
assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0)
assert_equal(@peer_id, peer.getPeerId)
assert_equal(SyncReplicationState::ACTIVE, peer.getSyncReplicationState)
# cleanup for future tests
command(:remove_peer, @peer_id)
end
define_test "get_peer_config: works with simple clusterKey peer" do
cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key }