From 2916d4f3568184f92006dba9a1e4ef18492643ea Mon Sep 17 00:00:00 2001
From: stack
Date: Fri, 24 Oct 2014 08:47:06 -0700
Subject: [PATCH] HBASE-12277 Refactor bulkLoad methods in AccessController to
 its own interface (Madhan Neethiraj)

---
 .../hbase/coprocessor/BulkLoadObserver.java      | 54 +++++++++++++++++++
 .../hbase/coprocessor/CoprocessorHost.java       | 20 +++++++
 .../security/access/AccessController.java        | 25 ++++++---
 .../access/SecureBulkLoadEndpoint.java           | 36 ++++++++++---
 .../hbase/coprocessor/TestClassLoading.java      | 13 +++++
 5 files changed, 134 insertions(+), 14 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
new file mode 100644
index 00000000000..c7f0b901a69
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
+
+/**
+ * Coprocessors implement this interface to observe and mediate bulk load operations.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface BulkLoadObserver extends Coprocessor {
+  /**
+   * Called as part of the SecureBulkLoadEndpoint.prepareBulkLoad() RPC call.
+   * The default action cannot be bypassed, i.e. ctx.bypass() has no effect.
+   * @param ctx the environment to interact with the framework and the region server
+   * @throws IOException
+   */
+  void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      PrepareBulkLoadRequest request) throws IOException;
+
+  /**
+   * Called as part of the SecureBulkLoadEndpoint.cleanupBulkLoad() RPC call.
+   * The default action cannot be bypassed, i.e. ctx.bypass() has no effect.
+   * @param ctx the environment to interact with the framework and the region server
+   * @throws IOException
+   */
+  void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      CleanupBulkLoadRequest request) throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index c9bd0b60a36..17fcabc58b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -280,6 +280,26 @@ public abstract class CoprocessorHost {
     return null;
   }
 
+  /**
+   * Find the list of coprocessors that extend/implement the given class/interface
+   * @param cls the class/interface to look for
+   * @return the list of matching coprocessors, or an empty list if none are loaded
+   */
+  public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
+    ArrayList<T> ret = new ArrayList<T>();
+
+    for (E env: coprocessors) {
+      Coprocessor cp = env.getInstance();
+
+      if(cp != null) {
+        if (cls.isAssignableFrom(cp.getClass())) {
+          ret.add((T)cp);
+        }
+      }
+    }
+    return ret;
+  }
+
   /**
    * Find a coprocessor environment by class name
    * @param className the class name
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 400e20a4783..4023e07a13b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -150,7 +153,7 @@ import com.google.protobuf.Service;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class AccessController extends BaseMasterAndRegionObserver
     implements RegionServerObserver,
-      AccessControlService.Interface, CoprocessorService, EndpointObserver {
+      AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
 
   public static final Log LOG = LogFactory.getLog(AccessController.class);
 
@@ -1891,11 +1894,15 @@ public class AccessController extends BaseMasterAndRegionObserver
   /**
    * Authorization check for
    * SecureBulkLoadProtocol.prepareBulkLoad()
-   * @param e
+   * @param ctx the context
+   * @param request the request
    * @throws IOException
    */
-  //TODO this should end up as a coprocessor hook
-  public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
+  @Override
+  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      PrepareBulkLoadRequest request) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+
     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
     logResult(authResult);
     if (!authResult.isAllowed()) {
@@ -1907,11 +1914,15 @@ public class AccessController extends BaseMasterAndRegionObserver
   /**
    * Authorization security check for
    * SecureBulkLoadProtocol.cleanupBulkLoad()
-   * @param e
+   * @param ctx the context
+   * @param request the request
    * @throws IOException
    */
-  //TODO this should end up as a coprocessor hook
-  public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
+  @Override
+  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      CleanupBulkLoadRequest request) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+
     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
     logResult(authResult);
     if (!authResult.isAllowed()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 930d9b33ae1..764a12c09f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -157,9 +159,18 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                               PrepareBulkLoadRequest request,
                               RpcCallback<PrepareBulkLoadResponse> done){
     try {
-      if(userProvider.isHBaseSecurityEnabled()) {
-        getAccessController().prePrepareBulkLoad(env);
+      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
+
+      if(bulkLoadObservers != null) {
+        ObserverContext<RegionCoprocessorEnvironment> ctx =
+            new ObserverContext<RegionCoprocessorEnvironment>();
+        ctx.prepare(env);
+
+        for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+          bulkLoadObserver.prePrepareBulkLoad(ctx, request);
+        }
       }
+
       String bulkToken = createStagingDir(baseStagingDir,
           getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
       done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
@@ -174,9 +185,18 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                               CleanupBulkLoadRequest request,
                               RpcCallback<CleanupBulkLoadResponse> done) {
     try {
-      if (userProvider.isHBaseSecurityEnabled()) {
-        getAccessController().preCleanupBulkLoad(env);
+      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
+
+      if(bulkLoadObservers != null) {
+        ObserverContext<RegionCoprocessorEnvironment> ctx =
+            new ObserverContext<RegionCoprocessorEnvironment>();
+        ctx.prepare(env);
+
+        for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+          bulkLoadObserver.preCleanupBulkLoad(ctx, request);
+        }
       }
+
       fs.delete(createStagingDir(baseStagingDir,
           getActiveUser(),
          new Path(request.getBulkToken()).getName()),
@@ -292,9 +312,11 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
   }
 
-  private AccessController getAccessController() {
-    return (AccessController) this.env.getRegion()
-        .getCoprocessorHost().findCoprocessor(AccessController.class.getName());
+  private List<BulkLoadObserver> getBulkLoadObservers() {
+    List<BulkLoadObserver> coprocessorList =
+        this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
+
+    return coprocessorList;
   }
 
   private Path createStagingDir(Path baseDir,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
index 2fe121df64d..140c3b9d6c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
@@ -531,6 +531,19 @@ public class TestClassLoading {
     assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
   }
 
+  @Test
+  public void testFindCoprocessors() {
+    // HBASE 12277:
+    CoprocessorHost masterCpHost =
+        TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost();
+
+    List<MasterObserver> masterObservers = masterCpHost.findCoprocessors(MasterObserver.class);
+
+    assertTrue(masterObservers != null && masterObservers.size() > 0);
+    assertEquals(masterCoprocessor.getSimpleName(),
+        masterObservers.get(0).getClass().getSimpleName());
+  }
+
   private void waitForTable(TableName name) throws InterruptedException, IOException {
     // First wait until all regions are online
     TEST_UTIL.waitTableEnabled(name);
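
Note (not part of the patch above): the refactor's point is that any region coprocessor, not just AccessController, can now hook the secure bulk load path. Below is a minimal sketch of such a coprocessor. The class name RestrictedBulkLoadObserver, its package, and the table name "frozen_table" are invented for illustration; only BulkLoadObserver, ObserverContext, RegionCoprocessorEnvironment, and the request protobufs come from the patch.

package org.example.hbase; // hypothetical package

import java.io.IOException;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;

/**
 * Hypothetical observer that rejects secure bulk loads into one table.
 * Extends BaseRegionObserver only to inherit empty start()/stop()
 * implementations; the interesting part is the BulkLoadObserver hooks.
 */
public class RestrictedBulkLoadObserver extends BaseRegionObserver implements BulkLoadObserver {

  // Invented table name, used only for this illustration.
  private static final String RESTRICTED_TABLE = "frozen_table";

  @Override
  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
      PrepareBulkLoadRequest request) throws IOException {
    // Veto the prepare step for the restricted table; throwing fails the RPC.
    String table = ctx.getEnvironment().getRegion()
        .getTableDesc().getTableName().getNameAsString();
    if (RESTRICTED_TABLE.equals(table)) {
      throw new DoNotRetryIOException("Bulk load is disabled for table " + table);
    }
  }

  @Override
  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
      CleanupBulkLoadRequest request) throws IOException {
    // Cleaning up the staging directory is always allowed in this sketch.
  }
}

Once such a class is loaded on the region (for example via hbase.coprocessor.region.classes or a per-table coprocessor attribute), SecureBulkLoadEndpoint discovers it through CoprocessorHost.findCoprocessors(BulkLoadObserver.class) and invokes it before preparing or cleaning up the bulk load staging directory, exactly as the patched prepareBulkLoad() and cleanupBulkLoad() methods show.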