HBASE-18859 Purge PB from BulkLoadObserver

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Michael Stack 2017-09-21 16:28:33 -07:00
parent 2ceeb54c92
commit 410368b84e
6 changed files with 26 additions and 17 deletions

View File

@ -80,6 +80,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
try { try {
SecureBulkLoadManager secureBulkLoadManager = SecureBulkLoadManager secureBulkLoadManager =
this.env.getRegionServerServices().getSecureBulkLoadManager(); this.env.getRegionServerServices().getSecureBulkLoadManager();
String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(), String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(),
convert(request)); convert(request));
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build()); done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
@ -115,6 +116,10 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
done.run(null); done.run(null);
} }
/**
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
* @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException
*/
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
convert(CleanupBulkLoadRequest request) convert(CleanupBulkLoadRequest request)
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException { throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
@ -52,16 +53,22 @@ public interface BulkLoadObserver extends Coprocessor {
/** /**
* Called as part of SecureBulkLoadEndpoint.prepareBulkLoad() RPC call. * Called as part of SecureBulkLoadEndpoint.prepareBulkLoad() RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect. * It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* If you need to get the region or table name, get it from the
* <code>ctx</code> as follows: <code>code>ctx.getEnvironment().getRegion()</code>. Use
* getRegionInfo to fetch the encodedName and use getTabldDescriptor() to get the tableName.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
*/ */
default void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx, default void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
PrepareBulkLoadRequest request) throws IOException {} throws IOException {}
/** /**
* Called as part of SecureBulkLoadEndpoint.cleanupBulkLoad() RPC call. * Called as part of SecureBulkLoadEndpoint.cleanupBulkLoad() RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect. * It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* If you need to get the region or table name, get it from the
* <code>ctx</code> as follows: <code>code>ctx.getEnvironment().getRegion()</code>. Use
* getRegionInfo to fetch the encodedName and use getTabldDescriptor() to get the tableName.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
*/ */
default void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx, default void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
CleanupBulkLoadRequest request) throws IOException {} throws IOException {}
} }

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import com.google.protobuf.Message; import com.google.protobuf.Message;
/** /**
* Defines the procedure to atomically perform multiple scans and mutations * Defines the procedures to atomically perform multiple scans and mutations
* on a HRegion. * on a HRegion.
* *
* This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}. * This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}.

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
@ -145,7 +146,7 @@ public class SecureBulkLoadManager {
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0)); .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) { for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.prePrepareBulkLoad(ctx, request); bulkLoadObserver.prePrepareBulkLoad(ctx);
} }
} }
@ -166,7 +167,7 @@ public class SecureBulkLoadManager {
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0)); .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) { for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.preCleanupBulkLoad(ctx, request); bulkLoadObserver.preCleanupBulkLoad(ctx);
} }
} }

View File

@ -2155,12 +2155,11 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
* Authorization check for * Authorization check for
* SecureBulkLoadProtocol.prepareBulkLoad() * SecureBulkLoadProtocol.prepareBulkLoad()
* @param ctx the context * @param ctx the context
* @param request the request
* @throws IOException * @throws IOException
*/ */
@Override @Override
public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx, public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
PrepareBulkLoadRequest request) throws IOException { throws IOException {
requireAccess(getActiveUser(ctx), "prePrepareBulkLoad", requireAccess(getActiveUser(ctx), "prePrepareBulkLoad",
ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE); ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
} }
@ -2169,12 +2168,11 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
* Authorization security check for * Authorization security check for
* SecureBulkLoadProtocol.cleanupBulkLoad() * SecureBulkLoadProtocol.cleanupBulkLoad()
* @param ctx the context * @param ctx the context
* @param request the request
* @throws IOException * @throws IOException
*/ */
@Override @Override
public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx, public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
CleanupBulkLoadRequest request) throws IOException { throws IOException {
requireAccess(getActiveUser(ctx), "preCleanupBulkLoad", requireAccess(getActiveUser(ctx), "preCleanupBulkLoad",
ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE); ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
} }

View File

@ -2878,16 +2878,14 @@ public class TestAccessController extends SecureTestUtil {
AccessTestAction prepareBulkLoadAction = new AccessTestAction() { AccessTestAction prepareBulkLoadAction = new AccessTestAction() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
ACCESS_CONTROLLER.prePrepareBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null), ACCESS_CONTROLLER.prePrepareBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null));
null);
return null; return null;
} }
}; };
AccessTestAction cleanupBulkLoadAction = new AccessTestAction() { AccessTestAction cleanupBulkLoadAction = new AccessTestAction() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
ACCESS_CONTROLLER.preCleanupBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null), ACCESS_CONTROLLER.preCleanupBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null));
null);
return null; return null;
} }
}; };