HBASE-27448 Add an admin method to get replication enabled state (#4855)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
984d226010
commit
dc4fa05a2b
|
@ -2163,6 +2163,14 @@ public interface Admin extends Abortable, Closeable {
|
|||
return peers.get(0).getSyncReplicationState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a replication peer is enabled.
|
||||
* @param peerId id of replication peer to check
|
||||
* @return <code>true</code> if replication peer is enabled
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
boolean isReplicationPeerEnabled(String peerId) throws IOException;
|
||||
|
||||
/**
|
||||
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
|
||||
* them. Optionally unload the regions on the servers. If there are multiple servers to be
|
||||
|
|
|
@ -876,6 +876,11 @@ class AdminOverAsyncAdmin implements Admin {
|
|||
return admin.transitReplicationPeerSyncReplicationState(peerId, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
|
||||
return get(admin.isReplicationPeerEnabled(peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
|
||||
throws IOException {
|
||||
|
|
|
@ -795,6 +795,14 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<Void> disableTableReplication(TableName tableName);
|
||||
|
||||
/**
|
||||
* Check if a replication peer is enabled.
|
||||
* @param peerId id of replication peer to check
|
||||
* @return true if replication peer is enabled. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
|
||||
|
||||
/**
|
||||
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
||||
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially
|
||||
|
|
|
@ -481,6 +481,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return wrap(rawAdmin.disableTableReplication(tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
|
||||
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
|
||||
return wrap(rawAdmin.snapshot(snapshot));
|
||||
|
|
|
@ -325,6 +325,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
|
@ -3734,6 +3736,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
|
||||
GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
|
||||
request.setPeerId(peerId);
|
||||
return this.<Boolean> newMasterCaller()
|
||||
.action((controller, stub) -> this.<GetReplicationPeerStateRequest,
|
||||
GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
|
||||
(s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
|
||||
resp -> resp.getIsEnabled()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
|
||||
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
|
||||
|
|
|
@ -1106,6 +1106,9 @@ service MasterService {
|
|||
rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest)
|
||||
returns(TransitReplicationPeerSyncReplicationStateResponse);
|
||||
|
||||
rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
|
||||
returns(GetReplicationPeerStateResponse);
|
||||
|
||||
/** Returns a list of ServerNames marked as decommissioned. */
|
||||
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
|
||||
returns(ListDecommissionedRegionServersResponse);
|
||||
|
|
|
@ -161,3 +161,10 @@ message TransitReplicationPeerSyncReplicationStateRequest {
|
|||
message TransitReplicationPeerSyncReplicationStateResponse {
|
||||
required uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message GetReplicationPeerStateRequest {
|
||||
required string peer_id = 1;
|
||||
}
|
||||
message GetReplicationPeerStateResponse {
|
||||
required bool is_enabled = 1;
|
||||
}
|
||||
|
|
|
@ -417,6 +417,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
|
@ -2105,6 +2107,18 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
|
|||
return response.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
|
||||
GetReplicationPeerStateRequest request) throws ServiceException {
|
||||
boolean isEnabled;
|
||||
try {
|
||||
isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId());
|
||||
} catch (ReplicationException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
|
||||
RpcController controller, ListDecommissionedRegionServersRequest request)
|
||||
|
|
|
@ -271,6 +271,15 @@ public class ReplicationPeerManager {
|
|||
desc.getSyncReplicationState()));
|
||||
}
|
||||
|
||||
public boolean getPeerState(String peerId) throws ReplicationException {
|
||||
ReplicationPeerDescription desc = peers.get(peerId);
|
||||
if (desc != null) {
|
||||
return desc.isEnabled();
|
||||
} else {
|
||||
throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
|
||||
}
|
||||
}
|
||||
|
||||
public void enablePeer(String peerId) throws ReplicationException {
|
||||
setPeerState(peerId, true);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
|
|||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -441,4 +442,19 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for HBASE-27448 Add an admin method to get replication enabled state
|
||||
*/
|
||||
@Test
|
||||
public void testGetReplicationPeerState() throws Exception {
|
||||
|
||||
// Test disable replication peer
|
||||
hbaseAdmin.disableReplicationPeer("2");
|
||||
assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));
|
||||
|
||||
// Test enable replication peer
|
||||
hbaseAdmin.enableReplicationPeer("2");
|
||||
assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -676,6 +676,11 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
|
|||
return admin.transitReplicationPeerSyncReplicationStateAsync(peerId, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
|
||||
return admin.isReplicationPeerEnabled(peerId);
|
||||
}
|
||||
|
||||
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
|
||||
throws IOException {
|
||||
admin.decommissionRegionServers(servers, offload);
|
||||
|
|
|
@ -1031,6 +1031,11 @@ public class ThriftAdmin implements Admin {
|
|||
"transitReplicationPeerSyncReplicationStateAsync not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
|
||||
throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decommissionRegionServers(List<ServerName> servers, boolean offload) {
|
||||
throw new NotImplementedException("decommissionRegionServers not supported in ThriftAdmin");
|
||||
|
|
Loading…
Reference in New Issue