HBASE-21789 Rewrite MetaTableAccessor.multiMutate with Table.coprocessorService
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
2d4819dbed
commit
f359d260d3
|
@ -50,23 +50,22 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.RegionServerCallable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -79,6 +78,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -1735,56 +1735,41 @@ public class MetaTableAccessor {
|
|||
private static void multiMutate(Connection connection, final Table table, byte[] row,
|
||||
final List<Mutation> mutations) throws IOException {
|
||||
debugLogMutations(mutations);
|
||||
// TODO: Need rollback!!!!
|
||||
// TODO: Need Retry!!!
|
||||
// TODO: What for a timeout? Default write timeout? GET FROM HTABLE?
|
||||
// TODO: Review when we come through with ProcedureV2.
|
||||
RegionServerCallable<MutateRowsResponse,
|
||||
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface> callable =
|
||||
new RegionServerCallable<MutateRowsResponse,
|
||||
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface>(
|
||||
connection, table.getName(), row, null/*RpcController not used in this CPEP!*/) {
|
||||
@Override
|
||||
protected MutateRowsResponse rpcCall() throws Exception {
|
||||
final MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
|
||||
for (Mutation mutation : mutations) {
|
||||
if (mutation instanceof Put) {
|
||||
builder.addMutationRequest(ProtobufUtil.toMutation(
|
||||
ClientProtos.MutationProto.MutationType.PUT, mutation));
|
||||
} else if (mutation instanceof Delete) {
|
||||
builder.addMutationRequest(ProtobufUtil.toMutation(
|
||||
ClientProtos.MutationProto.MutationType.DELETE, mutation));
|
||||
} else {
|
||||
throw new DoNotRetryIOException("multi in MetaEditor doesn't support "
|
||||
+ mutation.getClass().getName());
|
||||
}
|
||||
}
|
||||
// The call to #prepare that ran before this invocation will have populated HRegionLocation.
|
||||
HRegionLocation hrl = getLocation();
|
||||
RegionSpecifier region = ProtobufUtil.buildRegionSpecifier(
|
||||
RegionSpecifierType.REGION_NAME, hrl.getRegion().getRegionName());
|
||||
builder.setRegion(region);
|
||||
// The rpcController here is awkward. The Coprocessor Endpoint wants an instance of a
|
||||
// com.google.protobuf but we are going over an rpc that is all shaded protobuf so it
|
||||
// wants a org.apache.h.h.shaded.com.google.protobuf.RpcController. Set up a factory
|
||||
// that makes com.google.protobuf.RpcController and then copy into it configs.
|
||||
return getStub().mutateRows(null, builder.build());
|
||||
}
|
||||
Batch.Call<MultiRowMutationService, MutateRowsResponse> callable =
|
||||
new Batch.Call<MultiRowMutationService, MutateRowsResponse>() {
|
||||
|
||||
@Override
|
||||
// Called on the end of the super.prepare call. Set the stub.
|
||||
protected void setStubByServiceName(ServerName serviceName/*Ignored*/) throws IOException {
|
||||
CoprocessorRpcChannel channel = table.coprocessorService(getRow());
|
||||
setStub(MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel));
|
||||
}
|
||||
};
|
||||
int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
// The region location should be cached in connection. Call prepare so this callable picks
|
||||
// up the region location (see super.prepare method).
|
||||
callable.prepare(false);
|
||||
callable.call(writeTimeout);
|
||||
@Override
|
||||
public MutateRowsResponse call(MultiRowMutationService instance) throws IOException {
|
||||
MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
|
||||
for (Mutation mutation : mutations) {
|
||||
if (mutation instanceof Put) {
|
||||
builder.addMutationRequest(
|
||||
ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
|
||||
} else if (mutation instanceof Delete) {
|
||||
builder.addMutationRequest(
|
||||
ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
|
||||
} else {
|
||||
throw new DoNotRetryIOException(
|
||||
"multi in MetaEditor doesn't support " + mutation.getClass().getName());
|
||||
}
|
||||
}
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<MutateRowsResponse> rpcCallback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
instance.mutateRows(controller, builder.build(), rpcCallback);
|
||||
MutateRowsResponse resp = rpcCallback.get();
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
};
|
||||
try {
|
||||
table.coprocessorService(MultiRowMutationService.class, row, row, callable);
|
||||
} catch (Throwable e) {
|
||||
Throwables.propagateIfPossible(e, IOException.class);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -172,7 +172,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
|||
* avoiding port contention if another local HBase instance is already running).
|
||||
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
|
||||
* setting it to true.
|
||||
* For triggering pre commit
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@SuppressWarnings("deprecation")
|
||||
|
|
Loading…
Reference in New Issue