HBASE-16516 Revisit the implementation of PayloadCarryingRpcController

This commit is contained in:
zhangduo 2016-09-01 22:33:18 +08:00
parent 592245ff13
commit 91227f8f98
38 changed files with 633 additions and 420 deletions

View File

@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
@ -65,7 +65,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
} }
@Override @Override
protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception { protected FlushRegionResponse call(HBaseRpcController controller) throws Exception {
// Check whether we should still do the flush to this region. If the regions are changed due // Check whether we should still do the flush to this region. If the regions are changed due
// to splits or merges, etc return success // to splits or merges, etc return success
if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) { if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {

View File

@ -69,7 +69,7 @@ import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -1036,7 +1036,7 @@ public class HBaseAdmin implements Admin {
CloseRegionRequest request = CloseRegionRequest request =
RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = this.rpcControllerFactory.newController(); HBaseRpcController controller = this.rpcControllerFactory.newController();
try { try {
CloseRegionResponse response = admin.closeRegion(controller, request); CloseRegionResponse response = admin.closeRegion(controller, request);
boolean closed = response.getClosed(); boolean closed = response.getClosed();
@ -1053,7 +1053,7 @@ public class HBaseAdmin implements Admin {
public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException { public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn); AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
// Close the region without updating zk state. // Close the region without updating zk state.
ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName()); ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName());
@ -1063,7 +1063,7 @@ public class HBaseAdmin implements Admin {
public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException { public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn); AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
return ProtobufUtil.getOnlineRegions(controller, admin); return ProtobufUtil.getOnlineRegions(controller, admin);
} }
@ -1094,7 +1094,7 @@ public class HBaseAdmin implements Admin {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
FlushRegionRequest request = FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
admin.flushRegion(controller, request); admin.flushRegion(controller, request);
@ -1257,7 +1257,7 @@ public class HBaseAdmin implements Admin {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
CompactRegionRequest request = CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
admin.compactRegion(controller, request); admin.compactRegion(controller, request);
@ -1649,7 +1649,7 @@ public class HBaseAdmin implements Admin {
throw new IOException("should not give a splitkey which equals to startkey!"); throw new IOException("should not give a splitkey which equals to startkey!");
} }
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(hri.getTable()); controller.setPriority(hri.getTable());
// TODO: this does not do retries, it should. Set priority and timeout in controller // TODO: this does not do retries, it should. Set priority and timeout in controller
@ -1837,7 +1837,7 @@ public class HBaseAdmin implements Admin {
final AdminService.BlockingInterface admin = final AdminService.BlockingInterface admin =
this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
controller.setPriority(HConstants.HIGH_QOS); controller.setPriority(HConstants.HIGH_QOS);
StopServerRequest request = RequestConverter.buildStopServerRequest( StopServerRequest request = RequestConverter.buildStopServerRequest(
"Called by admin client " + this.connection.toString()); "Called by admin client " + this.connection.toString());
@ -2191,7 +2191,7 @@ public class HBaseAdmin implements Admin {
final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
try { try {
return admin.rollWALWriter(controller, request); return admin.rollWALWriter(controller, request);
} catch (ServiceException e) { } catch (ServiceException e) {
@ -2272,7 +2272,7 @@ public class HBaseAdmin implements Admin {
ServerName sn = regionServerPair.getSecond(); ServerName sn = regionServerPair.getSecond();
final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); HBaseRpcController controller = rpcControllerFactory.newController();
GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
regionServerPair.getFirst().getRegionName(), true); regionServerPair.getFirst().getRegionName(), true);
GetRegionInfoResponse response; GetRegionInfoResponse response;
@ -3034,7 +3034,7 @@ public class HBaseAdmin implements Admin {
AdminProtos.GetRegionInfoResponse.CompactionState.NONE; AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
checkTableExists(tableName); checkTableExists(tableName);
// TODO: There is no timeout on this controller. Set one! // TODO: There is no timeout on this controller. Set one!
final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController(); final HBaseRpcController rpcController = rpcControllerFactory.newController();
switch (compactType) { switch (compactType) {
case MOB: case MOB:
final AdminProtos.AdminService.BlockingInterface masterAdmin = final AdminProtos.AdminService.BlockingInterface masterAdmin =

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -631,7 +631,7 @@ public class HTable implements Table {
new NoncedRegionServerCallable<Result>(this.connection, new NoncedRegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), append.getRow()) { this.rpcControllerFactory, getName(), append.getRow()) {
@Override @Override
protected Result call(PayloadCarryingRpcController controller) throws Exception { protected Result call(HBaseRpcController controller) throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(controller, request); MutateResponse response = getStub().mutate(controller, request);
@ -653,7 +653,7 @@ public class HTable implements Table {
new NoncedRegionServerCallable<Result>(this.connection, new NoncedRegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), increment.getRow()) { this.rpcControllerFactory, getName(), increment.getRow()) {
@Override @Override
protected Result call(PayloadCarryingRpcController controller) throws Exception { protected Result call(HBaseRpcController controller) throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
MutateResponse response = getStub().mutate(controller, request); MutateResponse response = getStub().mutate(controller, request);
@ -699,7 +699,7 @@ public class HTable implements Table {
new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(), new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
row) { row) {
@Override @Override
protected Long call(PayloadCarryingRpcController controller) throws Exception { protected Long call(HBaseRpcController controller) throws Exception {
MutateRequest request = RequestConverter.buildIncrementRequest( MutateRequest request = RequestConverter.buildIncrementRequest(
getLocation().getRegionInfo().getRegionName(), row, family, getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability, getNonceGroup(), getNonce()); qualifier, amount, durability, getNonceGroup(), getNonce());

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes;
abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
protected final ClusterConnection connection; protected final ClusterConnection connection;
protected MasterKeepAliveConnection master; protected MasterKeepAliveConnection master;
private final PayloadCarryingRpcController rpcController; private final HBaseRpcController rpcController;
MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) { MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) {
this.connection = (ClusterConnection) connection; this.connection = (ClusterConnection) connection;
@ -111,7 +111,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
*/ */
protected abstract V rpcCall() throws Exception; protected abstract V rpcCall() throws Exception;
PayloadCarryingRpcController getRpcController() { HBaseRpcController getRpcController() {
return this.rpcController; return this.rpcController;
} }

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter;

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> { public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> {
private ClientService.BlockingInterface stub; private ClientService.BlockingInterface stub;
private final PayloadCarryingRpcController rpcController; private final HBaseRpcController rpcController;
private final long nonce; private final long nonce;
/** /**
@ -59,7 +59,7 @@ public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServer
this(connection, rpcControllerFactory.newController(), tableName, row); this(connection, rpcControllerFactory.newController(), tableName, row);
} }
public NoncedRegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController, public NoncedRegionServerCallable(Connection connection, HBaseRpcController rpcController,
TableName tableName, byte [] row) { TableName tableName, byte [] row) {
super(connection, tableName, row); super(connection, tableName, row);
this.rpcController = rpcController; this.rpcController = rpcController;
@ -111,9 +111,9 @@ public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServer
* class. * class.
* @throws Exception * @throws Exception
*/ */
protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; protected abstract T call(HBaseRpcController rpcController) throws Exception;
public PayloadCarryingRpcController getRpcController() { public HBaseRpcController getRpcController() {
return this.rpcController; return this.rpcController;
} }

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes;
public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
protected AdminService.BlockingInterface stub; protected AdminService.BlockingInterface stub;
protected final RpcControllerFactory rpcControllerFactory; protected final RpcControllerFactory rpcControllerFactory;
private PayloadCarryingRpcController controller = null; private HBaseRpcController controller = null;
protected final ClusterConnection connection; protected final ClusterConnection connection;
protected HRegionLocation location; protected HRegionLocation location;
@ -186,7 +186,7 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
} }
} }
PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() { HBaseRpcController getCurrentPayloadCarryingRpcController() {
return this.controller; return this.controller;
} }
@ -197,5 +197,5 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
* class. * class.
* @throws Exception * @throws Exception
*/ */
protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; protected abstract T call(HBaseRpcController rpcController) throws Exception;
} }

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@ -103,8 +103,8 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
if (this.rpcController != null) { if (this.rpcController != null) {
// Do a reset to clear previous states, such as CellScanner. // Do a reset to clear previous states, such as CellScanner.
this.rpcController.reset(); this.rpcController.reset();
if (this.rpcController instanceof PayloadCarryingRpcController) { if (this.rpcController instanceof HBaseRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)this.rpcController; HBaseRpcController pcrc = (HBaseRpcController)this.rpcController;
// If it is an instance of PayloadCarryingRpcController, we can set priority on the // If it is an instance of PayloadCarryingRpcController, we can set priority on the
// controller based off the tableName. RpcController may be null in tests when mocking so allow // controller based off the tableName. RpcController may be null in tests when mocking so allow
// for null controller. // for null controller.
@ -141,10 +141,10 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
* a Coproccessor Endpoint context. Should never happen. * a Coproccessor Endpoint context. Should never happen.
*/ */
protected CellScanner getRpcControllerCellScanner() { protected CellScanner getRpcControllerCellScanner() {
return ((PayloadCarryingRpcController)this.rpcController).cellScanner(); return ((HBaseRpcController)this.rpcController).cellScanner();
} }
protected void setRpcControllerCellScanner(CellScanner cellScanner) { protected void setRpcControllerCellScanner(CellScanner cellScanner) {
((PayloadCarryingRpcController)this.rpcController).setCellScanner(cellScanner); ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner);
} }
} }

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -89,7 +89,7 @@ public class RpcRetryingCallerWithReadReplicas {
*/ */
class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable { class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
final int id; final int id;
private final PayloadCarryingRpcController controller; private final HBaseRpcController controller;
public ReplicaRegionServerCallable(int id, HRegionLocation location) { public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory, super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
@ -144,7 +144,7 @@ public class RpcRetryingCallerWithReadReplicas {
ClientProtos.GetRequest request = ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(reg, get); RequestConverter.buildGetRequest(reg, get);
// Presumption that we are passed a PayloadCarryingRpcController here! // Presumption that we are passed a PayloadCarryingRpcController here!
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; HBaseRpcController pcrc = (HBaseRpcController)controller;
pcrc.setCallTimeout(callTimeout); pcrc.setCallTimeout(callTimeout);
ClientProtos.GetResponse response = getStub().get(controller, request); ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) { if (response == null) {

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;

View File

@ -214,11 +214,11 @@ public abstract class AbstractRpcClient implements RpcClient {
* new Connection each time. * new Connection each time.
* @return A pair with the Message response and the Cell data (if any). * @return A pair with the Message response and the Cell data (if any).
*/ */
private Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController pcrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa) Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException { throws ServiceException {
if (pcrc == null) { if (pcrc == null) {
pcrc = new PayloadCarryingRpcController(); pcrc = new HBaseRpcControllerImpl();
} }
Pair<Message, CellScanner> val; Pair<Message, CellScanner> val;
@ -257,7 +257,7 @@ public abstract class AbstractRpcClient implements RpcClient {
* @throws InterruptedException if call is interrupted * @throws InterruptedException if call is interrupted
* @throws java.io.IOException if transport failed * @throws java.io.IOException if transport failed
*/ */
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, protected abstract Pair<Message, CellScanner> call(HBaseRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress isa, MetricsConnection.CallStats callStats) InetSocketAddress isa, MetricsConnection.CallStats callStats)
throws IOException, InterruptedException; throws IOException, InterruptedException;
@ -274,16 +274,16 @@ public abstract class AbstractRpcClient implements RpcClient {
* @param channelOperationTimeout timeout for operation * @param channelOperationTimeout timeout for operation
* @return configured payload controller * @return configured payload controller
*/ */
static PayloadCarryingRpcController configurePayloadCarryingRpcController( static HBaseRpcController configurePayloadCarryingRpcController(
RpcController controller, int channelOperationTimeout) { RpcController controller, int channelOperationTimeout) {
PayloadCarryingRpcController pcrc; HBaseRpcController pcrc;
if (controller != null && controller instanceof PayloadCarryingRpcController) { if (controller != null && controller instanceof HBaseRpcController) {
pcrc = (PayloadCarryingRpcController) controller; pcrc = (HBaseRpcController) controller;
if (!pcrc.hasCallTimeout()) { if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout); pcrc.setCallTimeout(channelOperationTimeout);
} }
} else { } else {
pcrc = new PayloadCarryingRpcController(); pcrc = new HBaseRpcControllerImpl();
pcrc.setCallTimeout(channelOperationTimeout); pcrc.setCallTimeout(channelOperationTimeout);
} }
return pcrc; return pcrc;
@ -317,7 +317,7 @@ public abstract class AbstractRpcClient implements RpcClient {
@Override @Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException { Message param, Message returnType) throws ServiceException {
PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController( HBaseRpcController pcrc = configurePayloadCarryingRpcController(
controller, controller,
channelOperationTimeout); channelOperationTimeout);

View File

@ -430,7 +430,7 @@ public class AsyncRpcChannel {
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
} }
// Only pass priority if there one. Let zero be same as no priority. // Only pass priority if there one. Let zero be same as no priority.
if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { if (call.getPriority() != HBaseRpcController.PRIORITY_UNSET) {
requestHeaderBuilder.setPriority(call.getPriority()); requestHeaderBuilder.setPriority(call.getPriority());
} }
requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ? requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?

View File

@ -231,12 +231,12 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws java.io.IOException if a connection failure is encountered * @throws java.io.IOException if a connection failure is encountered
*/ */
@Override @Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, protected Pair<Message, CellScanner> call(HBaseRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress addr, MetricsConnection.CallStats callStats) InetSocketAddress addr, MetricsConnection.CallStats callStats)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (pcrc == null) { if (pcrc == null) {
pcrc = new PayloadCarryingRpcController(); pcrc = new HBaseRpcControllerImpl();
} }
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
@ -269,7 +269,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
} }
private MessageConverter<Message, Message> getMessageConverterWithRpcController( private MessageConverter<Message, Message> getMessageConverterWithRpcController(
final PayloadCarryingRpcController pcrc) { final HBaseRpcController pcrc) {
return new return new
MessageConverter<Message, Message>() { MessageConverter<Message, Message>() {
@Override @Override
@ -284,7 +284,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
* Call method async * Call method async
*/ */
private void callMethod(final Descriptors.MethodDescriptor md, private void callMethod(final Descriptors.MethodDescriptor md,
final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket, final HBaseRpcController pcrc, final Message param, Message returnType, User ticket,
InetSocketAddress addr, final RpcCallback<Message> done) { InetSocketAddress addr, final RpcCallback<Message> done) {
final AsyncRpcChannel connection; final AsyncRpcChannel connection;
try { try {
@ -490,7 +490,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
@Override @Override
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType, RpcCallback<Message> done) { Message param, Message returnType, RpcCallback<Message> done) {
PayloadCarryingRpcController pcrc = HBaseRpcController pcrc =
configurePayloadCarryingRpcController(controller, channelOperationTimeout); configurePayloadCarryingRpcController(controller, channelOperationTimeout);
this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Simple delegating controller for use with the {@link RpcControllerFactory} to help override
* standard behavior of a {@link HBaseRpcController}.
*/
@InterfaceAudience.Private
public class DelegatingHBaseRpcController implements HBaseRpcController {
private final HBaseRpcController delegate;
public DelegatingHBaseRpcController(HBaseRpcController delegate) {
this.delegate = delegate;
}
@Override
public void reset() {
delegate.reset();
}
@Override
public boolean failed() {
return delegate.failed();
}
@Override
public String errorText() {
return delegate.errorText();
}
@Override
public void startCancel() {
delegate.startCancel();
}
@Override
public void setFailed(String reason) {
delegate.setFailed(reason);
}
@Override
public boolean isCanceled() {
return delegate.isCanceled();
}
@Override
public void notifyOnCancel(RpcCallback<Object> callback) {
delegate.notifyOnCancel(callback);
}
@Override
public CellScanner cellScanner() {
return delegate.cellScanner();
}
@Override
public void setCellScanner(CellScanner cellScanner) {
delegate.setCellScanner(cellScanner);
}
@Override
public void setPriority(int priority) {
delegate.setPriority(priority);
}
@Override
public void setPriority(TableName tn) {
delegate.setPriority(tn);
}
@Override
public int getPriority() {
return delegate.getPriority();
}
@Override
public int getCallTimeout() {
return delegate.getCallTimeout();
}
@Override
public void setCallTimeout(int callTimeout) {
delegate.setCallTimeout(callTimeout);
}
@Override
public boolean hasCallTimeout() {
return delegate.hasCallTimeout();
}
@Override
public void setFailed(IOException e) {
delegate.setFailed(e);
}
@Override
public IOException getFailed() {
return delegate.getFailed();
}
@Override
public void setDone(CellScanner cellScanner) {
delegate.setDone(cellScanner);
}
@Override
public void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
throws IOException {
delegate.notifyOnCancel(callback, action);
}
}

View File

@ -1,60 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Simple delegating controller for use with the {@link RpcControllerFactory} to help override
* standard behavior of a {@link PayloadCarryingRpcController}.
*/
@InterfaceAudience.Private
public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
private final PayloadCarryingRpcController delegate;
public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
this.delegate = delegate;
}
@Override
public CellScanner cellScanner() {
return delegate.cellScanner();
}
@Override
public void setCellScanner(final CellScanner cellScanner) {
delegate.setCellScanner(cellScanner);
}
@Override
public void setPriority(int priority) {
delegate.setPriority(priority);
}
@Override
public void setPriority(final TableName tn) {
delegate.setPriority(tn);
}
@Override
public int getPriority() {
return delegate.getPriority();
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
* optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
* having to protobuf them (for performance reasons). This class is used ferrying data across the
* proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing.
*/
@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {
static final int PRIORITY_UNSET = -1;
/**
* Only used to send cells to rpc server, the returned cells should be set by
* {@link #setDone(CellScanner)}.
*/
void setCellScanner(CellScanner cellScanner);
/**
* @param priority Priority for this request; should fall roughly in the range
* {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
*/
void setPriority(int priority);
/**
* @param tn Set priority based off the table we are going against.
*/
void setPriority(final TableName tn);
/**
* @return The priority of this request
*/
int getPriority();
int getCallTimeout();
void setCallTimeout(int callTimeout);
boolean hasCallTimeout();
/**
* Set failed with an exception to pass on. For use in async rpc clients
* @param e exception to set with
*/
void setFailed(IOException e);
/**
* Return the failed exception, null if not failed.
*/
IOException getFailed();
/**
* <b>IMPORTANT:</b> always call this method if the call finished without any exception to tell
* the {@code HBaseRpcController} that we are done.
*/
void setDone(CellScanner cellScanner);
/**
* A little different from the basic RpcController:
* <ol>
* <li>You can register multiple callbacks to an {@code HBaseRpcController}.</li>
* <li>The callback will not be called if the rpc call is finished without any cancellation.</li>
* <li>You can call me at client side also.</li>
* </ol>
*/
@Override
void notifyOnCancel(RpcCallback<Object> callback);
interface CancellationCallback {
void run(boolean cancelled) throws IOException;
}
/**
* If not cancelled, add the callback to cancellation callback list. And then execute the action
* with the cancellation state as a parameter. The implementation should guarantee that the
* cancellation state does not change during this call.
*/
void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) throws IOException;
}

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
* optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
* having to protobuf them (for performance reasons). This class is used ferrying data across the
* proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing.
*/
@InterfaceAudience.Private
public class HBaseRpcControllerImpl implements HBaseRpcController {
/**
* The time, in ms before the call should expire.
*/
private Integer callTimeout;
private boolean done = false;
private boolean cancelled = false;
private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
private IOException exception;
/**
* Priority to set on this request. Set it here in controller so available composing the request.
* This is the ordained way of setting priorities going forward. We will be undoing the old
* annotation-based mechanism.
*/
private int priority = PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
* set on response with the result. We use this lowest common denominator access to Cells because
* sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
* block that implements CellScanner.
*/
private CellScanner cellScanner;
public HBaseRpcControllerImpl() {
this((CellScanner) null);
}
public HBaseRpcControllerImpl(final CellScanner cellScanner) {
this.cellScanner = cellScanner;
}
public HBaseRpcControllerImpl(final List<CellScannable> cellIterables) {
this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
}
/**
* @return One-shot cell scanner (you cannot back it up and restart)
*/
@Override
public CellScanner cellScanner() {
return cellScanner;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "The only possible race method is startCancel")
@Override
public void setCellScanner(final CellScanner cellScanner) {
this.cellScanner = cellScanner;
}
@Override
public void setPriority(int priority) {
this.priority = priority;
}
@Override
public void setPriority(final TableName tn) {
setPriority(
tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
}
@Override
public int getPriority() {
return priority;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "The only possible race method is startCancel")
@Override
public void reset() {
priority = 0;
cellScanner = null;
exception = null;
callTimeout = null;
// In the implementations of some callable with replicas, rpc calls are executed in a executor
// and we could cancel the operation from outside which means there could be a race between
// reset and startCancel. Although I think the race should be handled by the callable since the
// reset may clear the cancel state...
synchronized (this) {
done = false;
cancelled = false;
cancellationCbs.clear();
}
}
@Override
public int getCallTimeout() {
if (callTimeout != null) {
return callTimeout.intValue();
} else {
return 0;
}
}
@Override
public void setCallTimeout(int callTimeout) {
this.callTimeout = callTimeout;
}
@Override
public boolean hasCallTimeout() {
return callTimeout != null;
}
@Override
public synchronized String errorText() {
if (!done || exception == null) {
return null;
}
return exception.getMessage();
}
@Override
public synchronized boolean failed() {
return done && this.exception != null;
}
@Override
public synchronized boolean isCanceled() {
return cancelled;
}
@Override
public void notifyOnCancel(RpcCallback<Object> callback) {
synchronized (this) {
if (done) {
return;
}
if (!cancelled) {
cancellationCbs.add(callback);
return;
}
}
// run it directly as we have already been cancelled.
callback.run(null);
}
@Override
public synchronized void setFailed(String reason) {
if (done) {
return;
}
done = true;
exception = new IOException(reason);
}
@Override
public synchronized void setFailed(IOException e) {
if (done) {
return;
}
done = true;
exception = e;
}
@Override
public synchronized IOException getFailed() {
return done ? exception : null;
}
@Override
public synchronized void setDone(CellScanner cellScanner) {
if (done) {
return;
}
done = true;
this.cellScanner = cellScanner;
}
@Override
public void startCancel() {
// As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
// to copy it.
List<RpcCallback<Object>> cbs;
synchronized (this) {
if (done) {
return;
}
done = true;
cancelled = true;
cbs = new ArrayList<>(cancellationCbs);
}
for (RpcCallback<?> cb : cbs) {
cb.run(null);
}
}
@Override
public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
throws IOException {
if (cancelled) {
action.run(true);
} else {
cancellationCbs.add(callback);
action.run(false);
}
}
}

View File

@ -1,214 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
/**
* Optionally carries Cells across the proxy/service interface down into ipc. On its
* way out it optionally carries a set of result Cell data. We stick the Cells here when we want
* to avoid having to protobuf them (for performance reasons). This class is used ferrying data
* across the proxy/protobuf service chasm. Also does call timeout. Used by client and server
* ipc'ing.
*/
@InterfaceAudience.Private
public class PayloadCarryingRpcController implements RpcController, CellScannable {
/**
* The time, in ms before the call should expire.
*/
protected volatile Integer callTimeout;
protected volatile boolean cancelled = false;
protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
private IOException exception;
public static final int PRIORITY_UNSET = -1;
/**
* Priority to set on this request. Set it here in controller so available composing the
* request. This is the ordained way of setting priorities going forward. We will be
* undoing the old annotation-based mechanism.
*/
private int priority = PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
* set on response with the result. We use this lowest common denominator access to Cells because
* sometimes the scanner is backed by a List of Cells and other times, it is backed by an
* encoded block that implements CellScanner.
*/
private CellScanner cellScanner;
public PayloadCarryingRpcController() {
this((CellScanner)null);
}
public PayloadCarryingRpcController(final CellScanner cellScanner) {
this.cellScanner = cellScanner;
}
public PayloadCarryingRpcController(final List<CellScannable> cellIterables) {
this.cellScanner = cellIterables == null? null: CellUtil.createCellScanner(cellIterables);
}
/**
* @return One-shot cell scanner (you cannot back it up and restart)
*/
@Override
public CellScanner cellScanner() {
return cellScanner;
}
public void setCellScanner(final CellScanner cellScanner) {
this.cellScanner = cellScanner;
}
/**
* @param priority Priority for this request; should fall roughly in the range
* {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
*/
public void setPriority(int priority) {
this.priority = priority;
}
/**
* @param tn Set priority based off the table we are going against.
*/
public void setPriority(final TableName tn) {
setPriority(tn != null && tn.isSystemTable()? HConstants.SYSTEMTABLE_QOS:
HConstants.NORMAL_QOS);
}
/**
* @return The priority of this request
*/
public int getPriority() {
return priority;
}
@Override
public void reset() {
priority = 0;
cellScanner = null;
exception = null;
cancelled = false;
failureCb.set(null);
cancellationCb.set(null);
callTimeout = null;
}
public int getCallTimeout() {
if (callTimeout != null) {
return callTimeout;
} else {
return 0;
}
}
public void setCallTimeout(int callTimeout) {
this.callTimeout = callTimeout;
}
public boolean hasCallTimeout(){
return callTimeout != null;
}
@Override
public String errorText() {
if (exception != null) {
return exception.getMessage();
} else {
return null;
}
}
/**
* For use in async rpc clients
* @return true if failed
*/
@Override
public boolean failed() {
return this.exception != null;
}
@Override
public boolean isCanceled() {
return cancelled;
}
@Override
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
this.cancellationCb.set(cancellationCb);
if (this.cancelled) {
cancellationCb.run(null);
}
}
/**
* Notify a callback on error.
* For use in async rpc clients
*
* @param failureCb the callback to call on error
*/
public void notifyOnFail(RpcCallback<IOException> failureCb) {
this.failureCb.set(failureCb);
if (this.exception != null) {
failureCb.run(this.exception);
}
}
@Override
public void setFailed(String reason) {
this.exception = new IOException(reason);
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
/**
* Set failed with an exception to pass on.
* For use in async rpc clients
*
* @param e exception to set with
*/
public void setFailed(IOException e) {
this.exception = e;
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
@Override
public void startCancel() {
cancelled = true;
if (cancellationCb.get() != null) {
cancellationCb.get().run(null);
}
}
}

View File

@ -903,7 +903,7 @@ public class RpcClientImpl extends AbstractRpcClient {
builder.setCellBlockMeta(cellBlockBuilder.build()); builder.setCellBlockMeta(cellBlockBuilder.build());
} }
// Only pass priority if there is one set. // Only pass priority if there is one set.
if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { if (priority != HBaseRpcController.PRIORITY_UNSET) {
builder.setPriority(priority); builder.setPriority(priority);
} }
builder.setTimeout(call.timeout); builder.setTimeout(call.timeout);
@ -1208,12 +1208,12 @@ public class RpcClientImpl extends AbstractRpcClient {
* @throws IOException if something fails on the connection * @throws IOException if something fails on the connection
*/ */
@Override @Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md, protected Pair<Message, CellScanner> call(HBaseRpcController pcrc, MethodDescriptor md,
Message param, Message returnType, User ticket, InetSocketAddress addr, Message param, Message returnType, User ticket, InetSocketAddress addr,
MetricsConnection.CallStats callStats) MetricsConnection.CallStats callStats)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (pcrc == null) { if (pcrc == null) {
pcrc = new PayloadCarryingRpcController(); pcrc = new HBaseRpcControllerImpl();
} }
Call call = this.call(md, param, returnType, pcrc, ticket, addr, callStats); Call call = this.call(md, param, returnType, pcrc, ticket, addr, callStats);
@ -1236,7 +1236,7 @@ public class RpcClientImpl extends AbstractRpcClient {
* @throws IOException if something fails on the connection * @throws IOException if something fails on the connection
*/ */
private <R extends Message> Call call(MethodDescriptor method, Message request, private <R extends Message> Call call(MethodDescriptor method, Message request,
R responsePrototype, PayloadCarryingRpcController pcrc, User ticket, R responsePrototype, HBaseRpcController pcrc, User ticket,
InetSocketAddress addr, MetricsConnection.CallStats callStats) InetSocketAddress addr, MetricsConnection.CallStats callStats)
throws IOException, InterruptedException { throws IOException, InterruptedException {

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
/** /**
* Factory to create a {@link PayloadCarryingRpcController} * Factory to create a {@link HBaseRpcController}
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RpcControllerFactory { public class RpcControllerFactory {
@ -46,17 +46,17 @@ public class RpcControllerFactory {
this.conf = conf; this.conf = conf;
} }
public PayloadCarryingRpcController newController() { public HBaseRpcController newController() {
// TODO: Set HConstants default rpc timeout here rather than nothing? // TODO: Set HConstants default rpc timeout here rather than nothing?
return new PayloadCarryingRpcController(); return new HBaseRpcControllerImpl();
} }
public PayloadCarryingRpcController newController(final CellScanner cellScanner) { public HBaseRpcController newController(final CellScanner cellScanner) {
return new PayloadCarryingRpcController(cellScanner); return new HBaseRpcControllerImpl(cellScanner);
} }
public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) { public HBaseRpcController newController(final List<CellScannable> cellIterables) {
return new PayloadCarryingRpcController(cellIterables); return new HBaseRpcControllerImpl(cellIterables);
} }

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface;
@ -94,7 +94,7 @@ public class AccessControlClient {
public static void grant(Connection connection, final TableName tableName, public static void grant(Connection connection, final TableName tableName,
final String userName, final byte[] family, final byte[] qual, final String userName, final byte[] family, final byte[] qual,
final Permission.Action... actions) throws Throwable { final Permission.Action... actions) throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
controller.setPriority(tableName); controller.setPriority(tableName);
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {
@ -113,7 +113,7 @@ public class AccessControlClient {
*/ */
public static void grant(Connection connection, final String namespace, public static void grant(Connection connection, final String namespace,
final String userName, final Permission.Action... actions) throws Throwable { final String userName, final Permission.Action... actions) throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {
@ -128,7 +128,7 @@ public class AccessControlClient {
*/ */
public static void grant(Connection connection, final String userName, public static void grant(Connection connection, final String userName,
final Permission.Action... actions) throws Throwable { final Permission.Action... actions) throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {
ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, actions); ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, actions);
@ -155,7 +155,7 @@ public class AccessControlClient {
public static void revoke(Connection connection, final TableName tableName, public static void revoke(Connection connection, final TableName tableName,
final String username, final byte[] family, final byte[] qualifier, final String username, final byte[] family, final byte[] qualifier,
final Permission.Action... actions) throws Throwable { final Permission.Action... actions) throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
controller.setPriority(tableName); controller.setPriority(tableName);
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {
@ -174,7 +174,7 @@ public class AccessControlClient {
*/ */
public static void revoke(Connection connection, final String namespace, public static void revoke(Connection connection, final String namespace,
final String userName, final Permission.Action... actions) throws Throwable { final String userName, final Permission.Action... actions) throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {
ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace, ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace,
@ -188,7 +188,7 @@ public class AccessControlClient {
*/ */
public static void revoke(Connection connection, final String userName, public static void revoke(Connection connection, final String userName,
final Permission.Action... actions) throws Throwable { final Permission.Action... actions) throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {
ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions); ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions);
@ -206,7 +206,7 @@ public class AccessControlClient {
*/ */
public static List<UserPermission> getUserPermissions(Connection connection, String tableRegex) public static List<UserPermission> getUserPermissions(Connection connection, String tableRegex)
throws Throwable { throws Throwable {
PayloadCarryingRpcController controller HBaseRpcController controller
= ((ClusterConnection) connection).getRpcControllerFactory().newController(); = ((ClusterConnection) connection).getRpcControllerFactory().newController();
List<UserPermission> permList = new ArrayList<UserPermission>(); List<UserPermission> permList = new ArrayList<UserPermission>();
try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) {

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.FailedServerException; import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -318,7 +318,7 @@ public class MetaTableLocator {
return false; return false;
} }
Throwable t; Throwable t;
PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); HBaseRpcController controller = connection.getRpcControllerFactory().newController();
try { try {
// Try and get regioninfo from the hosting server. // Try and get regioninfo from the hosting server.
return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null; return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
@ -88,7 +88,7 @@ public class TestSnapshotFromAdmin {
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
Mockito.when(controllerFactory.newController()).thenReturn( Mockito.when(controllerFactory.newController()).thenReturn(
Mockito.mock(PayloadCarryingRpcController.class)); Mockito.mock(HBaseRpcController.class));
Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory);
// set the max wait time for the snapshot to complete // set the max wait time for the snapshot to complete
@ -136,7 +136,7 @@ public class TestSnapshotFromAdmin {
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
Mockito.when(controllerFactory.newController()).thenReturn( Mockito.when(controllerFactory.newController()).thenReturn(
Mockito.mock(PayloadCarryingRpcController.class)); Mockito.mock(HBaseRpcController.class));
Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory);
Admin admin = new HBaseAdmin(mockConnection); Admin admin = new HBaseAdmin(mockConnection);

View File

@ -34,9 +34,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, SmallTests.class })
public class TestHBaseRpcControllerImpl {
@Category({ClientTests.class, SmallTests.class})
public class TestPayloadCarryingRpcController {
@Test @Test
public void testListOfCellScannerables() throws IOException { public void testListOfCellScannerables() throws IOException {
List<CellScannable> cells = new ArrayList<CellScannable>(); List<CellScannable> cells = new ArrayList<CellScannable>();
@ -44,12 +44,12 @@ public class TestPayloadCarryingRpcController {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
cells.add(createCell(i)); cells.add(createCell(i));
} }
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); HBaseRpcController controller = new HBaseRpcControllerImpl(cells);
CellScanner cellScanner = controller.cellScanner(); CellScanner cellScanner = controller.cellScanner();
int index = 0; int index = 0;
for (; cellScanner.advance(); index++) { for (; cellScanner.advance(); index++) {
Cell cell = cellScanner.current(); Cell cell = cellScanner.current();
byte [] indexBytes = Bytes.toBytes(index); byte[] indexBytes = Bytes.toBytes(index);
assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(), assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength())); cell.getValueOffset(), cell.getValueLength()));
} }
@ -67,7 +67,7 @@ public class TestPayloadCarryingRpcController {
return new CellScanner() { return new CellScanner() {
@Override @Override
public Cell current() { public Cell current() {
// Fake out a Cell. All this Cell has is a value that is an int in size and equal // Fake out a Cell. All this Cell has is a value that is an int in size and equal
// to the above 'index' param serialized as an int. // to the above 'index' param serialized as an int.
return new Cell() { return new Cell() {
private final int i = index; private final int i = index;
@ -180,6 +180,7 @@ public class TestPayloadCarryingRpcController {
} }
private boolean hasCell = true; private boolean hasCell = true;
@Override @Override
public boolean advance() { public boolean advance() {
// We have one Cell only so return true first time then false ever after. // We have one Cell only so return true first time then false ever after.

View File

@ -2205,7 +2205,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
status.setRPCPacket(param); status.setRPCPacket(param);
status.resume("Servicing call"); status.resume("Servicing call");
//get an instance of the method arg type //get an instance of the method arg type
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
controller.setCallTimeout(timeout); controller.setCallTimeout(timeout);
Message result = service.callBlockingMethod(md, controller, param); Message result = service.callBlockingMethod(md, controller, param);
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -793,7 +793,7 @@ public class ServerManager {
} }
} }
private PayloadCarryingRpcController newRpcController() { private HBaseRpcController newRpcController() {
return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
} }
@ -817,7 +817,7 @@ public class ServerManager {
region.getRegionNameAsString() + region.getRegionNameAsString() +
" failed because no RPC connection found to this server"); " failed because no RPC connection found to this server");
} }
PayloadCarryingRpcController controller = newRpcController(); HBaseRpcController controller = newRpcController();
return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), dest); return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), dest);
} }
@ -839,7 +839,7 @@ public class ServerManager {
if (server == null) return; if (server == null) return;
try { try {
AdminService.BlockingInterface admin = getRsAdmin(server); AdminService.BlockingInterface admin = getRsAdmin(server);
PayloadCarryingRpcController controller = newRpcController(); HBaseRpcController controller = newRpcController();
ProtobufUtil.warmupRegion(controller, admin, region); ProtobufUtil.warmupRegion(controller, admin, region);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Received exception in RPC for warmup server:" + LOG.error("Received exception in RPC for warmup server:" +
@ -855,7 +855,7 @@ public class ServerManager {
public static void closeRegionSilentlyAndWait(ClusterConnection connection, public static void closeRegionSilentlyAndWait(ClusterConnection connection,
ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException { ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
AdminService.BlockingInterface rs = connection.getAdmin(server); AdminService.BlockingInterface rs = connection.getAdmin(server);
PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); HBaseRpcController controller = connection.getRpcControllerFactory().newController();
try { try {
ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName()); ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
} catch (IOException e) { } catch (IOException e) {
@ -906,7 +906,7 @@ public class ServerManager {
+ region_b.getRegionNameAsString() + region_b.getRegionNameAsString()
+ " failed because no RPC connection found to this server"); + " failed because no RPC connection found to this server");
} }
PayloadCarryingRpcController controller = newRpcController(); HBaseRpcController controller = newRpcController();
ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user); ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user);
} }
@ -920,7 +920,7 @@ public class ServerManager {
RetryCounter retryCounter = pingRetryCounterFactory.create(); RetryCounter retryCounter = pingRetryCounterFactory.create();
while (retryCounter.shouldRetry()) { while (retryCounter.shouldRetry()) {
try { try {
PayloadCarryingRpcController controller = newRpcController(); HBaseRpcController controller = newRpcController();
AdminService.BlockingInterface admin = getRsAdmin(server); AdminService.BlockingInterface admin = getRsAdmin(server);
if (admin != null) { if (admin != null) {
ServerInfo info = ProtobufUtil.getServerInfo(controller, admin); ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.protobuf; package org.apache.hadoop.hbase.protobuf;
import com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
@ -35,7 +37,8 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@ -46,8 +49,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import com.google.protobuf.ServiceException;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationProtbufUtil { public class ReplicationProtbufUtil {
/** /**
@ -66,7 +67,7 @@ public class ReplicationProtbufUtil {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
sourceHFileArchiveDir); sourceHFileArchiveDir);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
try { try {
admin.replicateWALEntry(controller, p.getFirst()); admin.replicateWALEntry(controller, p.getFirst());
} catch (ServiceException se) { } catch (ServiceException se) {

View File

@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcCallContext;
@ -465,7 +465,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
private void addResult(final MutateResponse.Builder builder, final Result result, private void addResult(final MutateResponse.Builder builder, final Result result,
final PayloadCarryingRpcController rpcc) { final HBaseRpcController rpcc) {
if (result == null) return; if (result == null) return;
if (isClientCellBlockSupport()) { if (isClientCellBlockSupport()) {
builder.setResult(ProtobufUtil.toResultNoData(result)); builder.setResult(ProtobufUtil.toResultNoData(result));
@ -485,7 +485,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addCellsPerResult(res.size()); builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.isPartial()); builder.addPartialFlagPerResult(res.isPartial());
} }
((PayloadCarryingRpcController)controller). ((HBaseRpcController)controller).
setCellScanner(CellUtil.createCellScanner(results)); setCellScanner(CellUtil.createCellScanner(results));
} else { } else {
for (Result res: results) { for (Result res: results) {
@ -1839,7 +1839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public ReplicateWALEntryResponse replay(final RpcController controller, public ReplicateWALEntryResponse replay(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException { final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTime(); long before = EnvironmentEdgeManager.currentTime();
CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); CellScanner cells = ((HBaseRpcController) controller).cellScanner();
try { try {
checkOpen(); checkOpen();
List<WALEntry> entries = request.getEntryList(); List<WALEntry> entries = request.getEntryList();
@ -1944,7 +1944,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (regionServer.replicationSinkHandler != null) { if (regionServer.replicationSinkHandler != null) {
requestCount.increment(); requestCount.increment();
List<WALEntry> entries = request.getEntryList(); List<WALEntry> entries = request.getEntryList();
CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
@ -2208,10 +2208,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else if (r != null) { } else if (r != null) {
ClientProtos.Result pbr; ClientProtos.Result pbr;
RpcCallContext call = RpcServer.getCurrentCall(); RpcCallContext call = RpcServer.getCurrentCall();
if (isClientCellBlockSupport(call) && controller instanceof PayloadCarryingRpcController if (isClientCellBlockSupport(call) && controller instanceof HBaseRpcController
&& VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) { && VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) {
pbr = ProtobufUtil.toResultNoData(r); pbr = ProtobufUtil.toResultNoData(r);
((PayloadCarryingRpcController) controller).setCellScanner(CellUtil.createCellScanner(r ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
.rawCells())); .rawCells()));
} else { } else {
pbr = ProtobufUtil.toResult(r); pbr = ProtobufUtil.toResult(r);
@ -2298,7 +2298,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data. // It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; HBaseRpcController controller = (HBaseRpcController)rpcc;
CellScanner cellScanner = controller != null ? controller.cellScanner(): null; CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
if (controller != null) { if (controller != null) {
controller.setCellScanner(null); controller.setCellScanner(null);
@ -2436,7 +2436,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final MutateRequest request) throws ServiceException { final MutateRequest request) throws ServiceException {
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data. // It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; HBaseRpcController controller = (HBaseRpcController)rpcc;
CellScanner cellScanner = controller != null ? controller.cellScanner() : null; CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
OperationQuota quota = null; OperationQuota quota = null;
// Clear scanner so we are not holding on to reference across call. // Clear scanner so we are not holding on to reference across call.
@ -2768,9 +2768,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
} }
if (controller != null) { if (controller != null) {
if (controller instanceof PayloadCarryingRpcController) { if (controller instanceof HBaseRpcController) {
PayloadCarryingRpcController pRpcController = HBaseRpcController pRpcController =
(PayloadCarryingRpcController)controller; (HBaseRpcController)controller;
if (pRpcController.getCallTimeout() > 0) { if (pRpcController.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout()); timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
} }

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable; import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
@ -625,7 +625,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
} }
public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception { public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
// Check whether we should still replay this entry. If the regions are changed, or the // Check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out because we do not need it. // entry is not coming form the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated // Regions can change because of (1) region split (2) region merge (3) table recreated

View File

@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
@ -165,7 +165,7 @@ public class TestMetaTableAccessorNoCluster {
.thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build()) .thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
.thenAnswer(new Answer<ScanResponse>() { .thenAnswer(new Answer<ScanResponse>() {
public ScanResponse answer(InvocationOnMock invocation) throws Throwable { public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil
.createCellScanner(cellScannables)); .createCellScanner(cellScannables));
return builder.build(); return builder.build();
} }

View File

@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
@ -254,7 +254,7 @@ public class TestMetaTableLocator {
thenReturn(implementation); thenReturn(implementation);
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
Mockito.when(controllerFactory.newController()).thenReturn( Mockito.when(controllerFactory.newController()).thenReturn(
Mockito.mock(PayloadCarryingRpcController.class)); Mockito.mock(HBaseRpcController.class));
Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory); Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory);
ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
@ -301,7 +301,7 @@ public class TestHBaseAdminNoCluster {
RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class); RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class);
Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory); Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory);
Mockito.when(rpcControllerFactory.newController()).thenReturn( Mockito.when(rpcControllerFactory.newController()).thenReturn(
Mockito.mock(PayloadCarryingRpcController.class)); Mockito.mock(HBaseRpcController.class));
// we need a real retrying caller // we need a real retrying caller
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration);

View File

@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
@ -548,7 +548,7 @@ public class TestHCM {
(ClusterConnection) TEST_UTIL.getConnection(), (ClusterConnection) TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) { new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
@Override @Override
public Object call(PayloadCarryingRpcController controller) throws Exception { public Object call(HBaseRpcController controller) throws Exception {
return null; return null;
} }
}; };

View File

@ -36,8 +36,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService; import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -57,27 +57,27 @@ public class TestRpcControllerFactory {
} }
@Override @Override
public PayloadCarryingRpcController newController() { public HBaseRpcController newController() {
return new CountingRpcController(super.newController()); return new CountingRpcController(super.newController());
} }
@Override @Override
public PayloadCarryingRpcController newController(final CellScanner cellScanner) { public HBaseRpcController newController(final CellScanner cellScanner) {
return new CountingRpcController(super.newController(cellScanner)); return new CountingRpcController(super.newController(cellScanner));
} }
@Override @Override
public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) { public HBaseRpcController newController(final List<CellScannable> cellIterables) {
return new CountingRpcController(super.newController(cellIterables)); return new CountingRpcController(super.newController(cellIterables));
} }
} }
public static class CountingRpcController extends DelegatingPayloadCarryingRpcController { public static class CountingRpcController extends DelegatingHBaseRpcController {
private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger INT_PRIORITY = new AtomicInteger();
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
public CountingRpcController(PayloadCarryingRpcController delegate) { public CountingRpcController(HBaseRpcController delegate) {
super(delegate); super(delegate);
} }

View File

@ -105,7 +105,7 @@ public abstract class AbstractTestIPC {
try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { try (AbstractRpcClient client = createRpcClientNoCodec(conf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); HBaseRpcController pcrc = new HBaseRpcControllerImpl();
String message = "hello"; String message = "hello";
assertEquals(message, assertEquals(message,
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
@ -135,8 +135,7 @@ public abstract class AbstractTestIPC {
try (AbstractRpcClient client = createRpcClient(conf)) { try (AbstractRpcClient client = createRpcClient(conf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController( HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
CellUtil.createCellScanner(cells));
String message = "hello"; String message = "hello";
assertEquals(message, assertEquals(message,
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
@ -212,7 +211,7 @@ public abstract class AbstractTestIPC {
// set total RPC size bigger than 100 bytes // set total RPC size bigger than 100 bytes
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
stub.echo( stub.echo(
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
param); param);
fail("RPC should have failed because it exceeds max request size"); fail("RPC should have failed because it exceeds max request size");
} catch (ServiceException e) { } catch (ServiceException e) {
@ -266,7 +265,7 @@ public abstract class AbstractTestIPC {
try (AbstractRpcClient client = createRpcClient(CONF)) { try (AbstractRpcClient client = createRpcClient(CONF)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); HBaseRpcController pcrc = new HBaseRpcControllerImpl();
int ms = 1000; int ms = 1000;
int timeout = 100; int timeout = 100;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -306,7 +305,7 @@ public abstract class AbstractTestIPC {
class FailingConnection extends Connection { class FailingConnection extends Connection {
public FailingConnection(SocketChannel channel, long lastContact) { public FailingConnection(SocketChannel channel, long lastContact) {
super(channel, lastContact); super(channel, lastContact);
} }
@Override @Override
@ -319,12 +318,12 @@ public abstract class AbstractTestIPC {
@Override @Override
protected Connection getConnection(SocketChannel channel, long time) { protected Connection getConnection(SocketChannel channel, long time) {
return new FailingConnection(channel, time); return new FailingConnection(channel, time);
} }
} }
/** Tests that the connection closing is handled by the client with outstanding RPC calls */ /** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test (timeout = 30000) @Test
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
Configuration conf = new Configuration(CONF); Configuration conf = new Configuration(CONF);
RpcServer rpcServer = new TestFailingRpcServer(conf); RpcServer rpcServer = new TestFailingRpcServer(conf);
@ -332,9 +331,7 @@ public abstract class AbstractTestIPC {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
stub.echo( stub.echo(null, param);
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
param);
fail("RPC should have failed because connection closed"); fail("RPC should have failed because connection closed");
} catch (ServiceException e) { } catch (ServiceException e) {
LOG.info("Caught expected exception: " + e.toString()); LOG.info("Caught expected exception: " + e.toString());

View File

@ -76,8 +76,8 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
@Override @Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request) public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException { throws ServiceException {
if (controller instanceof PayloadCarryingRpcController) { if (controller instanceof HBaseRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; HBaseRpcController pcrc = (HBaseRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since this is an // If cells, scan them to check we are able to iterate what we were given and since this is an
// echo, just put them back on the controller creating a new block. Tests our block building. // echo, just put them back on the controller creating a new block. Tests our block building.
CellScanner cellScanner = pcrc.cellScanner(); CellScanner cellScanner = pcrc.cellScanner();
@ -93,7 +93,7 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
} }
} }
cellScanner = CellUtil.createCellScanner(list); cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); pcrc.setCellScanner(cellScanner);
} }
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
} }

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -412,7 +412,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
builder.addCellsPerResult(result.size()); builder.addCellsPerResult(result.size());
List<CellScannable> results = new ArrayList<CellScannable>(1); List<CellScannable> results = new ArrayList<CellScannable>(1);
results.add(result); results.add(result);
((PayloadCarryingRpcController) controller).setCellScanner(CellUtil ((HBaseRpcController) controller).setCellScanner(CellUtil
.createCellScanner(results)); .createCellScanner(results));
builder.setMoreResults(true); builder.setMoreResults(true);
} }

View File

@ -22,6 +22,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -52,7 +56,7 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -69,10 +73,6 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
@Category(LargeTests.class) @Category(LargeTests.class)
public class TestEndToEndSplitTransaction { public class TestEndToEndSplitTransaction {
private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class); private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class);
@ -164,7 +164,7 @@ public class TestEndToEndSplitTransaction {
regionName, new Scan(row), 1, true); regionName, new Scan(row), 1, true);
try { try {
server.getRSRpcServices().scan( server.getRSRpcServices().scan(
new PayloadCarryingRpcController(), scanRequest); new HBaseRpcControllerImpl(), scanRequest);
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
} }