diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java index c21cdf884dc..1f34f884e67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java @@ -20,9 +20,11 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -83,4 +85,11 @@ public class BaseRegionServerObserver implements RegionServerObserver { return endpoint; } + @Override + public void preReplicateLogEntries(ObserverContext ctx, + List entries, CellScanner cells) throws IOException { } + + @Override + public void postReplicateLogEntries(ObserverContext ctx, + List entries, CellScanner cells) throws IOException { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java index 5c07fd2180a..316bad50881 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java @@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; import java.util.List; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -131,4 +133,23 @@ public interface RegionServerObserver extends Coprocessor { ReplicationEndpoint postCreateReplicationEndPoint( ObserverContext ctx, ReplicationEndpoint endpoint); + /** + * This will be called before executing replication request to shipping log entries. + * @param ctx An instance of ObserverContext + * @param entries list of WALEntries to replicate + * @param cells Cells that the WALEntries refer to (if cells is non-null) + * @throws IOException Signals that an I/O exception has occurred. + */ + void preReplicateLogEntries(final ObserverContext ctx, + List entries, CellScanner cells) throws IOException; + + /** + * This will be called after executing replication request to shipping log entries. + * @param ctx An instance of ObserverContext + * @param entries list of WALEntries to replicate + * @param cells Cells that the WALEntries refer to (if cells is non-null) + * @throws IOException Signals that an I/O exception has occurred. + */ + void postReplicateLogEntries(final ObserverContext ctx, + List entries, CellScanner cells) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7f3b7cb3d74..3bdb52833f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1520,8 +1520,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (regionServer.replicationSinkHandler != null) { checkOpen(); requestCount.increment(); - regionServer.replicationSinkHandler.replicateLogEntries(request.getEntryList(), - ((PayloadCarryingRpcController)controller).cellScanner()); + List entries = request.getEntryList(); + CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); + regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); + regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); + regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index c4bcac776ac..43a3f32757a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @@ -158,6 +160,28 @@ public class RegionServerCoprocessorHost extends }); } + public void preReplicateLogEntries(final List entries, final CellScanner cells) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.preReplicateLogEntries(ctx, entries, cells); + } + }); + } + + public void postReplicateLogEntries(final List entries, final CellScanner cells) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.postReplicateLogEntries(ctx, entries, cells); + } + }); + } + public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) throws IOException { return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 350991c8a63..ca1fba893df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest; @@ -2421,4 +2422,15 @@ public class AccessController extends BaseMasterAndRegionObserver ObserverContext ctx, ReplicationEndpoint endpoint) { return endpoint; } + + @Override + public void preReplicateLogEntries(ObserverContext ctx, + List entries, CellScanner cells) throws IOException { + requirePermission("replicateLogEntries", Action.WRITE); + } + + @Override + public void postReplicateLogEntries(ObserverContext ctx, + List entries, CellScanner cells) throws IOException { + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 45bb5359ee5..3c8fea58ea8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -2639,4 +2639,21 @@ public class TestAccessController extends SecureTestUtil { verifyAnyCreate(prepareBulkLoadAction); verifyAnyCreate(cleanupBulkLoadAction); } + + @Test + public void testReplicateLogEntries() throws Exception { + AccessTestAction replicateLogEntriesAction = new AccessTestAction() { + @Override + public Object run() throws Exception { + ACCESS_CONTROLLER.preReplicateLogEntries(ObserverContext.createAndPrepare(RSCP_ENV, null), + null, null); + ACCESS_CONTROLLER.postReplicateLogEntries(ObserverContext.createAndPrepare(RSCP_ENV, null), + null, null); + return null; + } + }; + + verifyAllowed(replicateLogEntriesAction, SUPERUSER, USER_ADMIN); + verifyDenied(replicateLogEntriesAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } }