HBASE-12916 No access control for replicating WAL entries (Liu Shaohui)
This commit is contained in:
parent
f7df0990c2
commit
0f6faaf5fc
|
@ -20,9 +20,11 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Mutation;
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||||
|
|
||||||
|
@ -83,4 +85,11 @@ public class BaseRegionServerObserver implements RegionServerObserver {
|
||||||
return endpoint;
|
return endpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||||
|
List<WALEntry> entries, CellScanner cells) throws IOException { }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||||
|
List<WALEntry> entries, CellScanner cells) throws IOException { }
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.coprocessor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.Coprocessor;
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
import org.apache.hadoop.hbase.MetaMutationAnnotation;
|
import org.apache.hadoop.hbase.MetaMutationAnnotation;
|
||||||
import org.apache.hadoop.hbase.client.Mutation;
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||||
|
|
||||||
|
@ -131,4 +133,23 @@ public interface RegionServerObserver extends Coprocessor {
|
||||||
ReplicationEndpoint postCreateReplicationEndPoint(
|
ReplicationEndpoint postCreateReplicationEndPoint(
|
||||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint);
|
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will be called before executing replication request to shipping log entries.
|
||||||
|
* @param ctx An instance of ObserverContext
|
||||||
|
* @param entries list of WALEntries to replicate
|
||||||
|
* @param cells Cells that the WALEntries refer to (if cells is non-null)
|
||||||
|
* @throws IOException Signals that an I/O exception has occurred.
|
||||||
|
*/
|
||||||
|
void preReplicateLogEntries(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||||
|
List<WALEntry> entries, CellScanner cells) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will be called after executing replication request to shipping log entries.
|
||||||
|
* @param ctx An instance of ObserverContext
|
||||||
|
* @param entries list of WALEntries to replicate
|
||||||
|
* @param cells Cells that the WALEntries refer to (if cells is non-null)
|
||||||
|
* @throws IOException Signals that an I/O exception has occurred.
|
||||||
|
*/
|
||||||
|
void postReplicateLogEntries(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||||
|
List<WALEntry> entries, CellScanner cells) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1520,8 +1520,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
if (regionServer.replicationSinkHandler != null) {
|
if (regionServer.replicationSinkHandler != null) {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
requestCount.increment();
|
requestCount.increment();
|
||||||
regionServer.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
|
List<WALEntry> entries = request.getEntryList();
|
||||||
((PayloadCarryingRpcController)controller).cellScanner());
|
CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
|
||||||
|
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
|
||||||
|
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
|
||||||
|
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
|
||||||
}
|
}
|
||||||
return ReplicateWALEntryResponse.newBuilder().build();
|
return ReplicateWALEntryResponse.newBuilder().build();
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.Coprocessor;
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
|
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
|
import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||||
|
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||||
|
@ -158,6 +160,28 @@ public class RegionServerCoprocessorHost extends
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
|
||||||
|
throws IOException {
|
||||||
|
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionServerObserver oserver,
|
||||||
|
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
|
||||||
|
oserver.preReplicateLogEntries(ctx, entries, cells);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
|
||||||
|
throws IOException {
|
||||||
|
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionServerObserver oserver,
|
||||||
|
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
|
||||||
|
oserver.postReplicateLogEntries(ctx, entries, cells);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
|
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
|
return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
|
||||||
|
|
|
@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
|
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
|
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
|
||||||
|
@ -2421,4 +2422,15 @@ public class AccessController extends BaseMasterAndRegionObserver
|
||||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
|
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
|
||||||
return endpoint;
|
return endpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||||
|
List<WALEntry> entries, CellScanner cells) throws IOException {
|
||||||
|
requirePermission("replicateLogEntries", Action.WRITE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||||
|
List<WALEntry> entries, CellScanner cells) throws IOException {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2639,4 +2639,21 @@ public class TestAccessController extends SecureTestUtil {
|
||||||
verifyAnyCreate(prepareBulkLoadAction);
|
verifyAnyCreate(prepareBulkLoadAction);
|
||||||
verifyAnyCreate(cleanupBulkLoadAction);
|
verifyAnyCreate(cleanupBulkLoadAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicateLogEntries() throws Exception {
|
||||||
|
AccessTestAction replicateLogEntriesAction = new AccessTestAction() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws Exception {
|
||||||
|
ACCESS_CONTROLLER.preReplicateLogEntries(ObserverContext.createAndPrepare(RSCP_ENV, null),
|
||||||
|
null, null);
|
||||||
|
ACCESS_CONTROLLER.postReplicateLogEntries(ObserverContext.createAndPrepare(RSCP_ENV, null),
|
||||||
|
null, null);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
verifyAllowed(replicateLogEntriesAction, SUPERUSER, USER_ADMIN);
|
||||||
|
verifyDenied(replicateLogEntriesAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue