HBASE-17335 enable/disable replication peer requests should be routed through master

This commit is contained in:
Guanghao Zhang 2016-12-22 20:47:36 +08:00
parent 45da294a17
commit b3f2bec099
19 changed files with 2366 additions and 11 deletions

View File

@ -1842,4 +1842,20 @@ public interface Admin extends Abortable, Closeable {
*/
default void removeReplicationPeer(final String peerId) throws IOException {
}
/**
 * Restart the replication stream to the specified peer.
 * <p>
 * Default is a no-op so existing {@code Admin} implementations keep compiling;
 * concrete implementations route the request to the master.
 * @param peerId a short name that identifies the peer
 * @throws IOException if a remote or network exception occurs
 */
default void enableReplicationPeer(final String peerId) throws IOException {
}
/**
 * Stop the replication stream to the specified peer.
 * <p>
 * Default is a no-op so existing {@code Admin} implementations keep compiling;
 * concrete implementations route the request to the master.
 * @param peerId a short name that identifies the peer
 * @throws IOException if a remote or network exception occurs
 */
default void disableReplicationPeer(final String peerId) throws IOException {
}
}

View File

@ -90,6 +90,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.util.Bytes;
@ -1653,6 +1657,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
RemoveReplicationPeerRequest request) throws ServiceException {
return stub.removeReplicationPeer(controller, request);
}
// Pure pass-through to the master stub; retry and exception translation are
// handled by the callers of this blocking interface.
@Override
public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
EnableReplicationPeerRequest request) throws ServiceException {
return stub.enableReplicationPeer(controller, request);
}
// Pure pass-through to the master stub, mirroring enableReplicationPeer above.
@Override
public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
DisableReplicationPeerRequest request) throws ServiceException {
return stub.disableReplicationPeer(controller, request);
}
};
}

View File

@ -3770,4 +3770,28 @@ public class HBaseAdmin implements Admin {
}
});
}
/**
 * Restart the replication stream to the specified peer by issuing an
 * EnableReplicationPeer RPC to the master. Executed through
 * {@code executeCallable} so the standard master-retry machinery applies.
 */
@Override
public void enableReplicationPeer(final String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
// Convert the plain peer id into the protobuf request expected by the master.
master.enableReplicationPeer(getRpcController(),
RequestConverter.buildEnableReplicationPeerRequest(peerId));
return null;
}
});
}
/**
 * Stop the replication stream to the specified peer by issuing a
 * DisableReplicationPeer RPC to the master. Executed through
 * {@code executeCallable} so the standard master-retry machinery applies.
 */
@Override
public void disableReplicationPeer(final String peerId) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
// Convert the plain peer id into the protobuf request expected by the master.
master.disableReplicationPeer(getRpcController(),
RequestConverter.buildDisableReplicationPeerRequest(peerId));
return null;
}
});
}
}

View File

@ -228,16 +228,16 @@ public class ReplicationAdmin implements Closeable {
* Restart the replication stream to the specified peer.
* @param id a short name that identifies the cluster
*/
-  public void enablePeer(String id) throws ReplicationException {
-    this.replicationPeers.enablePeer(id);
+  public void enablePeer(String id) throws IOException {
+    this.admin.enableReplicationPeer(id);
   }
   /**
    * Stop the replication stream to the specified peer.
    * @param id a short name that identifies the cluster
    */
-  public void disablePeer(String id) throws ReplicationException {
-    this.replicationPeers.disablePeer(id);
+  public void disablePeer(String id) throws IOException {
+    this.admin.disableReplicationPeer(id);
   }
/**

View File

@ -114,6 +114,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1580,4 +1582,18 @@ public final class RequestConverter {
builder.setPeerId(peerId);
return builder.build();
}
/**
 * Create a protocol buffer request to enable the given replication peer.
 * @param peerId a short name that identifies the peer
 * @return an EnableReplicationPeerRequest carrying the peer id
 */
public static ReplicationProtos.EnableReplicationPeerRequest buildEnableReplicationPeerRequest(
String peerId) {
return EnableReplicationPeerRequest.newBuilder().setPeerId(peerId).build();
}
/**
 * Create a protocol buffer request to disable the given replication peer.
 * @param peerId a short name that identifies the peer
 * @return a DisableReplicationPeerRequest carrying the peer id
 */
public static ReplicationProtos.DisableReplicationPeerRequest buildDisableReplicationPeerRequest(
String peerId) {
return DisableReplicationPeerRequest.newBuilder().setPeerId(peerId).build();
}
}

View File

@ -66368,6 +66368,30 @@ public final class MasterProtos {
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse> done);
/**
* <pre>
** Enable a replication peer
* </pre>
*
* <code>rpc EnableReplicationPeer(.hbase.pb.EnableReplicationPeerRequest) returns (.hbase.pb.EnableReplicationPeerResponse);</code>
*/
public abstract void enableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse> done);
/**
* <pre>
** Disable a replication peer
* </pre>
*
* <code>rpc DisableReplicationPeer(.hbase.pb.DisableReplicationPeerRequest) returns (.hbase.pb.DisableReplicationPeerResponse);</code>
*/
public abstract void disableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse> done);
}
public static org.apache.hadoop.hbase.shaded.com.google.protobuf.Service newReflectiveService(
@ -66853,6 +66877,22 @@ public final class MasterProtos {
impl.removeReplicationPeer(controller, request, done);
}
@java.lang.Override
public void enableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse> done) {
impl.enableReplicationPeer(controller, request, done);
}
@java.lang.Override
public void disableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse> done) {
impl.disableReplicationPeer(controller, request, done);
}
};
}
@ -66995,6 +67035,10 @@ public final class MasterProtos {
return impl.addReplicationPeer(controller, (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest)request);
case 59:
return impl.removeReplicationPeer(controller, (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest)request);
case 60:
return impl.enableReplicationPeer(controller, (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest)request);
case 61:
return impl.disableReplicationPeer(controller, (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -67129,6 +67173,10 @@ public final class MasterProtos {
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest.getDefaultInstance();
case 59:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest.getDefaultInstance();
case 60:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest.getDefaultInstance();
case 61:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -67263,6 +67311,10 @@ public final class MasterProtos {
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse.getDefaultInstance();
case 59:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse.getDefaultInstance();
case 60:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse.getDefaultInstance();
case 61:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -68020,6 +68072,30 @@ public final class MasterProtos {
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse> done);
/**
* <pre>
** Enable a replication peer
* </pre>
*
* <code>rpc EnableReplicationPeer(.hbase.pb.EnableReplicationPeerRequest) returns (.hbase.pb.EnableReplicationPeerResponse);</code>
*/
public abstract void enableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse> done);
/**
* <pre>
** Disable a replication peer
* </pre>
*
* <code>rpc DisableReplicationPeer(.hbase.pb.DisableReplicationPeerRequest) returns (.hbase.pb.DisableReplicationPeerResponse);</code>
*/
public abstract void disableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse> done);
public static final
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
@ -68342,6 +68418,16 @@ public final class MasterProtos {
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse>specializeCallback(
done));
return;
case 60:
this.enableReplicationPeer(controller, (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest)request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse>specializeCallback(
done));
return;
case 61:
this.disableReplicationPeer(controller, (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest)request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse>specializeCallback(
done));
return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -68476,6 +68562,10 @@ public final class MasterProtos {
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest.getDefaultInstance();
case 59:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest.getDefaultInstance();
case 60:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest.getDefaultInstance();
case 61:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -68610,6 +68700,10 @@ public final class MasterProtos {
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse.getDefaultInstance();
case 59:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse.getDefaultInstance();
case 60:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse.getDefaultInstance();
case 61:
return org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -69530,6 +69624,36 @@ public final class MasterProtos {
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse.class,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse.getDefaultInstance()));
}
public void enableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse> done) {
channel.callMethod(
getDescriptor().getMethods().get(60),
controller,
request,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse.getDefaultInstance(),
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse.class,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse.getDefaultInstance()));
}
public void disableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest request,
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse> done) {
channel.callMethod(
getDescriptor().getMethods().get(61),
controller,
request,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse.getDefaultInstance(),
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse.class,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse.getDefaultInstance()));
}
}
public static BlockingInterface newBlockingStub(
@ -69837,6 +69961,16 @@ public final class MasterProtos {
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest request)
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse enableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest request)
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse disableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest request)
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@ -70565,6 +70699,30 @@ public final class MasterProtos {
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse.getDefaultInstance());
}
public org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse enableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest request)
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse) channel.callBlockingMethod(
getDescriptor().getMethods().get(60),
controller,
request,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse.getDefaultInstance());
}
public org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse disableReplicationPeer(
org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest request)
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse) channel.callBlockingMethod(
getDescriptor().getMethods().get(61),
controller,
request,
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse.getDefaultInstance());
}
}
// @@protoc_insertion_point(class_scope:hbase.pb.MasterService)
@ -71346,7 +71504,7 @@ public final class MasterProtos {
"ENTICATION\020\000\022\031\n\025SECURE_AUTHENTICATION\020\001\022",
"\021\n\rAUTHORIZATION\020\002\022\026\n\022CELL_AUTHORIZATION" +
"\020\003\022\023\n\017CELL_VISIBILITY\020\004*(\n\020MasterSwitchT" +
"ype\022\t\n\005SPLIT\020\000\022\t\n\005MERGE\020\0012\374*\n\rMasterServ" +
"ype\022\t\n\005SPLIT\020\000\022\t\n\005MERGE\020\0012\323,\n\rMasterServ" +
"ice\022e\n\024GetSchemaAlterStatus\022%.hbase.pb.G" +
"etSchemaAlterStatusRequest\032&.hbase.pb.Ge" +
"tSchemaAlterStatusResponse\022b\n\023GetTableDe" +
@ -71484,9 +71642,14 @@ public final class MasterProtos {
"eplicationPeerResponse\022h\n\025RemoveReplicat" +
"ionPeer\022&.hbase.pb.RemoveReplicationPeer" +
"Request\032\'.hbase.pb.RemoveReplicationPeer",
"ResponseBI\n1org.apache.hadoop.hbase.shad" +
"ed.protobuf.generatedB\014MasterProtosH\001\210\001\001" +
"\240\001\001"
"Response\022h\n\025EnableReplicationPeer\022&.hbas" +
"e.pb.EnableReplicationPeerRequest\032\'.hbas" +
"e.pb.EnableReplicationPeerResponse\022k\n\026Di" +
"sableReplicationPeer\022\'.hbase.pb.DisableR" +
"eplicationPeerRequest\032(.hbase.pb.Disable" +
"ReplicationPeerResponseBI\n1org.apache.ha" +
"doop.hbase.shaded.protobuf.generatedB\014Ma" +
"sterProtosH\001\210\001\001\240\001\001"
};
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {

View File

@ -855,4 +855,12 @@ service MasterService {
/** Remove a replication peer */
rpc RemoveReplicationPeer(RemoveReplicationPeerRequest)
returns(RemoveReplicationPeerResponse);
/** Enable a replication peer: restart shipping edits to the peer cluster */
rpc EnableReplicationPeer(EnableReplicationPeerRequest)
returns(EnableReplicationPeerResponse);
/** Disable a replication peer: stop shipping edits to the peer cluster */
rpc DisableReplicationPeer(DisableReplicationPeerRequest)
returns(DisableReplicationPeerResponse);
}

View File

@ -40,3 +40,17 @@ message RemoveReplicationPeerRequest {
message RemoveReplicationPeerResponse {
}
// Request to restart the replication stream to a peer; peer_id names the peer.
message EnableReplicationPeerRequest {
required string peer_id = 1;
}
// Empty response: success is signaled by the absence of an RPC error.
message EnableReplicationPeerResponse {
}
// Request to stop the replication stream to a peer; peer_id names the peer.
message DisableReplicationPeerRequest {
required string peer_id = 1;
}
// Empty response: success is signaled by the absence of an RPC error.
message DisableReplicationPeerResponse {
}

View File

@ -1869,4 +1869,44 @@ public interface MasterObserver extends Coprocessor {
default void postRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
}
/**
 * Called before the master enables a replication peer.
 * @param ctx the environment to interact with the framework and master
 * @param peerId a short name that identifies the peer
 * @throws IOException on failure; throwing aborts the enable operation
 */
default void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
}
/**
 * Called after the master has enabled a replication peer.
 * @param ctx the environment to interact with the framework and master
 * @param peerId a short name that identifies the peer
 * @throws IOException on failure
 */
default void postEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
}
/**
 * Called before the master disables a replication peer.
 * @param ctx the environment to interact with the framework and master
 * @param peerId a short name that identifies the peer
 * @throws IOException on failure; throwing aborts the disable operation
 */
default void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
}
/**
 * Called after the master has disabled a replication peer.
 * @param ctx the environment to interact with the framework and master
 * @param peerId a short name that identifies the peer
 * @throws IOException on failure
 */
default void postDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
}
}

View File

@ -3172,4 +3172,28 @@ public class HMaster extends HRegionServer implements MasterServices {
cpHost.postRemoveReplicationPeer(peerId);
}
}
/**
 * Enable the named replication peer: fire the pre-hook, audit-log the request,
 * flip the peer's state via the replication manager, then fire the post-hook.
 * Hook order (pre, action, post) is part of the coprocessor contract.
 */
@Override
public void enableReplicationPeer(String peerId) throws ReplicationException, IOException {
if (cpHost != null) {
// Coprocessors may veto the operation by throwing here.
cpHost.preEnableReplicationPeer(peerId);
}
LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
this.replicationManager.enableReplicationPeer(peerId);
if (cpHost != null) {
cpHost.postEnableReplicationPeer(peerId);
}
}
/**
 * Disable the named replication peer: fire the pre-hook, audit-log the request,
 * flip the peer's state via the replication manager, then fire the post-hook.
 * Hook order (pre, action, post) is part of the coprocessor contract.
 */
@Override
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
if (cpHost != null) {
// Coprocessors may veto the operation by throwing here.
cpHost.preDisableReplicationPeer(peerId);
}
LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
this.replicationManager.disableReplicationPeer(peerId);
if (cpHost != null) {
cpHost.postDisableReplicationPeer(peerId);
}
}
}

View File

@ -1687,4 +1687,44 @@ public class MasterCoprocessorHost
}
});
}
/**
 * Invoke {@code preEnableReplicationPeer} on every registered MasterObserver.
 * The empty-coprocessor short-circuit avoids allocating the operation when
 * there is nothing to call.
 */
public void preEnableReplicationPeer(final String peerId) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver observer, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
observer.preEnableReplicationPeer(ctx, peerId);
}
});
}
/**
 * Invoke {@code postEnableReplicationPeer} on every registered MasterObserver.
 */
public void postEnableReplicationPeer(final String peerId) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver observer, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
observer.postEnableReplicationPeer(ctx, peerId);
}
});
}
/**
 * Invoke {@code preDisableReplicationPeer} on every registered MasterObserver.
 * The empty-coprocessor short-circuit avoids allocating the operation when
 * there is nothing to call.
 */
public void preDisableReplicationPeer(final String peerId) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver observer, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
observer.preDisableReplicationPeer(ctx, peerId);
}
});
}
/**
 * Invoke {@code postDisableReplicationPeer} on every registered MasterObserver.
 */
public void postDisableReplicationPeer(final String peerId) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver observer, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
observer.postDisableReplicationPeer(ctx, peerId);
}
});
}
}

View File

@ -89,6 +89,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
@ -1667,4 +1671,26 @@ public class MasterRpcServices extends RSRpcServices
throw new ServiceException(e);
}
}
/**
 * Master-side RPC endpoint: unwrap the peer id from the protobuf request,
 * delegate to the master, and wrap both checked failure types in a
 * ServiceException so they cross the RPC boundary.
 */
@Override
public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
EnableReplicationPeerRequest request) throws ServiceException {
try {
master.enableReplicationPeer(request.getPeerId());
// Empty response; success is conveyed by the absence of an exception.
return EnableReplicationPeerResponse.newBuilder().build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
}
}
/**
 * Master-side RPC endpoint mirroring enableReplicationPeer for the disable case.
 */
@Override
public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
DisableReplicationPeerRequest request) throws ServiceException {
try {
master.disableReplicationPeer(request.getPeerId());
// Empty response; success is conveyed by the absence of an exception.
return DisableReplicationPeerResponse.newBuilder().build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -431,4 +431,16 @@ public interface MasterServices extends Server {
* @param peerId a short name that identifies the peer
*/
void removeReplicationPeer(String peerId) throws ReplicationException, IOException;
/**
 * Restart the replication stream to the specified peer.
 * @param peerId a short name that identifies the peer
 * @throws ReplicationException if the replication state cannot be updated
 * @throws IOException if a coprocessor hook or remote call fails
 */
void enableReplicationPeer(String peerId) throws ReplicationException, IOException;
/**
 * Stop the replication stream to the specified peer.
 * @param peerId a short name that identifies the peer
 * @throws ReplicationException if the replication state cannot be updated
 * @throws IOException if a coprocessor hook or remote call fails
 */
void disableReplicationPeer(String peerId) throws ReplicationException, IOException;
}

View File

@ -73,6 +73,14 @@ public class ReplicationManager {
this.replicationPeers.unregisterPeer(peerId);
}
/** Mark the peer enabled in the replication peer storage. */
public void enableReplicationPeer(String peerId) throws ReplicationException {
this.replicationPeers.enablePeer(peerId);
}
/** Mark the peer disabled in the replication peer storage. */
public void disableReplicationPeer(String peerId) throws ReplicationException {
this.replicationPeers.disablePeer(peerId);
}
/**
* Set a namespace in the peer config means that all tables in this namespace
* will be replicated to the peer cluster.

View File

@ -2708,4 +2708,16 @@ public class AccessController extends BaseMasterAndRegionObserver
String peerId) throws IOException {
requirePermission(getActiveUser(ctx), "removeReplicationPeer", Action.ADMIN);
}
// Enabling a replication peer is restricted to superusers / global admins,
// consistent with the other replication-peer operations above.
@Override
public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
requirePermission(getActiveUser(ctx), "enableReplicationPeer", Action.ADMIN);
}
// Disabling a replication peer requires the same ADMIN permission.
@Override
public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String peerId) throws IOException {
requirePermission(getActiveUser(ctx), "disableReplicationPeer", Action.ADMIN);
}
}

View File

@ -391,4 +391,12 @@ public class MockNoopMasterServices implements MasterServices, Server {
@Override
public void removeReplicationPeer(String peerId) throws ReplicationException {
}
// No-op: this mock MasterServices intentionally ignores peer state changes.
@Override
public void enableReplicationPeer(String peerId) throws ReplicationException, IOException {
}
// No-op: this mock MasterServices intentionally ignores peer state changes.
@Override
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
}
}

View File

@ -2900,4 +2900,34 @@ public class TestAccessController extends SecureTestUtil {
verifyAllowed(action, SUPERUSER, USER_ADMIN);
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
// Verifies that only superusers and global admins may enable a replication peer;
// all other users must be denied by the AccessController pre-hook.
@Test
public void testEnableReplicationPeer() throws Exception {
AccessTestAction action = new AccessTestAction() {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preEnableReplicationPeer(ObserverContext.createAndPrepare(CP_ENV, null),
"test");
return null;
}
};
verifyAllowed(action, SUPERUSER, USER_ADMIN);
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
// Verifies the same ADMIN-only restriction for disabling a replication peer.
@Test
public void testDisableReplicationPeer() throws Exception {
AccessTestAction action = new AccessTestAction() {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preDisableReplicationPeer(ObserverContext.createAndPrepare(CP_ENV, null),
"test");
return null;
}
};
verifyAllowed(action, SUPERUSER, USER_ADMIN);
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
}
}

View File

@ -118,6 +118,8 @@ In case the table goes out of date, the unit tests which check for accuracy of p
| | setNamespaceQuota | superuser\|global(A)
| | addReplicationPeer | superuser\|global(A)
| | removeReplicationPeer | superuser\|global(A)
| | enableReplicationPeer | superuser\|global(A)
| | disableReplicationPeer | superuser\|global(A)
| Region | openRegion | superuser\|global(A)
| | closeRegion | superuser\|global(A)
| | flush | superuser\|global(A)\|global\(C)\|TableOwner\|table(A)\|table\(C)