diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 331f2d119e3..39542e40946 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; @@ -2656,6 +2657,44 @@ public interface Admin extends Abortable, Closeable { */ List listReplicationPeers(Pattern pattern) throws IOException; + /** + * Transit current cluster to a new state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @param state a new state of current cluster + * @throws IOException if a remote or network exception occurs + */ + void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws IOException; + + /** + * Transit current cluster to a new state in a synchronous replication peer. But does not block + * and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param state a new state of current cluster + * @throws IOException if a remote or network exception occurs + */ + Future transitReplicationPeerSyncReplicationStateAsync(String peerId, + SyncReplicationState state) throws IOException; + + /** + * Get the current cluster state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @return the current cluster state + * @throws IOException if a remote or network exception occurs + */ + default SyncReplicationState getReplicationPeerSyncReplicationState(String peerId) + throws IOException { + List peers = listReplicationPeers(Pattern.compile(peerId)); + if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) { + throw new IOException("Replication peer " + peerId + " does not exist"); + } + return peers.get(0).getSyncReplicationState(); + } + /** * Mark region server(s) as decommissioned to prevent additional regions from getting * assigned to them. Optionally unload the regions on the servers. If there are multiple servers diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 8141e74de75..65cccf7f535 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import com.google.protobuf.RpcChannel; +import java.io.IOException; import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; /** @@ -599,6 +601,35 @@ public interface AsyncAdmin { CompletableFuture updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig); + /** + * Transit current cluster to a new state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @param state a new state of current cluster + */ + CompletableFuture transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState state); + + /** + * Get the current cluster state in a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @return the current cluster state wrapped by a {@link CompletableFuture}. + */ + default CompletableFuture + getReplicationPeerSyncReplicationState(String peerId) { + CompletableFuture future = new CompletableFuture<>(); + listReplicationPeers(Pattern.compile(peerId)).whenComplete((peers, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) { + future.completeExceptionally( + new IOException("Replication peer " + peerId + " does not exist")); + } else { + future.complete(peers.get(0).getSyncReplicationState()); + } + }); + return future; + } + /** * Append the replicable table-cf config of the specified peer * @param peerId a short that identifies the cluster diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 5b22668c6b4..08952cb4307 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; /** @@ -413,6 +414,12 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.updateReplicationPeerConfig(peerId, peerConfig)); } + @Override + public CompletableFuture transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState clusterState) { + return wrap(rawAdmin.transitReplicationPeerSyncReplicationState(peerId, clusterState)); + } + @Override public CompletableFuture appendReplicationPeerTableCFs(String peerId, Map> tableCfs) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 1176cbdd5dc..f78005f836f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -124,6 +124,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -1717,6 +1719,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { MasterProtos.ClearDeadServersRequest request) throws ServiceException { return stub.clearDeadServers(controller, request); } + + @Override + public TransitReplicationPeerSyncReplicationStateResponse + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + return stub.transitReplicationPeerSyncReplicationState(controller, request); + } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 07e1be4a71b..ac8d972771d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; @@ -206,6 +207,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Disab import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -4013,6 +4015,30 @@ public class HBaseAdmin implements Admin { () -> "UPDATE_REPLICATION_PEER_CONFIG"); } + @Override + public void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws IOException { + get(transitReplicationPeerSyncReplicationStateAsync(peerId, state), this.syncWaitTimeout, + TimeUnit.MILLISECONDS); + } + + @Override + public Future transitReplicationPeerSyncReplicationStateAsync(String peerId, + SyncReplicationState state) throws IOException { + TransitReplicationPeerSyncReplicationStateResponse response = + executeCallable(new MasterCallable( + getConnection(), getRpcControllerFactory()) { + @Override + protected TransitReplicationPeerSyncReplicationStateResponse rpcCall() throws Exception { + return master.transitReplicationPeerSyncReplicationState(getRpcController(), + RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, + state)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"); + } + @Override public void appendReplicationPeerTableCFs(String id, Map> tableCfs) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 0fd0e59ab90..963cca794ee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.quotas.QuotaTableUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; @@ -255,6 +256,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -1612,6 +1615,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); } + @Override + public CompletableFuture transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState clusterState) { + return this + . procedureCall( + RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, + clusterState), + (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), + (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, + () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); + } + @Override public CompletableFuture appendReplicationPeerTableCFs(String id, Map> tableCfs) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 50690b4285d..7bb65d20242 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -166,6 +166,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -638,4 +640,11 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { throws ServiceException { return stub.splitRegion(controller, request); } + + @Override + public TransitReplicationPeerSyncReplicationStateResponse + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + return stub.transitReplicationPeerSyncReplicationState(controller, request); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 474ded3a372..6cbe05b0f9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -396,25 +397,28 @@ public final class ReplicationPeerConfigUtil { return ProtobufUtil.prependPBMagic(bytes); } - public static ReplicationPeerDescription toReplicationPeerDescription( - ReplicationProtos.ReplicationPeerDescription desc) { - boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState() - .getState(); + public static ReplicationPeerDescription + toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) { + boolean enabled = + ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState(); ReplicationPeerConfig config = convert(desc.getConfig()); - return new ReplicationPeerDescription(desc.getId(), enabled, config); + return new ReplicationPeerDescription(desc.getId(), enabled, config, + SyncReplicationState.valueOf(desc.getSyncReplicationState().getNumber())); } - public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription( - ReplicationPeerDescription desc) { + public static ReplicationProtos.ReplicationPeerDescription + toProtoReplicationPeerDescription(ReplicationPeerDescription desc) { ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription.newBuilder(); builder.setId(desc.getPeerId()); - ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState - .newBuilder(); - stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED - : ReplicationProtos.ReplicationState.State.DISABLED); + ReplicationProtos.ReplicationState.Builder stateBuilder = + ReplicationProtos.ReplicationState.newBuilder(); + stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED : + ReplicationProtos.ReplicationState.State.DISABLED); builder.setState(stateBuilder.build()); builder.setConfig(convert(desc.getPeerConfig())); + builder.setSyncReplicationState( + ReplicationProtos.SyncReplicationState.forNumber(desc.getSyncReplicationState().ordinal())); return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java index ba97d07e785..2d077c51298 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java @@ -28,11 +28,14 @@ public class ReplicationPeerDescription { private final String id; private final boolean enabled; private final ReplicationPeerConfig config; + private final SyncReplicationState syncReplicationState; - public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config) { + public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config, + SyncReplicationState syncReplicationState) { this.id = id; this.enabled = enabled; this.config = config; + this.syncReplicationState = syncReplicationState; } public String getPeerId() { @@ -47,11 +50,16 @@ public class ReplicationPeerDescription { return this.config; } + public SyncReplicationState getSyncReplicationState() { + return this.syncReplicationState; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("id : ").append(id); builder.append(", enabled : " + enabled); builder.append(", config : " + config); + builder.append(", syncReplicationState : " + syncReplicationState); return builder.toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java new file mode 100644 index 00000000000..bd144e908c9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used by synchronous replication. Indicate the state of the current cluster in a synchronous + * replication peer. The state may be one of {@link SyncReplicationState#ACTIVE}, + * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or + * {@link SyncReplicationState#STANDBY}. + *

+ * For asynchronous replication, the state is {@link SyncReplicationState#NONE}. + */ +@InterfaceAudience.Public +public enum SyncReplicationState { + NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY; + + public static SyncReplicationState valueOf(int value) { + switch (value) { + case 0: + return NONE; + case 1: + return ACTIVE; + case 2: + return DOWNGRADE_ACTIVE; + case 3: + return STANDBY; + default: + throw new IllegalArgumentException("Unknown synchronous replication state " + value); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index fc037a83e57..1269cc7b3b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -147,6 +148,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; /** @@ -1867,4 +1869,12 @@ public final class RequestConverter { } return pbServers; } + + public static TransitReplicationPeerSyncReplicationStateRequest + buildTransitReplicationPeerSyncReplicationStateRequest(String peerId, + SyncReplicationState state) { + return TransitReplicationPeerSyncReplicationStateRequest.newBuilder().setPeerId(peerId) + .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal())) + .build(); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 3a236c03894..c2ab18017f4 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -962,6 +962,10 @@ service MasterService { rpc ListReplicationPeers(ListReplicationPeersRequest) returns(ListReplicationPeersResponse); + /** Transit the state of current cluster in a synchronous replication peer */ + rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest) + returns(TransitReplicationPeerSyncReplicationStateResponse); + /** Returns a list of ServerNames marked as decommissioned. */ rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest) returns(ListDecommissionedRegionServersResponse); diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 39d2824394e..cb2eabb4351 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -457,3 +457,7 @@ enum InitMetaState { message InitMetaStateData { } + +message TransitPeerSyncReplicationStateStateData { + required SyncReplicationState syncReplicationState = 1; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 20dd0495bbc..3564ae4f751 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -63,6 +63,16 @@ message ReplicationState { required State state = 1; } +/** + * Indicate the state of the current cluster in a synchronous replication peer. + */ +enum SyncReplicationState { + NONE = 0; + ACTIVE = 1; + DOWNGRADE_ACTIVE = 2; + STANDBY = 3; +} + /** * Used by replication. Description of the replication peer. */ @@ -70,6 +80,7 @@ message ReplicationPeerDescription { required string id = 1; required ReplicationState state = 2; required ReplicationPeer config = 3; + optional SyncReplicationState syncReplicationState = 4; } /** @@ -138,3 +149,12 @@ message ListReplicationPeersRequest { message ListReplicationPeersResponse { repeated ReplicationPeerDescription peer_desc = 1; } + +message TransitReplicationPeerSyncReplicationStateRequest { + required string peer_id = 1; + required SyncReplicationState syncReplicationState = 2; +} + +message TransitReplicationPeerSyncReplicationStateResponse { + required uint64 proc_id = 1; +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java index 1adda02e631..d2538abb8ea 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -31,8 +31,8 @@ public interface ReplicationPeerStorage { * Add a replication peer. * @throws ReplicationException if there are errors accessing the storage service. */ - void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException; + void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, + SyncReplicationState syncReplicationState) throws ReplicationException; /** * Remove a replication peer. @@ -70,4 +70,18 @@ public interface ReplicationPeerStorage { * @throws ReplicationException if there are errors accessing the storage service. */ ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException; + + /** + * Set the state of current cluster in a synchronous replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void setPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws ReplicationException; + + /** + * Get the state of current cluster in a synchronous replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + SyncReplicationState getPeerSyncReplicationState(String peerId) + throws ReplicationException; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index c7568bbb854..e4dea8309c8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index bbe65498738..ad3c4352eca 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -23,12 +23,14 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @@ -51,6 +53,8 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state"; + /** * The name of the znode that contains the replication status of a remote slave (i.e. peer) * cluster. @@ -79,21 +83,29 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase return ZNodePaths.joinZNode(peersZNode, peerId); } + @VisibleForTesting + public String getSyncReplicationStateNode(String peerId) { + return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE); + } + @Override - public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException { + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, + SyncReplicationState syncReplicationState) throws ReplicationException { + List multiOps = Arrays.asList( + ZKUtilOp.createAndFailSilent(getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)), + ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), + enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES), + ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId), + Bytes.toBytes(syncReplicationState.ordinal()))); try { ZKUtil.createWithParents(zookeeper, peersZNode); - ZKUtil.multiOrSequential(zookeeper, - Arrays.asList( - ZKUtilOp.createAndFailSilent(getPeerNode(peerId), - ReplicationPeerConfigUtil.toByteArray(peerConfig)), - ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), - enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), - false); + ZKUtil.multiOrSequential(zookeeper, multiOps, false); } catch (KeeperException e) { - throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" - + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + throw new ReplicationException( + "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" + + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, + e); } } @@ -167,4 +179,31 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase "Failed to parse replication peer config for peer with id=" + peerId, e); } } + + @Override + public void setPeerSyncReplicationState(String peerId, SyncReplicationState clusterState) + throws ReplicationException { + byte[] clusterStateBytes = Bytes.toBytes(clusterState.ordinal()); + try { + ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), clusterStateBytes); + } catch (KeeperException e) { + throw new ReplicationException( + "Unable to change the cluster state for the synchronous replication peer with id=" + + peerId, + e); + } + } + + @Override + public SyncReplicationState getPeerSyncReplicationState(String peerId) + throws ReplicationException { + byte[] data; + try { + data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException( + "Error getting cluster state for the synchronous replication peer with id=" + peerId, e); + } + return SyncReplicationState.valueOf(Bytes.toInt(data)); + } } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 437804c05a6..4a2c3cd5170 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -166,7 +166,8 @@ public abstract class TestReplicationStateBasic { assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); rqs.addPeerToHFileRefs(ID_ONE); rqs.addHFileRefs(ID_ONE, files1); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); @@ -189,10 +190,12 @@ public abstract class TestReplicationStateBasic { public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { rp.init(); rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); rqs.addPeerToHFileRefs(ID_ONE); rp.getPeerStorage().addPeer(ID_TWO, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, + SyncReplicationState.NONE); rqs.addPeerToHFileRefs(ID_TWO); List> files1 = new ArrayList<>(3); @@ -241,9 +244,13 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(0); // Add some peers - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); assertNumberOfPeers(1); - rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); + rp.getPeerStorage().addPeer(ID_TWO, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, + SyncReplicationState.NONE); assertNumberOfPeers(2); assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils @@ -253,7 +260,9 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(1); // Add one peer - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, + SyncReplicationState.NONE); rp.addPeer(ID_ONE); assertNumberOfPeers(2); assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); @@ -365,7 +374,7 @@ public abstract class TestReplicationStateBasic { // Add peers for the corresponding queues so they are not orphans rp.getPeerStorage().addPeer("qId" + i, ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), - true); + true, SyncReplicationState.NONE); } } } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index 3290fb0097f..12586956c79 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -87,8 +87,9 @@ public class TestZKReplicationPeerStorage { Random rand = new Random(seed); return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) - .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) - .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) + .setRemoteWALDir(Long.toHexString(rand.nextLong())).setNamespaces(randNamespaces(rand)) + .setExcludeNamespaces(randNamespaces(rand)).setTableCFsMap(randTableCFs(rand)) + .setExcludeTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) .setBandwidth(rand.nextInt(1000)).build(); } @@ -139,7 +140,8 @@ public class TestZKReplicationPeerStorage { public void test() throws ReplicationException { int peerCount = 10; for (int i = 0; i < peerCount; i++) { - STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0, + SyncReplicationState.valueOf(i % 4)); } List peerIds = STORAGE.listPeerIds(); assertEquals(peerCount, peerIds.size()); @@ -163,6 +165,10 @@ public class TestZKReplicationPeerStorage { for (int i = 0; i < peerCount; i++) { assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); } + for (int i = 0; i < peerCount; i++) { + assertEquals(SyncReplicationState.valueOf(i % 4), + STORAGE.getPeerSyncReplicationState(Integer.toString(i))); + } String toRemove = Integer.toString(peerCount / 2); STORAGE.removePeer(toRemove); peerIds = STORAGE.listPeerIds(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index 3175af308cb..96f1e17ba0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -1390,6 +1391,28 @@ public interface MasterObserver { default void postListReplicationPeers(final ObserverContext ctx, String regex) throws IOException {} + /** + * Called before transit current cluster state for the specified synchronous replication peer + * @param ctx the environment to interact with the framework and master + * @param peerId a short name that identifies the peer + * @param state a new state + */ + default void preTransitReplicationPeerSyncReplicationState( + final ObserverContext ctx, String peerId, + SyncReplicationState state) throws IOException { + } + + /** + * Called after transit current cluster state for the specified synchronous replication peer + * @param ctx the environment to interact with the framework and master + * @param peerId a short name that identifies the peer + * @param state a new state + */ + default void postTransitReplicationPeerSyncReplicationState( + final ObserverContext ctx, String peerId, + SyncReplicationState state) throws IOException { + } + /** * Called before new LockProcedure is queued. * @param ctx the environment to interact with the framework and master diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4dc7bf83259..a118ebd24b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer; @@ -176,6 +177,7 @@ import org.apache.hadoop.hbase.replication.ReplicationLoadSource; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; @@ -3562,6 +3564,16 @@ public class HMaster extends HRegionServer implements MasterServices { return peers; } + @Override + public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws ReplicationException, IOException { + LOG.info( + getClientIdAuditPrefix() + + " transit current cluster state to {} in a synchronous replication peer id={}", + state, peerId); + return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state)); + } + /** * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index e563cd4ecc5..87ac1b24c60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -1606,6 +1607,26 @@ public class MasterCoprocessorHost }); } + public void preTransitReplicationPeerSyncReplicationState(final String peerId, + final SyncReplicationState clusterState) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState); + } + }); + } + + public void postTransitReplicationPeerSyncReplicationState(final String peerId, + final SyncReplicationState clusterState) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState); + } + }); + } + public void preRequestLock(String namespace, TableName tableName, RegionInfo[] regionInfos, LockType type, String description) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 8efbfe5502b..ef2d1b8db46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.AccessController; @@ -291,6 +292,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; @@ -1947,6 +1950,20 @@ public class MasterRpcServices extends RSRpcServices } } + @Override + public TransitReplicationPeerSyncReplicationStateResponse + transitReplicationPeerSyncReplicationState(RpcController controller, + TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { + try { + long procId = master.transitReplicationPeerSyncReplicationState(request.getPeerId(), + SyncReplicationState.valueOf(request.getSyncReplicationState().getNumber())); + return TransitReplicationPeerSyncReplicationStateResponse.newBuilder().setProcId(procId) + .build(); + } catch (ReplicationException | IOException e) { + throw new ServiceException(e); + } + } + @Override public ListReplicationPeersResponse listReplicationPeers(RpcController controller, ListReplicationPeersRequest request) throws ServiceException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index ac521d5aba3..c3e798776fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -475,6 +476,14 @@ public interface MasterServices extends Server { List listReplicationPeers(String regex) throws ReplicationException, IOException; + /** + * Set current cluster state for a synchronous replication peer. + * @param peerId a short name that identifies the peer + * @param clusterState state of current cluster + */ + long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState) + throws ReplicationException, IOException; + /** * @return {@link LockManager} to lock namespaces/tables/regions. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java index 399bcd7cd2b..ff87566aec4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience; public interface PeerProcedureInterface { enum PeerOperationType { - ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH + ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE } String getPeerId(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 05ecd61f5ce..f07a0d816c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumMap; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -61,6 +64,16 @@ public class ReplicationPeerManager { private final ConcurrentMap peers; + private final EnumMap> allowedTransition = + new EnumMap>(SyncReplicationState.class) { + { + put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); + put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); + put(SyncReplicationState.DOWNGRADE_ACTIVE, + EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)); + } + }; + ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, ConcurrentMap peers) { this.peerStorage = peerStorage; @@ -167,6 +180,17 @@ public class ReplicationPeerManager { return desc; } + public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws DoNotRetryIOException { + ReplicationPeerDescription desc = checkPeerExists(peerId); + SyncReplicationState fromState = desc.getSyncReplicationState(); + EnumSet allowedToStates = allowedTransition.get(fromState); + if (allowedToStates == null || !allowedToStates.contains(state)) { + throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + + " to " + state + " for peer id=" + peerId); + } + } + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { if (peers.containsKey(peerId)) { @@ -174,8 +198,12 @@ public class ReplicationPeerManager { return; } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); - peerStorage.addPeer(peerId, copiedPeerConfig, enabled); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); + SyncReplicationState syncReplicationState = + StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE + : SyncReplicationState.DOWNGRADE_ACTIVE; + peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); + peers.put(peerId, + new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); } public void removePeer(String peerId) throws ReplicationException { @@ -194,7 +222,8 @@ public class ReplicationPeerManager { return; } peerStorage.setPeerState(peerId, enabled); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(), + desc.getSyncReplicationState())); } public void enablePeer(String peerId) throws ReplicationException { @@ -219,7 +248,8 @@ public class ReplicationPeerManager { newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); peerStorage.updatePeerConfig(peerId, newPeerConfig); - peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); + peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig, + desc.getSyncReplicationState())); } public List listPeers(Pattern pattern) { @@ -239,7 +269,15 @@ public class ReplicationPeerManager { queueStorage.removeLastSequenceIds(peerId); } - void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state) + throws ReplicationException { + ReplicationPeerDescription desc = peers.get(peerId); + peerStorage.setPeerSyncReplicationState(peerId, state); + peers.put(peerId, + new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state)); + } + + public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // on-going when the refresh peer config procedure is done, if a RS which has already been // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in @@ -368,7 +406,8 @@ public class ReplicationPeerManager { for (String peerId : peerStorage.listPeerIds()) { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); + SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); } return new ReplicationPeerManager(peerStorage, ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java new file mode 100644 index 00000000000..d26eeccfbb6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * The procedure for transit current cluster state for a synchronous replication peer. + */ +@InterfaceAudience.Private +public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure { + + private static final Logger LOG = + LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); + + private SyncReplicationState state; + + public TransitPeerSyncReplicationStateProcedure() { + } + + public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) { + super(peerId); + this.state = state; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state); + } + env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state); + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}", + state, peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state); + } + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder() + .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal())) + .build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + TransitPeerSyncReplicationStateStateData data = + serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); + state = SyncReplicationState.valueOf(data.getSyncReplicationState().getNumber()); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case PRE_PEER_MODIFICATION: + try { + prePeerModification(env); + } catch (IOException e) { + LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " + + "mark the procedure as failure and give up", getClass().getName(), peerId, e); + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); + releaseLatch(); + return Flow.NO_MORE_STATE; + } catch (ReplicationException e) { + LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), + peerId, e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.UPDATE_PEER_STORAGE); + return Flow.HAS_MORE_STATE; + case UPDATE_PEER_STORAGE: + try { + updatePeerStorage(env); + } catch (ReplicationException e) { + LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, + e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.REFRESH_PEER_ON_RS); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_ON_RS: + // TODO: Need add child procedure for every RegionServer + setNextState(PeerModificationState.POST_PEER_MODIFICATION); + return Flow.HAS_MORE_STATE; + case POST_PEER_MODIFICATION: + try { + postPeerModification(env); + } catch (ReplicationException e) { + LOG.warn("{} failed to call postPeerModification for peer {}, retry", + getClass().getName(), peerId, e); + throw new ProcedureYieldException(); + } catch (IOException e) { + LOG.warn("{} failed to call post CP hook for peer {}, " + + "ignore since the procedure has already done", getClass().getName(), peerId, e); + } + releaseLatch(); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + private void releaseLatch() { + ProcedurePrepareLatch.releaseLatch(latch, this); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 2758c7e3792..52b7a92cb05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -2502,6 +2503,13 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN); } + @Override + public void preTransitReplicationPeerSyncReplicationState( + final ObserverContext ctx, String peerId, + SyncReplicationState clusterState) throws IOException { + requirePermission(ctx, "transitSyncReplicationPeerState", Action.ADMIN); + } + @Override public void preListReplicationPeers(final ObserverContext ctx, String regex) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index e47110032f6..a7710e7d466 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -983,4 +984,65 @@ public class TestReplicationAdmin { // OK } } + + @Test + public void testTransitSyncReplicationPeerState() throws Exception { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + assertEquals(SyncReplicationState.NONE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); + + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, + SyncReplicationState.DOWNGRADE_ACTIVE); + fail("Can't transit cluster state if replication peer don't config remote wal dir"); + } catch (Exception e) { + // OK + } + + String rootDir = "hdfs://srv1:9999/hbase"; + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setRemoteWALDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + // Disable and enable peer don't affect SyncReplicationState + hbaseAdmin.disableReplicationPeer(ID_SECOND); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + hbaseAdmin.enableReplicationPeer(ID_SECOND); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); + assertEquals(SyncReplicationState.ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, + SyncReplicationState.STANDBY); + fail("Can't transit cluster state from ACTIVE to STANDBY"); + } catch (Exception e) { + // OK + } + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, + SyncReplicationState.DOWNGRADE_ACTIVE); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY); + assertEquals(SyncReplicationState.STANDBY, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + try { + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); + fail("Can't transit cluster state from STANDBY to ACTIVE"); + } catch (Exception e) { + // OK + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 34570d37b80..e2ed33aafc9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -467,4 +468,9 @@ public class MockNoopMasterServices implements MasterServices { public boolean isClusterUp() { return true; } + + public long transitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState clusterState) throws ReplicationException, IOException { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 08dd428d04f..24b930c588b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -109,7 +110,8 @@ public class TestReplicationHFileCleaner { public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); rp.getPeerStorage().addPeer(peerId, - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true, + SyncReplicationState.NONE); rq.addPeerToHFileRefs(peerId); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index 65eac4a9f04..241909583e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -155,11 +155,13 @@ public class TestReplicationTrackerZKImpl { public void testPeerNameControl() throws Exception { int exists = 0; rp.getPeerStorage().addPeer("6", - ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true, + SyncReplicationState.NONE); try { rp.getPeerStorage().addPeer("6", - ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); + ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true, + SyncReplicationState.NONE); } catch (ReplicationException e) { if (e.getCause() instanceof KeeperException.NodeExistsException) { exists++; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index eb46cd77f4e..81708935731 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -585,7 +586,7 @@ public abstract class TestReplicationSourceManager { private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, final boolean waitForSource) throws Exception { final ReplicationPeers rp = manager.getReplicationPeers(); - rp.getPeerStorage().addPeer(peerId, peerConfig, true); + rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE); try { manager.addPeer(peerId); } catch (Exception e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 870fa19b37e..d2aa6823db6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; @@ -2940,6 +2941,21 @@ public class TestAccessController extends SecureTestUtil { verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); } + @Test + public void testTransitSyncReplicationPeerState() throws Exception { + AccessTestAction action = new AccessTestAction() { + @Override + public Object run() throws Exception { + ACCESS_CONTROLLER.preTransitReplicationPeerSyncReplicationState( + ObserverContextImpl.createAndPrepare(CP_ENV), "test", SyncReplicationState.NONE); + return null; + } + }; + + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + @Test public void testListReplicationPeers() throws Exception { AccessTestAction action = new AccessTestAction() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java index 89119824d93..f5eca39a7d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; @@ -67,9 +68,9 @@ public class TestHBaseFsckReplication { String peerId1 = "1"; String peerId2 = "2"; peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), - true); + true, SyncReplicationState.NONE); peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), - true); + true, SyncReplicationState.NONE); for (int i = 0; i < 10; i++) { queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1, "file-" + i); diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index d1f1344d8a3..5f86365c5b2 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -20,6 +20,7 @@ include Java java_import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil +java_import org.apache.hadoop.hbase.replication.SyncReplicationState java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig java_import org.apache.hadoop.hbase.util.Bytes java_import org.apache.hadoop.hbase.zookeeper.ZKConfig @@ -338,6 +339,20 @@ module Hbase '!' + ReplicationPeerConfigUtil.convertToString(tableCFs) end + # Transit current cluster to a new state in the specified synchronous + # replication peer + def transit_peer_sync_replication_state(id, state) + if 'ACTIVE'.eql?(state) + @admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::ACTIVE) + elsif 'DOWNGRADE_ACTIVE'.eql?(state) + @admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::DOWNGRADE_ACTIVE) + elsif 'STANDBY'.eql?(state) + @admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::STANDBY) + else + raise(ArgumentError, 'synchronous replication state must be ACTIVE, DOWNGRADE_ACTIVE or STANDBY') + end + end + #---------------------------------------------------------------------------------------------- # Enables a table's replication switch def enable_tablerep(table_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 9a796587b6b..934fa11460a 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -393,6 +393,7 @@ Shell.load_command_group( get_peer_config list_peer_configs update_peer_config + transit_peer_sync_replication_state ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index f3ab7496a56..f2ec014df8b 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -39,8 +39,8 @@ EOF peers = replication_admin.list_peers formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME - REMOTE_ROOT_DIR STATE REPLICATE_ALL - NAMESPACES TABLE_CFS BANDWIDTH + REMOTE_ROOT_DIR SYNC_REPLICATION_STATE STATE + REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH SERIAL]) peers.each do |peer| @@ -67,7 +67,7 @@ EOF remote_root_dir = config.getRemoteWALDir end formatter.row([id, cluster_key, endpoint_classname, - remote_root_dir, state, + remote_root_dir, peer.getSyncReplicationState, state, config.replicateAllUserTables, namespaces, tableCFs, config.getBandwidth, config.isSerial]) end diff --git a/hbase-shell/src/main/ruby/shell/commands/transit_peer_sync_replication_state.rb b/hbase-shell/src/main/ruby/shell/commands/transit_peer_sync_replication_state.rb new file mode 100644 index 00000000000..6d4a9638282 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/transit_peer_sync_replication_state.rb @@ -0,0 +1,44 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class TransitPeerSyncReplicationState < Command + def help + <<-EOF +Transit current cluster to new state in the specified synchronous replication peer. +Examples: + + # Transit cluster state to DOWNGRADE_ACTIVE in a synchronous replication peer + hbase> transit_peer_sync_replication_state '1', 'DOWNGRADE_ACTIVE' + # Transit cluster state to ACTIVE in a synchronous replication peer + hbase> transit_peer_sync_replication_state '1', 'ACTIVE' + # Transit cluster state to STANDBY in a synchronous replication peer + hbase> transit_peer_sync_replication_state '1', 'STANDBY' + +EOF + end + + def command(id, state) + replication_admin.transit_peer_sync_replication_state(id, state) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 5d04fbba878..9d364ce43f2 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -23,6 +23,9 @@ require 'hbase/hbase' require 'hbase/table' include HBaseConstants +include Java + +java_import org.apache.hadoop.hbase.replication.SyncReplicationState module Hbase class ReplicationAdminTest < Test::Unit::TestCase @@ -513,6 +516,27 @@ module Hbase command(:remove_peer, @peer_id) end + define_test "transit_peer_sync_replication_state: test" do + cluster_key = "server1.cie.com:2181:/hbase" + remote_wal_dir = "hdfs://srv1:9999/hbase" + args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(SyncReplicationState::DOWNGRADE_ACTIVE, peer.getSyncReplicationState) + + command(:transit_peer_sync_replication_state, @peer_id, 'ACTIVE') + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(SyncReplicationState::ACTIVE, peer.getSyncReplicationState) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "get_peer_config: works with simple clusterKey peer" do cluster_key = "localhost:2181:/hbase-test" args = { CLUSTER_KEY => cluster_key }