HBASE-18859 Purge PB from BulkLoadObserver
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
b6863d867d
commit
2fee188156
|
@ -80,6 +80,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
||||||
try {
|
try {
|
||||||
SecureBulkLoadManager secureBulkLoadManager =
|
SecureBulkLoadManager secureBulkLoadManager =
|
||||||
this.env.getRegionServerServices().getSecureBulkLoadManager();
|
this.env.getRegionServerServices().getSecureBulkLoadManager();
|
||||||
|
|
||||||
String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(),
|
String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(),
|
||||||
convert(request));
|
convert(request));
|
||||||
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
|
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
|
||||||
|
@ -115,6 +116,10 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
||||||
done.run(null);
|
done.run(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
|
||||||
|
* @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException
|
||||||
|
*/
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
|
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
|
||||||
convert(CleanupBulkLoadRequest request)
|
convert(CleanupBulkLoadRequest request)
|
||||||
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
|
throws org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Coprocessor;
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
|
||||||
|
@ -52,16 +53,22 @@ public interface BulkLoadObserver extends Coprocessor {
|
||||||
/**
|
/**
|
||||||
* Called as part of SecureBulkLoadEndpoint.prepareBulkLoad() RPC call.
|
* Called as part of SecureBulkLoadEndpoint.prepareBulkLoad() RPC call.
|
||||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||||
|
* If you need to get the region or table name, get it from the
|
||||||
|
* <code>ctx</code> as follows: <code>code>ctx.getEnvironment().getRegion()</code>. Use
|
||||||
|
* getRegionInfo to fetch the encodedName and use getTabldDescriptor() to get the tableName.
|
||||||
* @param ctx the environment to interact with the framework and master
|
* @param ctx the environment to interact with the framework and master
|
||||||
*/
|
*/
|
||||||
default void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
default void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||||
PrepareBulkLoadRequest request) throws IOException {}
|
throws IOException {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called as part of SecureBulkLoadEndpoint.cleanupBulkLoad() RPC call.
|
* Called as part of SecureBulkLoadEndpoint.cleanupBulkLoad() RPC call.
|
||||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||||
|
* If you need to get the region or table name, get it from the
|
||||||
|
* <code>ctx</code> as follows: <code>code>ctx.getEnvironment().getRegion()</code>. Use
|
||||||
|
* getRegionInfo to fetch the encodedName and use getTabldDescriptor() to get the tableName.
|
||||||
* @param ctx the environment to interact with the framework and master
|
* @param ctx the environment to interact with the framework and master
|
||||||
*/
|
*/
|
||||||
default void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
default void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||||
CleanupBulkLoadRequest request) throws IOException {}
|
throws IOException {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines the procedure to atomically perform multiple scans and mutations
|
* Defines the procedures to atomically perform multiple scans and mutations
|
||||||
* on a HRegion.
|
* on a HRegion.
|
||||||
*
|
*
|
||||||
* This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}.
|
* This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}.
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
||||||
|
@ -145,7 +146,7 @@ public class SecureBulkLoadManager {
|
||||||
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
|
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
|
||||||
|
|
||||||
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
|
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
|
||||||
bulkLoadObserver.prePrepareBulkLoad(ctx, request);
|
bulkLoadObserver.prePrepareBulkLoad(ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,7 +167,7 @@ public class SecureBulkLoadManager {
|
||||||
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
|
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
|
||||||
|
|
||||||
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
|
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
|
||||||
bulkLoadObserver.preCleanupBulkLoad(ctx, request);
|
bulkLoadObserver.preCleanupBulkLoad(ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2155,12 +2155,11 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
|
||||||
* Authorization check for
|
* Authorization check for
|
||||||
* SecureBulkLoadProtocol.prepareBulkLoad()
|
* SecureBulkLoadProtocol.prepareBulkLoad()
|
||||||
* @param ctx the context
|
* @param ctx the context
|
||||||
* @param request the request
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||||
PrepareBulkLoadRequest request) throws IOException {
|
throws IOException {
|
||||||
requireAccess(getActiveUser(ctx), "prePrepareBulkLoad",
|
requireAccess(getActiveUser(ctx), "prePrepareBulkLoad",
|
||||||
ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
|
ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
|
||||||
}
|
}
|
||||||
|
@ -2169,12 +2168,11 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
|
||||||
* Authorization security check for
|
* Authorization security check for
|
||||||
* SecureBulkLoadProtocol.cleanupBulkLoad()
|
* SecureBulkLoadProtocol.cleanupBulkLoad()
|
||||||
* @param ctx the context
|
* @param ctx the context
|
||||||
* @param request the request
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||||
CleanupBulkLoadRequest request) throws IOException {
|
throws IOException {
|
||||||
requireAccess(getActiveUser(ctx), "preCleanupBulkLoad",
|
requireAccess(getActiveUser(ctx), "preCleanupBulkLoad",
|
||||||
ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
|
ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2878,16 +2878,14 @@ public class TestAccessController extends SecureTestUtil {
|
||||||
AccessTestAction prepareBulkLoadAction = new AccessTestAction() {
|
AccessTestAction prepareBulkLoadAction = new AccessTestAction() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Object run() throws Exception {
|
||||||
ACCESS_CONTROLLER.prePrepareBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null),
|
ACCESS_CONTROLLER.prePrepareBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null));
|
||||||
null);
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
AccessTestAction cleanupBulkLoadAction = new AccessTestAction() {
|
AccessTestAction cleanupBulkLoadAction = new AccessTestAction() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Object run() throws Exception {
|
||||||
ACCESS_CONTROLLER.preCleanupBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null),
|
ACCESS_CONTROLLER.preCleanupBulkLoad(ObserverContext.createAndPrepare(RCP_ENV, null));
|
||||||
null);
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue