diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 5d9f54ad9e1..b3c8a6aee6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -3009,6 +3009,14 @@ public interface Admin extends Abortable, Closeable { */ void disableTableReplication(TableName tableName) throws IOException; + /** + * Check if a replication peer is enabled. + * @param peerId id of replication peer to check + * @return true if replication peer is enabled + * @throws IOException if a remote or network exception occurs + */ + boolean isReplicationPeerEnabled(String peerId) throws IOException; + /** * Clear compacting queues on a regionserver. * @param serverName the region server name diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index f071e58e5d9..be8a961ee2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -773,6 +773,14 @@ public interface AsyncAdmin { */ CompletableFuture disableTableReplication(TableName tableName); + /** + * Check if a replication peer is enabled. + * @param peerId id of replication peer to check + * @return true if replication peer is enabled. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture isReplicationPeerEnabled(String peerId); + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index f6f302463bd..925891ff58c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -475,6 +475,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.disableTableReplication(tableName)); } + @Override + public CompletableFuture isReplicationPeerEnabled(String peerId) { + return wrap(rawAdmin.isReplicationPeerEnabled(peerId)); + } + @Override public CompletableFuture snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index cef1a9b787c..4d231cfe0b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -160,6 +160,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; @@ -1851,6 +1853,12 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { return stub.listReplicationPeers(controller, request); } + @Override + public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller, + GetReplicationPeerStateRequest request) throws ServiceException { + return stub.isReplicationPeerEnabled(controller, request); + } + @Override public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller, GetSpaceQuotaRegionSizesRequest request) throws ServiceException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 5d140818730..bc6288197b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -231,6 +231,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddRe import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -3977,6 +3979,21 @@ public class HBaseAdmin implements Admin { setTableRep(tableName, false); } + @Override + public boolean isReplicationPeerEnabled(String peerId) throws IOException { + return executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + GetReplicationPeerStateRequest.Builder request = + GetReplicationPeerStateRequest.newBuilder(); + request.setPeerId(peerId); + GetReplicationPeerStateResponse response = + master.isReplicationPeerEnabled(getRpcController(), request.build()); + return response.getIsEnabled(); + } + }); + } + /** * Connect to peer and check the table descriptor on peer: *
    diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index a310e7d864f..7970f8f3e11 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -295,6 +295,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; @@ -3490,6 +3492,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } + @Override + public CompletableFuture isReplicationPeerEnabled(String peerId) { + GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); + request.setPeerId(peerId); + return this. newMasterCaller() + .action((controller, stub) -> this. call(controller, stub, request.build(), + (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), + resp -> resp.getIsEnabled())) + .call(); + } + private CompletableFuture getTableSplits(TableName tableName) { CompletableFuture future = new CompletableFuture<>(); addListener( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 59d0dc592bd..6827700d9f7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -191,6 +191,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; @@ -669,6 +671,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { return stub.listReplicationPeers(controller, request); } + @Override + public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller, + GetReplicationPeerStateRequest request) throws ServiceException { + return stub.isReplicationPeerEnabled(controller, request); + } + @Override public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller, GetSpaceQuotaRegionSizesRequest request) throws ServiceException { diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 8c540bf7387..29fc22d618c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -1101,6 +1101,10 @@ service MasterService { rpc ListReplicationPeers(ListReplicationPeersRequest) returns(ListReplicationPeersResponse); + /** Returns the stat of a replication peer */ + rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest) + returns(GetReplicationPeerStateResponse); + /** Returns a list of ServerNames marked as decommissioned. */ rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest) returns(ListDecommissionedRegionServersResponse); diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 1d483ce3345..8b4aa481321 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -137,3 +137,11 @@ message ListReplicationPeersRequest { message ListReplicationPeersResponse { repeated ReplicationPeerDescription peer_desc = 1; } + +message GetReplicationPeerStateRequest { + required string peer_id = 1; +} + +message GetReplicationPeerStateResponse { + required bool is_enabled = 1; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 29972fd1abf..437d9d8a07b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -359,6 +359,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; @@ -2113,6 +2115,18 @@ public class MasterRpcServices extends RSRpcServices return response.build(); } + @Override + public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller, + GetReplicationPeerStateRequest request) throws ServiceException { + boolean isEnabled; + try { + isEnabled = master.getReplicationPeerManager().getPeerState(request.getPeerId()); + } catch (ReplicationException ioe) { + throw new ServiceException(ioe); + } + return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build(); + } + @Override public ListDecommissionedRegionServersResponse listDecommissionedRegionServers( RpcController controller, ListDecommissionedRegionServersRequest request) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 58c9e33510d..9a386e64f15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -198,6 +198,15 @@ public class ReplicationPeerManager { peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); } + public boolean getPeerState(String peerId) throws ReplicationException { + ReplicationPeerDescription desc = peers.get(peerId); + if (desc != null) { + return desc.isEnabled(); + } else { + throw new ReplicationException("Replication Peer of " + peerId + " does not exist."); + } + } + public void enablePeer(String peerId) throws ReplicationException { setPeerState(peerId, true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 8e93da0af0d..2757a7dc0ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -442,4 +443,19 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } } + + /** + * Test for HBASE-27448 Add an admin method to get replication enabled state + */ + @Test + public void testGetReplicationPeerState() throws Exception { + + // Test disable replication peer + hbaseAdmin.disableReplicationPeer("2"); + assertFalse(hbaseAdmin.isReplicationPeerEnabled("2")); + + // Test enable replication peer + hbaseAdmin.enableReplicationPeer("2"); + assertTrue(hbaseAdmin.isReplicationPeerEnabled("2")); + } } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index cb040c5cf2b..2084467a313 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -1280,6 +1280,11 @@ public class ThriftAdmin implements Admin { } + @Override + public boolean isReplicationPeerEnabled(String peerId) throws IOException { + throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin"); + } + @Override public void clearCompactionQueues(ServerName serverName, Set queues) { throw new NotImplementedException("clearCompactionQueues not supported in ThriftAdmin");