HBASE-16607 Make NoncedRegionServerCallable extend CancellableRegionServerCallable

This commit is contained in:
chenheng 2016-09-12 11:03:29 +08:00
parent 2c3b0f2c0b
commit c19d2cabbd
2 changed files with 31 additions and 95 deletions

View File

@ -630,17 +630,17 @@ public class HTable implements Table {
public Result append(final Append append) throws IOException {
checkHasFamilies(append);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), append.getRow()) {
@Override
protected Result call(HBaseRpcController controller) throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(controller, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
}
};
new NoncedRegionServerCallable<Result>(this.connection, this.rpcControllerFactory,
getName(), append.getRow()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(getRpcController(), request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
}
};
return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout).
callWithRetries(callable, this.operationTimeout);
}
@ -652,16 +652,16 @@ public class HTable implements Table {
public Result increment(final Increment increment) throws IOException {
checkHasFamilies(increment);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), increment.getRow()) {
@Override
protected Result call(HBaseRpcController controller) throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(controller, request);
// Should this check for null like append does?
return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
}
new NoncedRegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), increment.getRow()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(getRpcController(), request);
// Should this check for null like append does?
return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
}
};
return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
@ -701,12 +701,12 @@ public class HTable implements Table {
new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
row) {
@Override
protected Long call(HBaseRpcController controller) throws Exception {
protected Long rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildIncrementRequest(
getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(controller, request);
Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(getRpcController(), request);
Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
}
};

View File

@ -18,15 +18,9 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
/**
* Implementations make an rpc call against a RegionService via a protobuf Service.
@ -44,9 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> {
private ClientService.BlockingInterface stub;
private final HBaseRpcController rpcController;
public abstract class NoncedRegionServerCallable<T> extends CancellableRegionServerCallable<T> {
private final long nonce;
/**
@ -54,69 +46,13 @@ public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServer
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
TableName tableName, byte [] row) {
this(connection, rpcControllerFactory.newController(), tableName, row);
}
public NoncedRegionServerCallable(Connection connection, HBaseRpcController rpcController,
TableName tableName, byte [] row) {
super(connection, tableName, row);
this.rpcController = rpcController;
public NoncedRegionServerCallable(Connection connection,
RpcControllerFactory rpcControllerFactory,
TableName tableName, byte [] row) {
super(connection, tableName, row, rpcControllerFactory);
this.nonce = getConnection().getNonceGenerator().newNonce();
}
void setClientByServiceName(ServerName service) throws IOException {
this.setStub(getConnection().getClient(service));
}
/**
* @return Client Rpc protobuf communication stub
*/
protected ClientService.BlockingInterface getStub() {
return this.stub;
}
/**
* Set the client protobuf communication stub
* @param stub to set
*/
void setStub(final ClientService.BlockingInterface stub) {
this.stub = stub;
}
/**
* Override that changes Exception from {@link Exception} to {@link IOException}. It also does
* setup of an rpcController and calls through to the unimplemented
* call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
*/
@Override
public T call(int callTimeout) throws IOException {
if (this.rpcController != null) {
this.rpcController.reset();
this.rpcController.setPriority(tableName);
this.rpcController.setCallTimeout(callTimeout);
}
try {
return call(this.rpcController);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
/**
* Run RPC call.
* @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
* facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
* class.
* @throws Exception
*/
protected abstract T call(HBaseRpcController rpcController) throws Exception;
public HBaseRpcController getRpcController() {
return this.rpcController;
}
long getNonceGroup() {
return getConnection().getNonceGenerator().getNonceGroup();
}