HBASE-21789 Rewrite MetaTableAccessor.multiMutate with Table.coprocessorService

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
zhangduo 2019-01-26 23:22:13 +08:00
parent 1995f61d7f
commit 274e4ccea8
2 changed files with 39 additions and 55 deletions

View File

@ -50,23 +50,22 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -79,6 +78,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
/** /**
* <p> * <p>
@ -1735,56 +1735,41 @@ public class MetaTableAccessor {
private static void multiMutate(Connection connection, final Table table, byte[] row, private static void multiMutate(Connection connection, final Table table, byte[] row,
final List<Mutation> mutations) throws IOException { final List<Mutation> mutations) throws IOException {
debugLogMutations(mutations); debugLogMutations(mutations);
// TODO: Need rollback!!!! Batch.Call<MultiRowMutationService, MutateRowsResponse> callable =
// TODO: Need Retry!!! new Batch.Call<MultiRowMutationService, MutateRowsResponse>() {
// TODO: What for a timeout? Default write timeout? GET FROM HTABLE?
// TODO: Review when we come through with ProcedureV2.
RegionServerCallable<MutateRowsResponse,
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface> callable =
new RegionServerCallable<MutateRowsResponse,
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface>(
connection, table.getName(), row, null/*RpcController not used in this CPEP!*/) {
@Override
protected MutateRowsResponse rpcCall() throws Exception {
final MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
builder.addMutationRequest(ProtobufUtil.toMutation(
ClientProtos.MutationProto.MutationType.PUT, mutation));
} else if (mutation instanceof Delete) {
builder.addMutationRequest(ProtobufUtil.toMutation(
ClientProtos.MutationProto.MutationType.DELETE, mutation));
} else {
throw new DoNotRetryIOException("multi in MetaEditor doesn't support "
+ mutation.getClass().getName());
}
}
// The call to #prepare that ran before this invocation will have populated HRegionLocation.
HRegionLocation hrl = getLocation();
RegionSpecifier region = ProtobufUtil.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, hrl.getRegion().getRegionName());
builder.setRegion(region);
// The rpcController here is awkward. The Coprocessor Endpoint wants an instance of a
// com.google.protobuf but we are going over an rpc that is all shaded protobuf so it
// wants a org.apache.h.h.shaded.com.google.protobuf.RpcController. Set up a factory
// that makes com.google.protobuf.RpcController and then copy into it configs.
return getStub().mutateRows(null, builder.build());
}
@Override @Override
// Called on the end of the super.prepare call. Set the stub. public MutateRowsResponse call(MultiRowMutationService instance) throws IOException {
protected void setStubByServiceName(ServerName serviceName/*Ignored*/) throws IOException { MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
CoprocessorRpcChannel channel = table.coprocessorService(getRow()); for (Mutation mutation : mutations) {
setStub(MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel)); if (mutation instanceof Put) {
} builder.addMutationRequest(
}; ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, } else if (mutation instanceof Delete) {
connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, builder.addMutationRequest(
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
// The region location should be cached in connection. Call prepare so this callable picks } else {
// up the region location (see super.prepare method). throw new DoNotRetryIOException(
callable.prepare(false); "multi in MetaEditor doesn't support " + mutation.getClass().getName());
callable.call(writeTimeout); }
}
ServerRpcController controller = new ServerRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<MutateRowsResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
instance.mutateRows(controller, builder.build(), rpcCallback);
MutateRowsResponse resp = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return resp;
}
};
try {
table.coprocessorService(MultiRowMutationService.class, row, row, callable);
} catch (Throwable e) {
Throwables.propagateIfPossible(e, IOException.class);
throw new IOException(e);
}
} }
/** /**

View File

@ -172,7 +172,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running). * avoiding port contention if another local HBase instance is already running).
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true. * setting it to true.
* For triggering pre commit
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")