HBASE-27448 Add an admin method to get replication enabled state (#4858)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
7b0d705a1a
commit
1530d855ea
|
@ -3009,6 +3009,14 @@ public interface Admin extends Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
void disableTableReplication(TableName tableName) throws IOException;
|
void disableTableReplication(TableName tableName) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a replication peer is enabled.
|
||||||
|
* @param peerId id of replication peer to check
|
||||||
|
* @return <code>true</code> if replication peer is enabled
|
||||||
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
*/
|
||||||
|
boolean isReplicationPeerEnabled(String peerId) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear compacting queues on a regionserver.
|
* Clear compacting queues on a regionserver.
|
||||||
* @param serverName the region server name
|
* @param serverName the region server name
|
||||||
|
|
|
@ -773,6 +773,14 @@ public interface AsyncAdmin {
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Void> disableTableReplication(TableName tableName);
|
CompletableFuture<Void> disableTableReplication(TableName tableName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a replication peer is enabled.
|
||||||
|
* @param peerId id of replication peer to check
|
||||||
|
* @return true if replication peer is enabled. The return value will be wrapped by a
|
||||||
|
* {@link CompletableFuture}.
|
||||||
|
*/
|
||||||
|
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
||||||
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
|
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
|
||||||
|
|
|
@ -475,6 +475,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
return wrap(rawAdmin.disableTableReplication(tableName));
|
return wrap(rawAdmin.disableTableReplication(tableName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
|
||||||
|
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
|
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
|
||||||
return wrap(rawAdmin.snapshot(snapshot));
|
return wrap(rawAdmin.snapshot(snapshot));
|
||||||
|
|
|
@ -160,6 +160,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||||
|
@ -1851,6 +1853,12 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
return stub.listReplicationPeers(controller, request);
|
return stub.listReplicationPeers(controller, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
|
||||||
|
GetReplicationPeerStateRequest request) throws ServiceException {
|
||||||
|
return stub.isReplicationPeerEnabled(controller, request);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller,
|
public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller,
|
||||||
GetSpaceQuotaRegionSizesRequest request) throws ServiceException {
|
GetSpaceQuotaRegionSizesRequest request) throws ServiceException {
|
||||||
|
|
|
@ -231,6 +231,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddRe
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||||
|
@ -3977,6 +3979,21 @@ public class HBaseAdmin implements Admin {
|
||||||
setTableRep(tableName, false);
|
setTableRep(tableName, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
|
||||||
|
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||||
|
@Override
|
||||||
|
protected Boolean rpcCall() throws Exception {
|
||||||
|
GetReplicationPeerStateRequest.Builder request =
|
||||||
|
GetReplicationPeerStateRequest.newBuilder();
|
||||||
|
request.setPeerId(peerId);
|
||||||
|
GetReplicationPeerStateResponse response =
|
||||||
|
master.isReplicationPeerEnabled(getRpcController(), request.build());
|
||||||
|
return response.getIsEnabled();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to peer and check the table descriptor on peer:
|
* Connect to peer and check the table descriptor on peer:
|
||||||
* <ol>
|
* <ol>
|
||||||
|
|
|
@ -295,6 +295,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||||
|
@ -3490,6 +3492,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
|
||||||
|
GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
|
||||||
|
request.setPeerId(peerId);
|
||||||
|
return this.<Boolean> newMasterCaller()
|
||||||
|
.action((controller, stub) -> this.<GetReplicationPeerStateRequest,
|
||||||
|
GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
|
||||||
|
(s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
|
||||||
|
resp -> resp.getIsEnabled()))
|
||||||
|
.call();
|
||||||
|
}
|
||||||
|
|
||||||
private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
|
private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
|
||||||
CompletableFuture<byte[][]> future = new CompletableFuture<>();
|
CompletableFuture<byte[][]> future = new CompletableFuture<>();
|
||||||
addListener(
|
addListener(
|
||||||
|
|
|
@ -191,6 +191,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||||
|
@ -669,6 +671,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
||||||
return stub.listReplicationPeers(controller, request);
|
return stub.listReplicationPeers(controller, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
|
||||||
|
GetReplicationPeerStateRequest request) throws ServiceException {
|
||||||
|
return stub.isReplicationPeerEnabled(controller, request);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller,
|
public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller,
|
||||||
GetSpaceQuotaRegionSizesRequest request) throws ServiceException {
|
GetSpaceQuotaRegionSizesRequest request) throws ServiceException {
|
||||||
|
|
|
@ -1101,6 +1101,10 @@ service MasterService {
|
||||||
rpc ListReplicationPeers(ListReplicationPeersRequest)
|
rpc ListReplicationPeers(ListReplicationPeersRequest)
|
||||||
returns(ListReplicationPeersResponse);
|
returns(ListReplicationPeersResponse);
|
||||||
|
|
||||||
|
/** Returns the stat of a replication peer */
|
||||||
|
rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
|
||||||
|
returns(GetReplicationPeerStateResponse);
|
||||||
|
|
||||||
/** Returns a list of ServerNames marked as decommissioned. */
|
/** Returns a list of ServerNames marked as decommissioned. */
|
||||||
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
|
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
|
||||||
returns(ListDecommissionedRegionServersResponse);
|
returns(ListDecommissionedRegionServersResponse);
|
||||||
|
|
|
@ -137,3 +137,11 @@ message ListReplicationPeersRequest {
|
||||||
message ListReplicationPeersResponse {
|
message ListReplicationPeersResponse {
|
||||||
repeated ReplicationPeerDescription peer_desc = 1;
|
repeated ReplicationPeerDescription peer_desc = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetReplicationPeerStateRequest {
|
||||||
|
required string peer_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetReplicationPeerStateResponse {
|
||||||
|
required bool is_enabled = 1;
|
||||||
|
}
|
||||||
|
|
|
@ -359,6 +359,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||||
|
@ -2113,6 +2115,18 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
return response.build();
|
return response.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
|
||||||
|
GetReplicationPeerStateRequest request) throws ServiceException {
|
||||||
|
boolean isEnabled;
|
||||||
|
try {
|
||||||
|
isEnabled = master.getReplicationPeerManager().getPeerState(request.getPeerId());
|
||||||
|
} catch (ReplicationException ioe) {
|
||||||
|
throw new ServiceException(ioe);
|
||||||
|
}
|
||||||
|
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
|
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
|
||||||
RpcController controller, ListDecommissionedRegionServersRequest request)
|
RpcController controller, ListDecommissionedRegionServersRequest request)
|
||||||
|
|
|
@ -198,6 +198,15 @@ public class ReplicationPeerManager {
|
||||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
|
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getPeerState(String peerId) throws ReplicationException {
|
||||||
|
ReplicationPeerDescription desc = peers.get(peerId);
|
||||||
|
if (desc != null) {
|
||||||
|
return desc.isEnabled();
|
||||||
|
} else {
|
||||||
|
throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void enablePeer(String peerId) throws ReplicationException {
|
public void enablePeer(String peerId) throws ReplicationException {
|
||||||
setPeerState(peerId, true);
|
setPeerState(peerId, true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -442,4 +443,19 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for HBASE-27448 Add an admin method to get replication enabled state
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetReplicationPeerState() throws Exception {
|
||||||
|
|
||||||
|
// Test disable replication peer
|
||||||
|
hbaseAdmin.disableReplicationPeer("2");
|
||||||
|
assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));
|
||||||
|
|
||||||
|
// Test enable replication peer
|
||||||
|
hbaseAdmin.enableReplicationPeer("2");
|
||||||
|
assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1280,6 +1280,11 @@ public class ThriftAdmin implements Admin {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
|
||||||
|
throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clearCompactionQueues(ServerName serverName, Set<String> queues) {
|
public void clearCompactionQueues(ServerName serverName, Set<String> queues) {
|
||||||
throw new NotImplementedException("clearCompactionQueues not supported in ThriftAdmin");
|
throw new NotImplementedException("clearCompactionQueues not supported in ThriftAdmin");
|
||||||
|
|
Loading…
Reference in New Issue