HBASE-10566 cleanup rpcTimeout in the client

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1571727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2014-02-25 16:28:57 +00:00
parent 48d8e996ed
commit 21ddb60dc0
25 changed files with 394 additions and 366 deletions

View File

@ -165,16 +165,15 @@ public class ClientSmallScanner extends ClientScanner {
this.scan.setStartRow(localStartKey); this.scan.setStartRow(localStartKey);
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>( RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
getConnection(), getTable(), scan.getStartRow()) { getConnection(), getTable(), scan.getStartRow()) {
public Result[] call() throws IOException { public Result[] call(int callTimeout) throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(getLocation() ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), scan, cacheNum, true); .getRegionInfo().getRegionName(), scan, cacheNum, true);
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try { try {
controller.setPriority(getTableName()); ScanResponse response = getStub().scan(controller, request);
response = getStub().scan(controller, request); return ResponseConverter.getResults(controller.cellScanner(), response);
return ResponseConverter.getResults(controller.cellScanner(),
response);
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
} }

View File

@ -608,9 +608,7 @@ class ConnectionManager {
@Override @Override
public void newDead(ServerName sn) { public void newDead(ServerName sn) {
clearCaches(sn); clearCaches(sn);
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
new SocketException(sn.getServerName() +
" is dead: closing its connection."));
} }
}, conf, listenerClass); }, conf, listenerClass);
} }
@ -1516,8 +1514,7 @@ class ConnectionManager {
synchronized (connectionLock.get(key)) { synchronized (connectionLock.get(key)) {
stub = stubs.get(key); stub = stubs.get(key);
if (stub == null) { if (stub == null) {
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
user, rpcTimeout);
stub = makeStub(channel); stub = makeStub(channel);
isMasterRunning(); isMasterRunning();
stubs.put(key, stub); stubs.put(key, stub);
@ -1635,8 +1632,8 @@ class ConnectionManager {
synchronized (this.connectionLock.get(key)) { synchronized (this.connectionLock.get(key)) {
stub = (AdminService.BlockingInterface)this.stubs.get(key); stub = (AdminService.BlockingInterface)this.stubs.get(key);
if (stub == null) { if (stub == null) {
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, BlockingRpcChannel channel =
user, this.rpcTimeout); this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
stub = AdminService.newBlockingStub(channel); stub = AdminService.newBlockingStub(channel);
this.stubs.put(key, stub); this.stubs.put(key, stub);
} }
@ -1656,8 +1653,8 @@ class ConnectionManager {
synchronized (this.connectionLock.get(key)) { synchronized (this.connectionLock.get(key)) {
stub = (ClientService.BlockingInterface)this.stubs.get(key); stub = (ClientService.BlockingInterface)this.stubs.get(key);
if (stub == null) { if (stub == null) {
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, BlockingRpcChannel channel =
user, this.rpcTimeout); this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
stub = ClientService.newBlockingStub(channel); stub = ClientService.newBlockingStub(channel);
// In old days, after getting stub/proxy, we'd make a call. We are not doing that here. // In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
// Just fail on first actual call rather than in here on setup. // Just fail on first actual call rather than in here on setup.

View File

@ -28,8 +28,8 @@ public class DelegatingRetryingCallable<T, D extends RetryingCallable<T>> implem
} }
@Override @Override
public T call() throws Exception { public T call(int callTimeout) throws Exception {
return delegate.call(); return delegate.call(callTimeout);
} }
@Override @Override

View File

@ -607,7 +607,7 @@ public class HBaseAdmin implements Abortable, Closeable {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys); CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
master.createTable(null, request); master.createTable(null, request);
return null; return null;
@ -636,7 +636,7 @@ public class HBaseAdmin implements Abortable, Closeable {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName); DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
master.deleteTable(null,req); master.deleteTable(null,req);
return null; return null;
@ -841,7 +841,7 @@ public class HBaseAdmin implements Abortable, Closeable {
TableName.isLegalFullyQualifiedTableName(tableName.getName()); TableName.isLegalFullyQualifiedTableName(tableName.getName());
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
LOG.info("Started enable of " + tableName); LOG.info("Started enable of " + tableName);
EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName); EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName);
master.enableTable(null,req); master.enableTable(null,req);
@ -918,7 +918,7 @@ public class HBaseAdmin implements Abortable, Closeable {
TableName.isLegalFullyQualifiedTableName(tableName.getName()); TableName.isLegalFullyQualifiedTableName(tableName.getName());
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
LOG.info("Started disable of " + tableName); LOG.info("Started disable of " + tableName);
DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName); DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName);
master.disableTable(null,req); master.disableTable(null,req);
@ -1136,7 +1136,7 @@ public class HBaseAdmin implements Abortable, Closeable {
throws IOException { throws IOException {
return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) { return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
@Override @Override
public Pair<Integer, Integer> call() throws ServiceException { public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
GetSchemaAlterStatusRequest req = RequestConverter GetSchemaAlterStatusRequest req = RequestConverter
.buildGetSchemaAlterStatusRequest(tableName); .buildGetSchemaAlterStatusRequest(tableName);
GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req); GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req);
@ -1203,7 +1203,7 @@ public class HBaseAdmin implements Abortable, Closeable {
throws IOException { throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column); AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column);
master.addColumn(null,req); master.addColumn(null,req);
return null; return null;
@ -1249,7 +1249,7 @@ public class HBaseAdmin implements Abortable, Closeable {
throws IOException { throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName); DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName);
master.deleteColumn(null,req); master.deleteColumn(null,req);
return null; return null;
@ -1297,7 +1297,7 @@ public class HBaseAdmin implements Abortable, Closeable {
throws IOException { throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor); ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor);
master.modifyColumn(null,req); master.modifyColumn(null,req);
return null; return null;
@ -1707,7 +1707,7 @@ public class HBaseAdmin implements Abortable, Closeable {
final byte[] toBeAssigned = getRegionName(regionName); final byte[] toBeAssigned = getRegionName(regionName);
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
AssignRegionRequest request = AssignRegionRequest request =
RequestConverter.buildAssignRegionRequest(toBeAssigned); RequestConverter.buildAssignRegionRequest(toBeAssigned);
master.assignRegion(null,request); master.assignRegion(null,request);
@ -1735,7 +1735,7 @@ public class HBaseAdmin implements Abortable, Closeable {
final byte[] toBeUnassigned = getRegionName(regionName); final byte[] toBeUnassigned = getRegionName(regionName);
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
UnassignRegionRequest request = UnassignRegionRequest request =
RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
master.unassignRegion(null,request); master.unassignRegion(null,request);
@ -1992,7 +1992,7 @@ public class HBaseAdmin implements Abortable, Closeable {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd); ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
master.modifyTable(null, request); master.modifyTable(null, request);
return null; return null;
@ -2105,7 +2105,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public synchronized void shutdown() throws IOException { public synchronized void shutdown() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
master.shutdown(null,ShutdownRequest.newBuilder().build()); master.shutdown(null,ShutdownRequest.newBuilder().build());
return null; return null;
} }
@ -2121,7 +2121,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public synchronized void stopMaster() throws IOException { public synchronized void stopMaster() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
master.stopMaster(null,StopMasterRequest.newBuilder().build()); master.stopMaster(null,StopMasterRequest.newBuilder().build());
return null; return null;
} }
@ -2157,7 +2157,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public ClusterStatus getClusterStatus() throws IOException { public ClusterStatus getClusterStatus() throws IOException {
return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) { return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
@Override @Override
public ClusterStatus call() throws ServiceException { public ClusterStatus call(int callTimeout) throws ServiceException {
GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
return ClusterStatus.convert(master.getClusterStatus(null,req).getClusterStatus()); return ClusterStatus.convert(master.getClusterStatus(null,req).getClusterStatus());
} }
@ -2185,7 +2185,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public void createNamespace(final NamespaceDescriptor descriptor) throws IOException { public void createNamespace(final NamespaceDescriptor descriptor) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws Exception { public Void call(int callTimeout) throws Exception {
master.createNamespace(null, master.createNamespace(null,
CreateNamespaceRequest.newBuilder() CreateNamespaceRequest.newBuilder()
.setNamespaceDescriptor(ProtobufUtil .setNamespaceDescriptor(ProtobufUtil
@ -2203,7 +2203,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public void modifyNamespace(final NamespaceDescriptor descriptor) throws IOException { public void modifyNamespace(final NamespaceDescriptor descriptor) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws Exception { public Void call(int callTimeout) throws Exception {
master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder(). master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder().
setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
return null; return null;
@ -2219,7 +2219,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public void deleteNamespace(final String name) throws IOException { public void deleteNamespace(final String name) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws Exception { public Void call(int callTimeout) throws Exception {
master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder(). master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder().
setNamespaceName(name).build()); setNamespaceName(name).build());
return null; return null;
@ -2237,7 +2237,7 @@ public class HBaseAdmin implements Abortable, Closeable {
return return
executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) { executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
@Override @Override
public NamespaceDescriptor call() throws Exception { public NamespaceDescriptor call(int callTimeout) throws Exception {
return ProtobufUtil.toNamespaceDescriptor( return ProtobufUtil.toNamespaceDescriptor(
master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder(). master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder().
setNamespaceName(name).build()).getNamespaceDescriptor()); setNamespaceName(name).build()).getNamespaceDescriptor());
@ -2254,7 +2254,7 @@ public class HBaseAdmin implements Abortable, Closeable {
return return
executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) { executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
@Override @Override
public NamespaceDescriptor[] call() throws Exception { public NamespaceDescriptor[] call(int callTimeout) throws Exception {
List<HBaseProtos.NamespaceDescriptor> list = List<HBaseProtos.NamespaceDescriptor> list =
master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder(). master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder().
build()).getNamespaceDescriptorList(); build()).getNamespaceDescriptorList();
@ -2277,7 +2277,7 @@ public class HBaseAdmin implements Abortable, Closeable {
return return
executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
@Override @Override
public HTableDescriptor[] call() throws Exception { public HTableDescriptor[] call(int callTimeout) throws Exception {
List<TableSchema> list = List<TableSchema> list =
master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest. master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest.
newBuilder().setNamespaceName(name).build()).getTableSchemaList(); newBuilder().setNamespaceName(name).build()).getTableSchemaList();
@ -2301,7 +2301,7 @@ public class HBaseAdmin implements Abortable, Closeable {
return return
executeCallable(new MasterCallable<TableName[]>(getConnection()) { executeCallable(new MasterCallable<TableName[]>(getConnection()) {
@Override @Override
public TableName[] call() throws Exception { public TableName[] call(int callTimeout) throws Exception {
List<HBaseProtos.TableName> tableNames = List<HBaseProtos.TableName> tableNames =
master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest. master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest.
newBuilder().setNamespaceName(name).build()) newBuilder().setNamespaceName(name).build())
@ -2715,7 +2715,7 @@ public class HBaseAdmin implements Abortable, Closeable {
LOG.debug("Getting current status of snapshot from master..."); LOG.debug("Getting current status of snapshot from master...");
done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override @Override
public IsSnapshotDoneResponse call() throws ServiceException { public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
return master.isSnapshotDone(null, request); return master.isSnapshotDone(null, request);
} }
}); });
@ -2744,7 +2744,7 @@ public class HBaseAdmin implements Abortable, Closeable {
// run the snapshot on the master // run the snapshot on the master
return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) { return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
@Override @Override
public SnapshotResponse call() throws ServiceException { public SnapshotResponse call(int callTimeout) throws ServiceException {
return master.snapshot(null, request); return master.snapshot(null, request);
} }
}); });
@ -2775,7 +2775,7 @@ public class HBaseAdmin implements Abortable, Closeable {
return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override @Override
public IsSnapshotDoneResponse call() throws ServiceException { public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
return master.isSnapshotDone(null, return master.isSnapshotDone(null,
IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
} }
@ -3026,7 +3026,7 @@ public class HBaseAdmin implements Abortable, Closeable {
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>( ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
getConnection()) { getConnection()) {
@Override @Override
public ExecProcedureResponse call() throws ServiceException { public ExecProcedureResponse call(int callTimeout) throws ServiceException {
return master.execProcedure(null, request); return master.execProcedure(null, request);
} }
}); });
@ -3089,7 +3089,7 @@ public class HBaseAdmin implements Abortable, Closeable {
return executeCallable( return executeCallable(
new MasterCallable<IsProcedureDoneResponse>(getConnection()) { new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
@Override @Override
public IsProcedureDoneResponse call() throws ServiceException { public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
return master.isProcedureDone(null, IsProcedureDoneRequest return master.isProcedureDone(null, IsProcedureDoneRequest
.newBuilder().setProcedure(desc).build()); .newBuilder().setProcedure(desc).build());
} }
@ -3135,7 +3135,7 @@ public class HBaseAdmin implements Abortable, Closeable {
done = executeCallable(new MasterCallable<IsRestoreSnapshotDoneResponse>( done = executeCallable(new MasterCallable<IsRestoreSnapshotDoneResponse>(
getConnection()) { getConnection()) {
@Override @Override
public IsRestoreSnapshotDoneResponse call() throws ServiceException { public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException {
return master.isRestoreSnapshotDone(null, request); return master.isRestoreSnapshotDone(null, request);
} }
}); });
@ -3165,7 +3165,7 @@ public class HBaseAdmin implements Abortable, Closeable {
// run the snapshot restore on the master // run the snapshot restore on the master
return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) { return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
@Override @Override
public RestoreSnapshotResponse call() throws ServiceException { public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
return master.restoreSnapshot(null, request); return master.restoreSnapshot(null, request);
} }
}); });
@ -3179,7 +3179,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public List<SnapshotDescription> listSnapshots() throws IOException { public List<SnapshotDescription> listSnapshots() throws IOException {
return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) { return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
@Override @Override
public List<SnapshotDescription> call() throws ServiceException { public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build()) return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build())
.getSnapshotsList(); .getSnapshotsList();
} }
@ -3235,7 +3235,7 @@ public class HBaseAdmin implements Abortable, Closeable {
// do the delete // do the delete
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
master.deleteSnapshot(null, master.deleteSnapshot(null,
DeleteSnapshotRequest.newBuilder(). DeleteSnapshotRequest.newBuilder().
setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build()); setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build());
@ -3264,7 +3264,7 @@ public class HBaseAdmin implements Abortable, Closeable {
// do the delete // do the delete
executeCallable(new MasterCallable<Void>(getConnection()) { executeCallable(new MasterCallable<Void>(getConnection()) {
@Override @Override
public Void call() throws ServiceException { public Void call(int callTimeout) throws ServiceException {
this.master.deleteSnapshot(null, this.master.deleteSnapshot(null,
DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build()); DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build());
return null; return null;

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@ -713,16 +714,26 @@ public class HTable implements HTableInterface {
*/ */
@Override @Override
public Result getRowOrBefore(final byte[] row, final byte[] family) public Result getRowOrBefore(final byte[] row, final byte[] family)
throws IOException { throws IOException {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
tableName, row) { tableName, row) {
public Result call() throws IOException { public Result call(int callTimeout) throws IOException {
return ProtobufUtil.getRowOrBefore(getStub(), PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
getLocation().getRegionInfo().getRegionName(), row, family); controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
getLocation().getRegionInfo().getRegionName(), row, family);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
} }
}; };
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
@ -771,11 +782,22 @@ public class HTable implements HTableInterface {
public Result get(final Get get) throws IOException { public Result get(final Get get) throws IOException {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), get.getRow()) { getName(), get.getRow()) {
public Result call() throws IOException { public Result call(int callTimeout) throws IOException {
return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get); ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
} }
}; };
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
} }
/** /**
@ -863,11 +885,15 @@ public class HTable implements HTableInterface {
throws IOException { throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
tableName, delete.getRow()) { tableName, delete.getRow()) {
public Boolean call() throws IOException { public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try { try {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete); getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = getStub().mutate(null, request); MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed()); return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
@ -999,16 +1025,17 @@ public class HTable implements HTableInterface {
public void mutateRow(final RowMutations rm) throws IOException { public void mutateRow(final RowMutations rm) throws IOException {
RegionServerCallable<Void> callable = RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) { new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
public Void call() throws IOException { public Void call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try { try {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
getLocation().getRegionInfo().getRegionName(), rm); getLocation().getRegionInfo().getRegionName(), rm);
regionMutationBuilder.setAtomic(true); regionMutationBuilder.setAtomic(true);
MultiRequest request = MultiRequest request =
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); getStub().multi(controller, request);
pcrc.setPriority(tableName);
getStub().multi(null, request);
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
} }
@ -1032,15 +1059,16 @@ public class HTable implements HTableInterface {
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable = RegionServerCallable<Result> callable =
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
public Result call() throws IOException { public Result call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try { try {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); MutateResponse response = getStub().mutate(controller, request);
rpcController.setPriority(getTableName());
MutateResponse response = getStub().mutate(rpcController, request);
if (!response.hasResult()) return null; if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
} }
@ -1062,14 +1090,15 @@ public class HTable implements HTableInterface {
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), increment.getRow()) { getName(), increment.getRow()) {
public Result call() throws IOException { public Result call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try { try {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); MutateResponse response = getStub().mutate(controller, request);
rpcController.setPriority(getTableName()); return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
MutateResponse response = getStub().mutate(rpcController, request);
return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
} }
@ -1124,16 +1153,17 @@ public class HTable implements HTableInterface {
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Long> callable = RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getName(), row) { new RegionServerCallable<Long>(connection, getName(), row) {
public Long call() throws IOException { public Long call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try { try {
MutateRequest request = RequestConverter.buildIncrementRequest( MutateRequest request = RequestConverter.buildIncrementRequest(
getLocation().getRegionInfo().getRegionName(), row, family, getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability, nonceGroup, nonce); qualifier, amount, durability, nonceGroup, nonce);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); MutateResponse response = getStub().mutate(controller, request);
rpcController.setPriority(getTableName());
MutateResponse response = getStub().mutate(rpcController, request);
Result result = Result result =
ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
@ -1153,12 +1183,15 @@ public class HTable implements HTableInterface {
throws IOException { throws IOException {
RegionServerCallable<Boolean> callable = RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) { new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call() throws IOException { public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try { try {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, put); new BinaryComparator(value), CompareType.EQUAL, put);
MutateResponse response = getStub().mutate(null, request); MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed()); return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
@ -1178,13 +1211,16 @@ public class HTable implements HTableInterface {
throws IOException { throws IOException {
RegionServerCallable<Boolean> callable = RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) { new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call() throws IOException { public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try { try {
CompareType compareType = CompareType.valueOf(compareOp.name()); CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, put); new BinaryComparator(value), compareType, put);
MutateResponse response = getStub().mutate(null, request); MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed()); return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
@ -1204,12 +1240,15 @@ public class HTable implements HTableInterface {
throws IOException { throws IOException {
RegionServerCallable<Boolean> callable = RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) { new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call() throws IOException { public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try { try {
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, delete); new BinaryComparator(value), CompareType.EQUAL, delete);
MutateResponse response = getStub().mutate(null, request); MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed()); return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);
@ -1229,13 +1268,16 @@ public class HTable implements HTableInterface {
throws IOException { throws IOException {
RegionServerCallable<Boolean> callable = RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) { new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call() throws IOException { public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try { try {
CompareType compareType = CompareType.valueOf(compareOp.name()); CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest( MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete); new BinaryComparator(value), compareType, delete);
MutateResponse response = getStub().mutate(null, request); MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed()); return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -72,14 +71,14 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
@Override @Override
public HRegionInfo getHRegionInfo() { public HRegionInfo getHRegionInfo() {
throw new RuntimeException("Cannot get region info for multi-region request"); throw new RuntimeException("Cannot get region info for multi-region request");
}; }
MultiAction<R> getMulti() { MultiAction<R> getMulti() {
return this.multiAction; return this.multiAction;
} }
@Override @Override
public MultiResponse call() throws IOException { public MultiResponse call(int callTimeout) throws IOException {
int countOfActions = this.multiAction.size(); int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@ -118,6 +117,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// optionally ferries cell response data back out again. // optionally ferries cell response data back out again.
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
controller.setPriority(getTableName()); controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto; ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
try { try {

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* Used to perform Put operations for a single row. * Used to perform Put operations for a single row.
* <p> * <p>
* To perform a Put, instantiate a Put object with the row to insert to and * To perform a Put, instantiate a Put object with the row to insert to and
* for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or * for eachumn to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
* {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp. * {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
* Implementations call a RegionServer and implement {@link #call()}. * Implementations call a RegionServer and implement {@link #call(int)}.
* Passed to a {@link RpcRetryingCaller} so we retry on fail. * Passed to a {@link RpcRetryingCaller} so we retry on fail.
* TODO: this class is actually tied to one region, because most of the paths make use of * TODO: this class is actually tied to one region, because most of the paths make use of
* the regioninfo part of location when building requests. The only reason it works for * the regioninfo part of location when building requests. The only reason it works for

View File

@ -25,22 +25,21 @@ import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
/** /**
* A Callable<T> that will be retried. If {@link #call()} invocation throws exceptions, * A Callable<T> that will be retried. If {@link #call(int)} invocation throws exceptions,
* we will call {@link #throwable(Throwable, boolean)} with whatever the exception was. * we will call {@link #throwable(Throwable, boolean)} with whatever the exception was.
* @param <T> * @param <T>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface RetryingCallable<T> extends Callable<T> { public interface RetryingCallable<T> {
/** /**
* Prepare by setting up any connections to servers, etc., ahead of {@link #call()} invocation. * Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation.
* @param reload Set this to true if need to requery locations (usually set on second invocation * @param reload Set this to true if need to requery locations
* to {@link #call()} or whatever
* @throws IOException e * @throws IOException e
*/ */
void prepare(final boolean reload) throws IOException; void prepare(final boolean reload) throws IOException;
/** /**
* Called when {@link #call()} throws an exception and we are going to retry; take action to * Called when {@link #call(int)} throws an exception and we are going to retry; take action to
* make it so we succeed on next call (clear caches, do relookup of locations, etc.). * make it so we succeed on next call (clear caches, do relookup of locations, etc.).
* @param t * @param t
* @param retrying True if we are in retrying mode (we are not in retrying mode when max * @param retrying True if we are in retrying mode (we are not in retrying mode when max
@ -48,6 +47,15 @@ public interface RetryingCallable<T> extends Callable<T> {
*/ */
void throwable(final Throwable t, boolean retrying); void throwable(final Throwable t, boolean retrying);
/**
* Computes a result, or throws an exception if unable to do so.
*
* @param callTimeout - the time available for this call. 0 for infinite.
* @return computed result
* @throws Exception if unable to compute a result
*/
T call(int callTimeout) throws Exception;
/** /**
* @return Some details from the implementation that we would like to add to a terminating * @return Some details from the implementation that we would like to add to a terminating
* exception; i.e. a fatal exception is being thrown ending retries and we might like to add * exception; i.e. a fatal exception is being thrown ending retries and we might like to add

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -53,6 +52,10 @@ public class RpcRetryingCaller<T> {
* Timeout for the call including retries * Timeout for the call including retries
*/ */
private int callTimeout; private int callTimeout;
/**
* The remaining time, for the call to come. Takes into account the tries already done.
*/
private int remainingTime;
/** /**
* When we started making calls. * When we started making calls.
*/ */
@ -77,20 +80,20 @@ public class RpcRetryingCaller<T> {
} }
private void beforeCall() { private void beforeCall() {
int remaining = (int)(callTimeout - if (callTimeout > 0) {
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)); remainingTime = (int) (callTimeout -
if (remaining < MIN_RPC_TIMEOUT) { (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
// If there is no time left, we're trying anyway. It's too late. if (remainingTime < MIN_RPC_TIMEOUT) {
// 0 means no timeout, and it's not the intent here. So we secure both cases by // If there is no time left, we're trying anyway. It's too late.
// resetting to the minimum. // 0 means no timeout, and it's not the intent here. So we secure both cases by
remaining = MIN_RPC_TIMEOUT; // resetting to the minimum.
remainingTime = MIN_RPC_TIMEOUT;
}
} else {
remainingTime = 0;
} }
RpcClient.setRpcTimeout(remaining);
} }
private void afterCall() {
RpcClient.resetRpcTimeout();
}
public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException, public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
RuntimeException { RuntimeException {
@ -114,12 +117,13 @@ public class RpcRetryingCaller<T> {
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>(); new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (int tries = 0;; tries++) { for (int tries = 0;; tries++) {
long expectedSleep = 0; long expectedSleep;
try { try {
beforeCall();
callable.prepare(tries != 0); // if called with false, check table status on ZK callable.prepare(tries != 0); // if called with false, check table status on ZK
return callable.call(); beforeCall();
return callable.call(remainingTime);
} catch (Throwable t) { } catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" + LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t); (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t);
@ -131,7 +135,6 @@ public class RpcRetryingCaller<T> {
new RetriesExhaustedException.ThrowableWithExtraContext(t, new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString()); EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt); exceptions.add(qt);
ExceptionUtil.rethrowIfInterrupt(t);
if (tries >= retries - 1) { if (tries >= retries - 1) {
throw new RetriesExhaustedException(tries, exceptions); throw new RetriesExhaustedException(tries, exceptions);
} }
@ -147,8 +150,6 @@ public class RpcRetryingCaller<T> {
": " + callable.getExceptionMessageAdditionalDetail(); ": " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
} }
} finally {
afterCall();
} }
try { try {
Thread.sleep(expectedSleep); Thread.sleep(expectedSleep);
@ -159,7 +160,6 @@ public class RpcRetryingCaller<T> {
} }
/** /**
* @param expectedSleep
* @return Calculate how long a single call took * @return Calculate how long a single call took
*/ */
private long singleCallDuration(final long expectedSleep) { private long singleCallDuration(final long expectedSleep) {
@ -170,7 +170,7 @@ public class RpcRetryingCaller<T> {
/** /**
* Call the server once only. * Call the server once only.
* {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you
* want to do a single call only (A call to {@link RetryingCallable#call()} will not likely * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely
* succeed). * succeed).
* @return an object of type T * @return an object of type T
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
@ -181,9 +181,8 @@ public class RpcRetryingCaller<T> {
// The code of this method should be shared with withRetries. // The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
beforeCall();
callable.prepare(false); callable.prepare(false);
return callable.call(); return callable.call(callTimeout);
} catch (Throwable t) { } catch (Throwable t) {
Throwable t2 = translateException(t); Throwable t2 = translateException(t);
ExceptionUtil.rethrowIfInterrupt(t2); ExceptionUtil.rethrowIfInterrupt(t2);
@ -193,8 +192,6 @@ public class RpcRetryingCaller<T> {
} else { } else {
throw new RuntimeException(t2); throw new RuntimeException(t2);
} }
} finally {
afterCall();
} }
} }

View File

@ -143,11 +143,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} }
} }
/**
* @see java.util.concurrent.Callable#call() @Override
*/
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public Result [] call() throws IOException { public Result [] call(int callTimeout) throws IOException {
if (closed) { if (closed) {
if (scannerId != -1) { if (scannerId != -1) {
close(); close();
@ -163,8 +162,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null; ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try { try {
controller.setPriority(getTableName());
response = getStub().scan(controller, request); response = getStub().scan(controller, request);
// Client and RS maintain a nextCallSeq number during the scan. Every next() call // Client and RS maintain a nextCallSeq number during the scan. Every next() call
// from client to server will increment this number in both sides. Client passes this // from client to server will increment this number in both sides. Client passes this

View File

@ -26,9 +26,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
/** /**
* Optionally carries Cells across the proxy/service interface down into ipc. On its * Optionally carries Cells across the proxy/service interface down into ipc. On its
* way out it optionally carries a set of result Cell data. We stick the Cells here when we want * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
@ -36,7 +33,8 @@ import com.google.protobuf.RpcController;
* service chasm. Used by client and server ipc'ing. * service chasm. Used by client and server ipc'ing.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class PayloadCarryingRpcController implements RpcController, CellScannable { public class PayloadCarryingRpcController
extends TimeLimitedRpcController implements CellScannable {
/** /**
* Priority to set on this request. Set it here in controller so available composing the * Priority to set on this request. Set it here in controller so available composing the
* request. This is the ordained way of setting priorities going forward. We will be * request. This is the ordained way of setting priorities going forward. We will be
@ -46,8 +44,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
// priority. // priority.
private int priority = 0; private int priority = 0;
// TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
/** /**
* They are optionally set on construction, cleared after we make the call, and then optionally * They are optionally set on construction, cleared after we make the call, and then optionally
* set on response with the result. We use this lowest common denominator access to Cells because * set on response with the result. We use this lowest common denominator access to Cells because
@ -79,41 +75,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
this.cellScanner = cellScanner; this.cellScanner = cellScanner;
} }
@Override
public String errorText() {
throw new UnsupportedOperationException();
}
@Override
public boolean failed() {
throw new UnsupportedOperationException();
}
@Override
public boolean isCanceled() {
throw new UnsupportedOperationException();
}
@Override
public void notifyOnCancel(RpcCallback<Object> arg0) {
throw new UnsupportedOperationException();
}
@Override
public void reset() {
throw new UnsupportedOperationException();
}
@Override
public void setFailed(String arg0) {
throw new UnsupportedOperationException();
}
@Override
public void startCancel() {
throw new UnsupportedOperationException();
}
/** /**
* @param priority Priority for this request; should fall roughly in the range * @param priority Priority for this request; should fall roughly in the range
* {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}

View File

@ -82,7 +82,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
.setRequest(request.toByteString()).build(); .setRequest(request.toByteString()).build();
RegionServerCallable<CoprocessorServiceResponse> callable = RegionServerCallable<CoprocessorServiceResponse> callable =
new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
public CoprocessorServiceResponse call() throws Exception { public CoprocessorServiceResponse call(int callTimeout) throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName(); byte[] regionName = getLocation().getRegionInfo().getRegionName();
return ProtobufUtil.execService(getStub(), call, regionName); return ProtobufUtil.execService(getStub(), call, regionName);
} }

View File

@ -87,7 +87,6 @@ import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -109,6 +108,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* Does RPC against a cluster. Manages connections per regionserver in the cluster. * Does RPC against a cluster. Manages connections per regionserver in the cluster.
* <p>See HBaseServer * <p>See HBaseServer
*/ */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@InterfaceAudience.Private @InterfaceAudience.Private
public class RpcClient { public class RpcClient {
// The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under
@ -131,15 +131,31 @@ public class RpcClient {
private final IPCUtil ipcUtil; private final IPCUtil ipcUtil;
protected final SocketFactory socketFactory; // how to create sockets protected final SocketFactory socketFactory; // how to create sockets
private final int connectTO;
private final int readTO;
private final int writeTO;
protected String clusterId; protected String clusterId;
protected final SocketAddress localAddr; protected final SocketAddress localAddr;
private final boolean fallbackAllowed; private final boolean fallbackAllowed;
private UserProvider userProvider; private UserProvider userProvider;
final private static String SOCKET_TIMEOUT = "ipc.socket.timeout"; final private static String SOCKET_TIMEOUT_CONNECT = "ipc.socket.timeout.connect";
final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds final static int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds
final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
/**
* How long we wait when we wait for an answer. It's not the operation time, it's the time
* we wait when we start to receive an answer, when the remote write starts to send the data.
*/
final private static String SOCKET_TIMEOUT_READ = "ipc.socket.timeout.read";
final static int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds
final private static String SOCKET_TIMEOUT_WRITE = "ipc.socket.timeout.write";
final static int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds
// Used by the server, for compatibility with old clients.
// The client in 0.99+ does not ping the server.
final static int PING_CALL_ID = -1;
public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000; public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@ -152,18 +168,6 @@ public class RpcClient {
public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt"; public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
// thread-specific RPC timeout, which may override that of what was passed in.
// This is used to change dynamically the timeout (for read only) when retrying: if
// the time allowed for the operation is less than the usual socket timeout, then
// we lower the timeout. This is subject to race conditions, and should be used with
// extreme caution.
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
}
};
/** /**
* A class to manage a list of servers that failed recently. * A class to manage a list of servers that failed recently.
*/ */
@ -231,13 +235,6 @@ public class RpcClient {
} }
} }
/**
* @return the socket timeout
*/
static int getSocketTimeout(Configuration conf) {
return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
}
/** A call waiting for a value. */ /** A call waiting for a value. */
protected class Call { protected class Call {
final int id; // call id final int id; // call id
@ -254,15 +251,47 @@ public class RpcClient {
volatile boolean done; // true when call is done volatile boolean done; // true when call is done
long startTime; long startTime;
final MethodDescriptor md; final MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
protected Call(final MethodDescriptor md, Message param, final CellScanner cells, protected Call(final MethodDescriptor md, Message param, final CellScanner cells,
final Message responseDefaultType) { final Message responseDefaultType, int timeout) {
this.param = param; this.param = param;
this.md = md; this.md = md;
this.cells = cells; this.cells = cells;
this.startTime = EnvironmentEdgeManager.currentTimeMillis(); this.startTime = EnvironmentEdgeManager.currentTimeMillis();
this.responseDefaultType = responseDefaultType; this.responseDefaultType = responseDefaultType;
this.id = callIdCnt.getAndIncrement(); this.id = callIdCnt.getAndIncrement();
this.timeout = timeout;
}
/**
* Check if the call did timeout. Set an exception (includes a notify) if it's the case.
* @return true if the call is on timeout, false otherwise.
*/
public boolean checkTimeout() {
if (timeout == 0){
return false;
}
long waitTime = EnvironmentEdgeManager.currentTimeMillis() - getStartTime();
if (waitTime >= timeout) {
IOException ie = new CallTimeoutException("Call id=" + id +
", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
setException(ie); // includes a notify
return true;
} else {
return false;
}
}
public int remainingTime() {
if (timeout == 0) {
return Integer.MAX_VALUE;
}
int remaining = timeout - (int) (EnvironmentEdgeManager.currentTimeMillis() - getStartTime());
return remaining > 0 ? remaining : 0;
} }
@Override @Override
@ -345,6 +374,7 @@ public class RpcClient {
/** Thread that reads responses and notifies callers. Each connection owns a /** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this * socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */ * socket: responses may be delivered out of order. */
@SuppressWarnings("SynchronizeOnNonFinalField")
protected class Connection extends Thread { protected class Connection extends Thread {
private ConnectionHeader header; // connection header private ConnectionHeader header; // connection header
protected ConnectionId remoteId; protected ConnectionId remoteId;
@ -414,8 +444,7 @@ public class RpcClient {
setName(name + " - writer"); setName(name + " - writer");
} }
public void cancel(CallFuture cts){ public void remove(CallFuture cts){
cts.call.done = true;
callsToWrite.remove(cts); callsToWrite.remove(cts);
calls.remove(cts.call.id); calls.remove(cts.call.id);
} }
@ -442,15 +471,8 @@ public class RpcClient {
continue; continue;
} }
if (remoteId.rpcTimeout > 0) { if (cts.call.checkTimeout()) {
long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime(); continue;
if (waitTime >= remoteId.rpcTimeout) {
IOException ie = new CallTimeoutException("Call id=" + cts.call.id +
", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout +
", expired before being sent to the server.");
cts.call.setException(ie); // includes a notify
continue;
}
} }
try { try {
@ -595,10 +617,8 @@ public class RpcClient {
if (localAddr != null) { if (localAddr != null) {
this.socket.bind(localAddr); this.socket.bind(localAddr);
} }
// connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
NetUtils.connect(this.socket, remoteId.getAddress(), this.socket.setSoTimeout(readTO);
getSocketTimeout(conf));
this.socket.setSoTimeout(remoteId.rpcTimeout);
return; return;
} catch (SocketTimeoutException toe) { } catch (SocketTimeoutException toe) {
/* The max number of retries is 45, /* The max number of retries is 45,
@ -883,10 +903,8 @@ public class RpcClient {
while (true) { while (true) {
setupConnection(); setupConnection();
InputStream inStream = NetUtils.getInputStream(socket); InputStream inStream = NetUtils.getInputStream(socket);
// This creates a socket with a write timeout. This timeout cannot be changed, // This creates a socket with a write timeout. This timeout cannot be changed.
// RpcClient allows to change the timeout dynamically, but we can only OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
// change the read timeout today.
OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
// Write out the preamble -- MAGIC, version, and auth to use. // Write out the preamble -- MAGIC, version, and auth to use.
writeConnectionHeaderPreamble(outStream); writeConnectionHeaderPreamble(outStream);
if (useSasl) { if (useSasl) {
@ -1005,7 +1023,7 @@ public class RpcClient {
LOG.debug(getName() + ": closing ipc connection to " + server); LOG.debug(getName() + ": closing ipc connection to " + server);
} }
cleanupCalls(); cleanupCalls(true);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": ipc connection to " + server + " closed"); LOG.debug(getName() + ": ipc connection to " + server + " closed");
@ -1025,8 +1043,6 @@ public class RpcClient {
* Initiates a call by sending the parameter to the remote server. * Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other * Note: this is not called from the Connection thread, but by other
* threads. * threads.
* @param call
* @param priority
* @see #readResponse() * @see #readResponse()
*/ */
private void writeRequest(Call call, final int priority, Span span) throws IOException { private void writeRequest(Call call, final int priority, Span span) throws IOException {
@ -1143,7 +1159,7 @@ public class RpcClient {
if (expectedCall) call.setResponse(value, cellBlockScanner); if (expectedCall) call.setResponse(value, cellBlockScanner);
} }
} catch (IOException e) { } catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { if (e instanceof SocketTimeoutException) {
// Clean up open calls but don't treat this as a fatal condition, // Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified // since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}. // {@link ConnectionId#rpcTimeout}.
@ -1152,7 +1168,7 @@ public class RpcClient {
markClosed(e); markClosed(e);
} }
} finally { } finally {
cleanupCalls(remoteId.rpcTimeout); cleanupCalls(false);
} }
} }
@ -1166,7 +1182,7 @@ public class RpcClient {
} }
/** /**
* @param e * @param e exception to be wrapped
* @return RemoteException made from passed <code>e</code> * @return RemoteException made from passed <code>e</code>
*/ */
private RemoteException createRemoteException(final ExceptionResponse e) { private RemoteException createRemoteException(final ExceptionResponse e) {
@ -1195,46 +1211,32 @@ public class RpcClient {
} }
} }
/* Cleanup all calls and mark them as done */
protected void cleanupCalls() {
cleanupCalls(-1);
}
/** /**
* Cleanup the calls older than a given timeout, in milli seconds. * Cleanup the calls older than a given timeout, in milli seconds.
* @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing. * @param allCalls for all calls,
*/ */
protected synchronized void cleanupCalls(long rpcTimeout) { protected synchronized void cleanupCalls(boolean allCalls) {
if (rpcTimeout == 0) return;
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator(); Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) { while (itor.hasNext()) {
Call c = itor.next().getValue(); Call c = itor.next().getValue();
long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime(); if (c.done) {
if (rpcTimeout < 0) { // To catch the calls without timeout that were cancelled.
itor.remove();
} else if (allCalls) {
long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime); IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
c.setException(ie); c.setException(ie);
itor.remove(); itor.remove();
} else if (waitTime >= rpcTimeout) { } else if (c.checkTimeout()) {
IOException ie = new CallTimeoutException("Call id=" + c.id +
", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
c.setException(ie);
itor.remove(); itor.remove();
} else { } else {
// This relies on the insertion order to be the call id order. This is not // We expect the call to be ordered by timeout. It may not be the case, but stopping
// true under 'difficult' conditions (gc, ...). // at the first valid call allows to be sure that we still have something to do without
rpcTimeout -= waitTime; // spending too much time by reading the full list.
break; break;
} }
} }
if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) {
try {
socket.setSoTimeout((int)rpcTimeout);
} catch (SocketException e) {
LOG.warn("Couldn't change timeout, which may result in longer than expected calls");
}
}
} }
} }
@ -1253,7 +1255,7 @@ public class RpcClient {
/** /**
* Construct an IPC cluster client whose values are of the {@link Message} class. * Construct an IPC cluster client whose values are of the {@link Message} class.
* @param conf configuration * @param conf configuration
* @param clusterId * @param clusterId the cluster id
* @param factory socket factory * @param factory socket factory
*/ */
RpcClient(Configuration conf, String clusterId, SocketFactory factory) { RpcClient(Configuration conf, String clusterId, SocketFactory factory) {
@ -1263,7 +1265,7 @@ public class RpcClient {
/** /**
* Construct an IPC cluster client whose values are of the {@link Message} class. * Construct an IPC cluster client whose values are of the {@link Message} class.
* @param conf configuration * @param conf configuration
* @param clusterId * @param clusterId the cluster id
* @param factory socket factory * @param factory socket factory
* @param localAddr client socket bind address * @param localAddr client socket bind address
*/ */
@ -1286,22 +1288,30 @@ public class RpcClient {
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.localAddr = localAddr; this.localAddr = localAddr;
this.userProvider = UserProvider.instantiate(conf); this.userProvider = UserProvider.instantiate(conf);
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
// login the server principal (if using secure Hadoop) // login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
", tcpKeepAlive=" + this.tcpKeepAlive + ", tcpKeepAlive=" + this.tcpKeepAlive +
", tcpNoDelay=" + this.tcpNoDelay + ", tcpNoDelay=" + this.tcpNoDelay +
", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + ", connectTO=" + this.connectTO +
", maxRetries=" + this.maxRetries + ", readTO=" + this.readTO +
", fallbackAllowed=" + this.fallbackAllowed + ", writeTO=" + this.writeTO +
", bind address=" + (this.localAddr != null ? this.localAddr : "null")); ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
", maxRetries=" + this.maxRetries +
", fallbackAllowed=" + this.fallbackAllowed +
", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
} }
} }
/** /**
* Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
* @param conf configuration * @param conf configuration
* @param clusterId * @param clusterId the cluster id
*/ */
public RpcClient(Configuration conf, String clusterId) { public RpcClient(Configuration conf, String clusterId) {
this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
@ -1310,7 +1320,7 @@ public class RpcClient {
/** /**
* Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
* @param conf configuration * @param conf configuration
* @param clusterId * @param clusterId the cluster id
* @param localAddr client socket bind address. * @param localAddr client socket bind address.
*/ */
public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
@ -1343,7 +1353,7 @@ public class RpcClient {
/** /**
* Encapsulate the ugly casting and RuntimeException conversion in private method. * Encapsulate the ugly casting and RuntimeException conversion in private method.
* @param conf * @param conf configuration
* @return The compressor to use on this client. * @return The compressor to use on this client.
*/ */
private static CompressionCodec getCompressor(final Configuration conf) { private static CompressionCodec getCompressor(final Configuration conf) {
@ -1380,21 +1390,13 @@ public class RpcClient {
* Return the pool size specified in the configuration, which is applicable only if * Return the pool size specified in the configuration, which is applicable only if
* the pool type is {@link PoolType#RoundRobin}. * the pool type is {@link PoolType#RoundRobin}.
* *
* @param config * @param config configuration
* @return the maximum pool size * @return the maximum pool size
*/ */
protected static int getPoolSize(Configuration config) { protected static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
} }
/** Return the socket factory of this client
*
* @return this client's socket factory
*/
SocketFactory getSocketFactory() {
return socketFactory;
}
/** Stop all threads related to this client. No further calls may be made /** Stop all threads related to this client. No further calls may be made
* using this client. */ * using this client. */
public void stop() { public void stop() {
@ -1432,25 +1434,19 @@ public class RpcClient {
* with the <code>ticket</code> credentials, returning the value. * with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code * Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
* @param md
* @param param
* @param cells
* @param addr
* @param returnType
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection. * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time. * new Connection each time.
* @param rpcTimeout
* @return A pair with the Message response and the Cell data (if any). * @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException * @throws InterruptedException
* @throws IOException * @throws IOException
*/ */
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells, Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority) Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Call call = new Call(md, param, cells, returnType); Call call = new Call(md, param, cells, returnType, callTimeout);
Connection connection = Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); getConnection(ticket, call, addr, this.codec, this.compressor);
CallFuture cts = null; CallFuture cts = null;
if (connection.callSender != null){ if (connection.callSender != null){
@ -1460,16 +1456,17 @@ public class RpcClient {
} }
while (!call.done) { while (!call.done) {
if (call.checkTimeout()) {
if (cts != null) connection.callSender.remove(cts);
break;
}
try { try {
synchronized (call) { synchronized (call) {
call.wait(1000); // wait for the result. We will be notified by the reader. call.wait(Math.min(call.remainingTime(), 1000) + 1);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (cts != null) { call.setException(new InterruptedIOException());
connection.callSender.cancel(cts); if (cts != null) connection.callSender.remove(cts);
} else {
call.done = true;
}
throw e; throw e;
} }
} }
@ -1487,7 +1484,6 @@ public class RpcClient {
} }
/** /**
* Take an IOException and the address we were trying to connect to * Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause. * and return an IOException with the input exception as the cause.
@ -1523,7 +1519,7 @@ public class RpcClient {
* process died) or no route to host: i.e. their next retries should be faster and with a * process died) or no route to host: i.e. their next retries should be faster and with a
* safe exception. * safe exception.
*/ */
public void cancelConnections(String hostname, int port, IOException ioe) { public void cancelConnections(String hostname, int port) {
synchronized (connections) { synchronized (connections) {
for (Connection connection : connections.values()) { for (Connection connection : connections.values()) {
if (connection.isAlive() && if (connection.isAlive() &&
@ -1540,15 +1536,15 @@ public class RpcClient {
/** /**
* Get a connection from the pool, or create a new one and add it to the * Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. * pool. Connections to a given host/port are reused.
*/ */
protected Connection getConnection(User ticket, Call call, InetSocketAddress addr, protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
int rpcTimeout, final Codec codec, final CompressionCodec compressor) final Codec codec, final CompressionCodec compressor)
throws IOException { throws IOException {
if (!running.get()) throw new StoppedRpcClientException(); if (!running.get()) throw new StoppedRpcClientException();
Connection connection; Connection connection;
ConnectionId remoteId = ConnectionId remoteId =
new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout); new ConnectionId(ticket, call.md.getService().getName(), addr);
synchronized (connections) { synchronized (connections) {
connection = connections.get(remoteId); connection = connections.get(remoteId);
if (connection == null) { if (connection == null) {
@ -1576,17 +1572,12 @@ public class RpcClient {
protected static class ConnectionId { protected static class ConnectionId {
final InetSocketAddress address; final InetSocketAddress address;
final User ticket; final User ticket;
final int rpcTimeout;
private static final int PRIME = 16777619; private static final int PRIME = 16777619;
final String serviceName; final String serviceName;
ConnectionId(User ticket, ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
String serviceName,
InetSocketAddress address,
int rpcTimeout) {
this.address = address; this.address = address;
this.ticket = ticket; this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
this.serviceName = serviceName; this.serviceName = serviceName;
} }
@ -1604,8 +1595,7 @@ public class RpcClient {
@Override @Override
public String toString() { public String toString() {
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" + return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
this.rpcTimeout;
} }
@Override @Override
@ -1614,7 +1604,7 @@ public class RpcClient {
ConnectionId id = (ConnectionId) obj; ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) && return address.equals(id.address) &&
((ticket != null && ticket.equals(id.ticket)) || ((ticket != null && ticket.equals(id.ticket)) ||
(ticket == id.ticket)) && rpcTimeout == id.rpcTimeout && (ticket == id.ticket)) &&
this.serviceName == id.serviceName; this.serviceName == id.serviceName;
} }
return false; return false;
@ -1624,28 +1614,11 @@ public class RpcClient {
public int hashCode() { public int hashCode() {
int hashcode = (address.hashCode() + int hashcode = (address.hashCode() +
PRIME * (PRIME * this.serviceName.hashCode() ^ PRIME * (PRIME * this.serviceName.hashCode() ^
(ticket == null ? 0 : ticket.hashCode()) )) ^ (ticket == null ? 0 : ticket.hashCode()) ));
rpcTimeout;
return hashcode; return hashcode;
} }
} }
public static void setRpcTimeout(int t) {
rpcTimeout.set(t);
}
/**
* Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
* default timeout.
*/
public static int getRpcTimeout(int defaultTimeout) {
return Math.min(defaultTimeout, rpcTimeout.get());
}
public static void resetRpcTimeout() {
rpcTimeout.remove();
}
/** /**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code * Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
@ -1654,24 +1627,24 @@ public class RpcClient {
* new Connection each time. * new Connection each time.
* @return A pair with the Message response and the Cell data (if any). * @return A pair with the Message response and the Cell data (if any).
*/ */
Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa, Message param, Message returnType, final User ticket, final InetSocketAddress isa)
final int rpcTimeout)
throws ServiceException { throws ServiceException {
long startTime = 0; long startTime = 0;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
startTime = EnvironmentEdgeManager.currentTimeMillis(); startTime = EnvironmentEdgeManager.currentTimeMillis();
} }
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; int callTimeout = 0;
CellScanner cells = null; CellScanner cells = null;
if (pcrc != null) { if (pcrc != null) {
callTimeout = pcrc.getCallTimeout();
cells = pcrc.cellScanner(); cells = pcrc.cellScanner();
// Clear it here so we don't by mistake try and these cells processing results. // Clear it here so we don't by mistake try and these cells processing results.
pcrc.setCellScanner(null); pcrc.setCellScanner(null);
} }
Pair<Message, CellScanner> val; Pair<Message, CellScanner> val;
try { try {
val = call(md, param, cells, returnType, ticket, isa, rpcTimeout, val = call(md, param, cells, returnType, ticket, isa, callTimeout,
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS); pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
if (pcrc != null) { if (pcrc != null) {
// Shove the results into controller so can be carried across the proxy/pb service void. // Shove the results into controller so can be carried across the proxy/pb service void.
@ -1696,8 +1669,8 @@ public class RpcClient {
* @return A blocking rpc channel that goes via this rpc client instance. * @return A blocking rpc channel that goes via this rpc client instance.
*/ */
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
final User ticket, final int rpcTimeout) { final User ticket, int defaultOperationTimeout) {
return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout); return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
} }
/** /**
@ -1707,25 +1680,36 @@ public class RpcClient {
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
private final InetSocketAddress isa; private final InetSocketAddress isa;
private final RpcClient rpcClient; private final RpcClient rpcClient;
private final int rpcTimeout;
private final User ticket; private final User ticket;
private final int defaultOperationTimeout;
/**
* @param defaultOperationTimeout - the default timeout when no timeout is given
* by the caller.
*/
protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn, protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn,
final User ticket, final int rpcTimeout) { final User ticket, int defaultOperationTimeout) {
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.rpcClient = rpcClient; this.rpcClient = rpcClient;
// Set the rpc timeout to be the minimum of configured timeout and whatever the current
// thread local setting is.
this.rpcTimeout = getRpcTimeout(rpcTimeout);
this.ticket = ticket; this.ticket = ticket;
this.defaultOperationTimeout = defaultOperationTimeout;
} }
@Override @Override
public Message callBlockingMethod(MethodDescriptor md, RpcController controller, public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType) Message param, Message returnType) throws ServiceException {
throws ServiceException { PayloadCarryingRpcController pcrc;
return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket, if (controller != null) {
this.isa, this.rpcTimeout); pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()){
pcrc.setCallTimeout(defaultOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(defaultOperationTimeout);
}
return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
} }
} }
} }

View File

@ -1428,28 +1428,6 @@ public final class ProtobufUtil {
// Start helpers for Client // Start helpers for Client
/**
* A helper to invoke a Get using client protocol.
*
* @param client
* @param regionName
* @param get
* @return the result of the Get
* @throws IOException
*/
public static Result get(final ClientService.BlockingInterface client,
final byte[] regionName, final Get get) throws IOException {
GetRequest request =
RequestConverter.buildGetRequest(regionName, get);
try {
GetResponse response = client.get(null, request);
if (response == null) return null;
return toResult(response.getResult());
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/** /**
* A helper to get a row of the closet one before using client protocol. * A helper to get a row of the closet one before using client protocol.
* *

View File

@ -2042,8 +2042,7 @@ public class RpcServer implements RpcServerInterface {
status.getClient(), startTime, processingTime, qTime, status.getClient(), startTime, processingTime, qTime,
responseSize); responseSize);
} }
return new Pair<Message, CellScanner>(result, return new Pair<Message, CellScanner>(result, controller.cellScanner());
controller != null? controller.cellScanner(): null);
} catch (Throwable e) { } catch (Throwable e) {
// The above callBlockingMethod will always return a SE. Strip the SE wrapper before // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
// putting it on the wire. Its needed to adhere to the pb Service Interface but we don't // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
@ -2239,7 +2238,7 @@ public class RpcServer implements RpcServerInterface {
/** /**
* Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)} * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
* and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
* one of readCh or writeCh should be non-null. * one of readCh or writeCh should be non-null.
* *
* @param readCh read channel * @param readCh read channel
@ -2248,7 +2247,7 @@ public class RpcServer implements RpcServerInterface {
* @return bytes written * @return bytes written
* @throws java.io.IOException e * @throws java.io.IOException e
* @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
* @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer) * @see #channelWrite(GatheringByteChannel, BufferChain)
*/ */
private static int channelIO(ReadableByteChannel readCh, private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh, WritableByteChannel writeCh,

View File

@ -596,7 +596,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final RegionServerCallable<Boolean> svrCallable = final RegionServerCallable<Boolean> svrCallable =
new RegionServerCallable<Boolean>(conn, tableName, first) { new RegionServerCallable<Boolean>(conn, tableName, first) {
@Override @Override
public Boolean call() throws Exception { public Boolean call(int callTimeout) throws Exception {
SecureBulkLoadClient secureClient = null; SecureBulkLoadClient secureClient = null;
boolean success = false; boolean success = false;

View File

@ -60,8 +60,8 @@ public class ReplicationProtbufUtil {
final HLog.Entry[] entries) throws IOException { final HLog.Entry[] entries) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries); buildReplicateWALEntryRequest(entries);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try { try {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
admin.replicateWALEntry(controller, p.getFirst()); admin.replicateWALEntry(controller, p.getFirst());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);

View File

@ -437,7 +437,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// A sleeper that sleeps for msgInterval. // A sleeper that sleeps for msgInterval.
private final Sleeper sleeper; private final Sleeper sleeper;
private final int rpcTimeout; private final int operationTimeout;
private final RegionServerAccounting regionServerAccounting; private final RegionServerAccounting regionServerAccounting;
@ -555,7 +555,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.numRegionsToReport = conf.getInt( this.numRegionsToReport = conf.getInt(
"hbase.regionserver.numregionstoreport", 10); "hbase.regionserver.numregionstoreport", 10);
this.rpcTimeout = conf.getInt( this.operationTimeout = conf.getInt(
HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
@ -1972,7 +1972,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
new InetSocketAddress(sn.getHostname(), sn.getPort()); new InetSocketAddress(sn.getHostname(), sn.getPort());
try { try {
BlockingRpcChannel channel = BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout); this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
intf = RegionServerStatusService.newBlockingStub(channel); intf = RegionServerStatusService.newBlockingStub(channel);
break; break;
} catch (IOException e) { } catch (IOException e) {

View File

@ -423,7 +423,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br> * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
* At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
* @param numTasks current total number of available tasks * @param numTasks current total number of available tasks
* @return
*/ */
private int calculateAvailableSplitters(int numTasks) { private int calculateAvailableSplitters(int numTasks) {
// at lease one RS(itself) available // at lease one RS(itself) available

View File

@ -1861,7 +1861,6 @@ public class HLogSplitter {
* Tag original sequence number for each edit to be replayed * Tag original sequence number for each edit to be replayed
* @param entry * @param entry
* @param cell * @param cell
* @return
*/ */
private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) { private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet

View File

@ -191,7 +191,7 @@ public class WALEditsReplaySink {
} }
@Override @Override
public ReplicateWALEntryResponse call() throws IOException { public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
try { try {
replayToServer(this.regionInfo, this.entries); replayToServer(this.regionInfo, this.entries);
} catch (ServiceException se) { } catch (ServiceException se) {
@ -210,8 +210,8 @@ public class WALEditsReplaySink {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try { try {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
remoteSvr.replay(controller, p.getFirst()); remoteSvr.replay(controller, p.getFirst());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se); throw ProtobufUtil.getRemoteException(se);

View File

@ -40,20 +40,26 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
@ -100,6 +106,25 @@ public class TestHCM {
private static final byte[] ROW_X = Bytes.toBytes("xxx"); private static final byte[] ROW_X = Bytes.toBytes("xxx");
private static Random _randy = new Random(); private static Random _randy = new Random();
/**
* This copro sleeps 20 second. The first call it fails. The second time, it works.
*/
public static class SleepAndFailFirstTime extends BaseRegionObserver {
static final AtomicLong ct = new AtomicLong(0);
public SleepAndFailFirstTime() {
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
Threads.sleep(20000);
if (ct.incrementAndGet() == 1){
throw new IOException("first call I fail");
}
}
}
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
@ -165,7 +190,7 @@ public class TestHCM {
t.close(); t.close();
con1.close(); con1.close();
// if the pool was created on demand it should be closed upon connectin close // if the pool was created on demand it should be closed upon connection close
assertTrue(pool.isShutdown()); assertTrue(pool.isShutdown());
con2.close(); con2.close();
@ -244,6 +269,41 @@ public class TestHCM {
testConnectionClose(false); testConnectionClose(false);
} }
/**
* Test that an operation can fail if we read the global operation timeout, even if the
* individual timeout is fine. We do that with:
* - client side: an operation timeout of 30 seconds
* - server side: we sleep 20 second at each attempt. The first work fails, the second one
* succeeds. But the client won't wait that much, because 20 + 20 > 30, so the client
* timeouted when the server answers.
*/
@Test
public void testOperationTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.get(new Get(FAM_NAM));
// Resetting and retrying. Will fail this time, not enough time for the second try
SleepAndFailFirstTime.ct.set(0);
try {
table.setOperationTimeout(30 * 1000);
table.get(new Get(FAM_NAM));
Assert.fail("We expect an exception here");
} catch (SocketTimeoutException e) {
// The client has a CallTimeout class, but it's not shared.We're not very clean today,
// in the general case you can expect the call to stop, but the exception may vary.
// In this test however, we're sure that it will be a socket timeout.
LOG.info("We received an exception, as expected ", e);
} catch (IOException e) {
Assert.fail("Wrong exception:" + e.getMessage());
}
}
private void testConnectionClose(boolean allowsInterrupt) throws Exception { private void testConnectionClose(boolean allowsInterrupt) throws Exception {
String tableName = "HCM-testConnectionClose" + allowsInterrupt; String tableName = "HCM-testConnectionClose" + allowsInterrupt;
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close(); TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
@ -302,12 +362,12 @@ public class TestHCM {
LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
for (int i = 0; i < 5000; i++) { for (int i = 0; i < 5000; i++) {
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), null); rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
Thread.sleep(5); Thread.sleep(5);
} }
step.compareAndSet(1, 2); step.compareAndSet(1, 2);
// The test may fail here if the thread doing the gets is stuck. The wait to find // The test may fail here if the thread doing the gets is stuck. The way to find
// out what's happening is to look for the thread named 'testConnectionCloseThread' // out what's happening is to look for the thread named 'testConnectionCloseThread'
TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() { TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
@Override @Override

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -155,7 +156,9 @@ public class TestEndToEndSplitTransaction {
byte[] regionName = con.relocateRegion(tableName, row).getRegionInfo() byte[] regionName = con.relocateRegion(tableName, row).getRegionInfo()
.getRegionName(); .getRegionName();
// get and scan should now succeed without exception // get and scan should now succeed without exception
ProtobufUtil.get(server, regionName, new Get(row)); ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(regionName, new Get(row));
server.get(null, request);
ScanRequest scanRequest = RequestConverter.buildScanRequest( ScanRequest scanRequest = RequestConverter.buildScanRequest(
regionName, new Scan(row), 1, true); regionName, new Scan(row), 1, true);
try { try {
@ -165,6 +168,8 @@ public class TestEndToEndSplitTransaction {
} }
} catch (IOException x) { } catch (IOException x) {
return false; return false;
} catch (ServiceException e) {
return false;
} }
return true; return true;
} }

View File

@ -158,7 +158,7 @@ public class TestHRegionServerBulkLoad {
RegionServerCallable<Void> callable = RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) { new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) {
@Override @Override
public Void call() throws Exception { public Void call(int callTimeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow())); + Bytes.toStringBinary(getRow()));
byte[] regionName = getLocation().getRegionInfo().getRegionName(); byte[] regionName = getLocation().getRegionInfo().getRegionName();
@ -177,7 +177,7 @@ public class TestHRegionServerBulkLoad {
// 10 * 50 = 500 open file handles! // 10 * 50 = 500 open file handles!
callable = new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) { callable = new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) {
@Override @Override
public Void call() throws Exception { public Void call(int callTimeout) throws Exception {
LOG.debug("compacting " + getLocation() + " for row " LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow())); + Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server = AdminProtos.AdminService.BlockingInterface server =