From 21ddb60dc0af51f018c68e84b2e8969380e177b8 Mon Sep 17 00:00:00 2001 From: nkeywal Date: Tue, 25 Feb 2014 16:28:57 +0000 Subject: [PATCH] HBASE-10566 cleanup rpcTimeout in the client git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1571727 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/client/ClientSmallScanner.java | 11 +- .../hbase/client/ConnectionManager.java | 15 +- .../client/DelegatingRetryingCallable.java | 4 +- .../hadoop/hbase/client/HBaseAdmin.java | 62 ++-- .../apache/hadoop/hbase/client/HTable.java | 118 ++++--- .../hbase/client/MultiServerCallable.java | 6 +- .../org/apache/hadoop/hbase/client/Put.java | 2 +- .../hbase/client/RegionServerCallable.java | 2 +- .../hadoop/hbase/client/RetryingCallable.java | 20 +- .../hbase/client/RpcRetryingCaller.java | 45 ++- .../hadoop/hbase/client/ScannerCallable.java | 10 +- .../ipc/PayloadCarryingRpcController.java | 43 +-- .../ipc/RegionCoprocessorRpcChannel.java | 2 +- .../apache/hadoop/hbase/ipc/RpcClient.java | 298 +++++++++--------- .../hadoop/hbase/protobuf/ProtobufUtil.java | 22 -- .../apache/hadoop/hbase/ipc/RpcServer.java | 7 +- .../mapreduce/LoadIncrementalHFiles.java | 2 +- .../protobuf/ReplicationProtbufUtil.java | 2 +- .../hbase/regionserver/HRegionServer.java | 6 +- .../hbase/regionserver/SplitLogWorker.java | 1 - .../hbase/regionserver/wal/HLogSplitter.java | 1 - .../regionserver/wal/WALEditsReplaySink.java | 4 +- .../apache/hadoop/hbase/client/TestHCM.java | 66 +++- .../TestEndToEndSplitTransaction.java | 7 +- .../TestHRegionServerBulkLoad.java | 4 +- 25 files changed, 394 insertions(+), 366 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index a17be55bb3e..77fd2aa3186 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -165,16 +165,15 @@ public class ClientSmallScanner extends ClientScanner { this.scan.setStartRow(localStartKey); RegionServerCallable callable = new RegionServerCallable( getConnection(), getTable(), scan.getStartRow()) { - public Result[] call() throws IOException { + public Result[] call(int callTimeout) throws IOException { ScanRequest request = RequestConverter.buildScanRequest(getLocation() .getRegionInfo().getRegionName(), scan, cacheNum, true); - ScanResponse response = null; PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); try { - controller.setPriority(getTableName()); - response = getStub().scan(controller, request); - return ResponseConverter.getResults(controller.cellScanner(), - response); + ScanResponse response = getStub().scan(controller, request); + return ResponseConverter.getResults(controller.cellScanner(), response); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 95ee1c19ac2..cef0945261c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -608,9 +608,7 @@ class ConnectionManager { @Override public void newDead(ServerName sn) { clearCaches(sn); - 
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), - new SocketException(sn.getServerName() + - " is dead: closing its connection.")); + rpcClient.cancelConnections(sn.getHostname(), sn.getPort()); } }, conf, listenerClass); } @@ -1516,8 +1514,7 @@ class ConnectionManager { synchronized (connectionLock.get(key)) { stub = stubs.get(key); if (stub == null) { - BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, - user, rpcTimeout); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = makeStub(channel); isMasterRunning(); stubs.put(key, stub); @@ -1635,8 +1632,8 @@ class ConnectionManager { synchronized (this.connectionLock.get(key)) { stub = (AdminService.BlockingInterface)this.stubs.get(key); if (stub == null) { - BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, - user, this.rpcTimeout); + BlockingRpcChannel channel = + this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); stub = AdminService.newBlockingStub(channel); this.stubs.put(key, stub); } @@ -1656,8 +1653,8 @@ class ConnectionManager { synchronized (this.connectionLock.get(key)) { stub = (ClientService.BlockingInterface)this.stubs.get(key); if (stub == null) { - BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, - user, this.rpcTimeout); + BlockingRpcChannel channel = + this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = ClientService.newBlockingStub(channel); // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. // Just fail on first actual call rather than in here on setup. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java index 010b08b5100..adfc49b52ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java @@ -28,8 +28,8 @@ public class DelegatingRetryingCallable> implem } @Override - public T call() throws Exception { - return delegate.call(); + public T call(int callTimeout) throws Exception { + return delegate.call(callTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 19a4b883bb5..2dc212a7668 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -607,7 +607,7 @@ public class HBaseAdmin implements Abortable, Closeable { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys); master.createTable(null, request); return null; @@ -636,7 +636,7 @@ public class HBaseAdmin implements Abortable, Closeable { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName); master.deleteTable(null,req); return null; @@ -841,7 +841,7 @@ public class HBaseAdmin implements Abortable, Closeable { 
TableName.isLegalFullyQualifiedTableName(tableName.getName()); executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { LOG.info("Started enable of " + tableName); EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName); master.enableTable(null,req); @@ -918,7 +918,7 @@ public class HBaseAdmin implements Abortable, Closeable { TableName.isLegalFullyQualifiedTableName(tableName.getName()); executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { LOG.info("Started disable of " + tableName); DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName); master.disableTable(null,req); @@ -1136,7 +1136,7 @@ public class HBaseAdmin implements Abortable, Closeable { throws IOException { return executeCallable(new MasterCallable>(getConnection()) { @Override - public Pair call() throws ServiceException { + public Pair call(int callTimeout) throws ServiceException { GetSchemaAlterStatusRequest req = RequestConverter .buildGetSchemaAlterStatusRequest(tableName); GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req); @@ -1203,7 +1203,7 @@ public class HBaseAdmin implements Abortable, Closeable { throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column); master.addColumn(null,req); return null; @@ -1249,7 +1249,7 @@ public class HBaseAdmin implements Abortable, Closeable { throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName); master.deleteColumn(null,req); return null; @@ -1297,7 +1297,7 @@ public class HBaseAdmin implements Abortable, Closeable { throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor); master.modifyColumn(null,req); return null; @@ -1707,7 +1707,7 @@ public class HBaseAdmin implements Abortable, Closeable { final byte[] toBeAssigned = getRegionName(regionName); executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { AssignRegionRequest request = RequestConverter.buildAssignRegionRequest(toBeAssigned); master.assignRegion(null,request); @@ -1735,7 +1735,7 @@ public class HBaseAdmin implements Abortable, Closeable { final byte[] toBeUnassigned = getRegionName(regionName); executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { UnassignRegionRequest request = RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); master.unassignRegion(null,request); @@ -1992,7 +1992,7 @@ public class HBaseAdmin implements Abortable, Closeable { executeCallable(new MasterCallable(getConnection()) { @Override - 
public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd); master.modifyTable(null, request); return null; @@ -2105,7 +2105,7 @@ public class HBaseAdmin implements Abortable, Closeable { public synchronized void shutdown() throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { master.shutdown(null,ShutdownRequest.newBuilder().build()); return null; } @@ -2121,7 +2121,7 @@ public class HBaseAdmin implements Abortable, Closeable { public synchronized void stopMaster() throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { master.stopMaster(null,StopMasterRequest.newBuilder().build()); return null; } @@ -2157,7 +2157,7 @@ public class HBaseAdmin implements Abortable, Closeable { public ClusterStatus getClusterStatus() throws IOException { return executeCallable(new MasterCallable(getConnection()) { @Override - public ClusterStatus call() throws ServiceException { + public ClusterStatus call(int callTimeout) throws ServiceException { GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); return ClusterStatus.convert(master.getClusterStatus(null,req).getClusterStatus()); } @@ -2185,7 +2185,7 @@ public class HBaseAdmin implements Abortable, Closeable { public void createNamespace(final NamespaceDescriptor descriptor) throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws Exception { + public Void call(int callTimeout) throws Exception { master.createNamespace(null, CreateNamespaceRequest.newBuilder() .setNamespaceDescriptor(ProtobufUtil @@ -2203,7 +2203,7 @@ public class HBaseAdmin implements Abortable, Closeable { public void modifyNamespace(final NamespaceDescriptor descriptor) throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws Exception { + public Void call(int callTimeout) throws Exception { master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder(). setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); return null; @@ -2219,7 +2219,7 @@ public class HBaseAdmin implements Abortable, Closeable { public void deleteNamespace(final String name) throws IOException { executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws Exception { + public Void call(int callTimeout) throws Exception { master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder(). setNamespaceName(name).build()); return null; @@ -2237,7 +2237,7 @@ public class HBaseAdmin implements Abortable, Closeable { return executeCallable(new MasterCallable(getConnection()) { @Override - public NamespaceDescriptor call() throws Exception { + public NamespaceDescriptor call(int callTimeout) throws Exception { return ProtobufUtil.toNamespaceDescriptor( master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder(). 
setNamespaceName(name).build()).getNamespaceDescriptor()); @@ -2254,7 +2254,7 @@ public class HBaseAdmin implements Abortable, Closeable { return executeCallable(new MasterCallable(getConnection()) { @Override - public NamespaceDescriptor[] call() throws Exception { + public NamespaceDescriptor[] call(int callTimeout) throws Exception { List list = master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder(). build()).getNamespaceDescriptorList(); @@ -2277,7 +2277,7 @@ public class HBaseAdmin implements Abortable, Closeable { return executeCallable(new MasterCallable(getConnection()) { @Override - public HTableDescriptor[] call() throws Exception { + public HTableDescriptor[] call(int callTimeout) throws Exception { List list = master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest. newBuilder().setNamespaceName(name).build()).getTableSchemaList(); @@ -2301,7 +2301,7 @@ public class HBaseAdmin implements Abortable, Closeable { return executeCallable(new MasterCallable(getConnection()) { @Override - public TableName[] call() throws Exception { + public TableName[] call(int callTimeout) throws Exception { List tableNames = master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest. newBuilder().setNamespaceName(name).build()) @@ -2715,7 +2715,7 @@ public class HBaseAdmin implements Abortable, Closeable { LOG.debug("Getting current status of snapshot from master..."); done = executeCallable(new MasterCallable(getConnection()) { @Override - public IsSnapshotDoneResponse call() throws ServiceException { + public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { return master.isSnapshotDone(null, request); } }); @@ -2744,7 +2744,7 @@ public class HBaseAdmin implements Abortable, Closeable { // run the snapshot on the master return executeCallable(new MasterCallable(getConnection()) { @Override - public SnapshotResponse call() throws ServiceException { + public SnapshotResponse call(int callTimeout) throws ServiceException { return master.snapshot(null, request); } }); @@ -2775,7 +2775,7 @@ public class HBaseAdmin implements Abortable, Closeable { return executeCallable(new MasterCallable(getConnection()) { @Override - public IsSnapshotDoneResponse call() throws ServiceException { + public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { return master.isSnapshotDone(null, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); } @@ -3026,7 +3026,7 @@ public class HBaseAdmin implements Abortable, Closeable { ExecProcedureResponse response = executeCallable(new MasterCallable( getConnection()) { @Override - public ExecProcedureResponse call() throws ServiceException { + public ExecProcedureResponse call(int callTimeout) throws ServiceException { return master.execProcedure(null, request); } }); @@ -3089,7 +3089,7 @@ public class HBaseAdmin implements Abortable, Closeable { return executeCallable( new MasterCallable(getConnection()) { @Override - public IsProcedureDoneResponse call() throws ServiceException { + public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { return master.isProcedureDone(null, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); } @@ -3135,7 +3135,7 @@ public class HBaseAdmin implements Abortable, Closeable { done = executeCallable(new MasterCallable( getConnection()) { @Override - public IsRestoreSnapshotDoneResponse call() throws ServiceException { + public IsRestoreSnapshotDoneResponse call(int callTimeout) throws 
ServiceException { return master.isRestoreSnapshotDone(null, request); } }); @@ -3165,7 +3165,7 @@ public class HBaseAdmin implements Abortable, Closeable { // run the snapshot restore on the master return executeCallable(new MasterCallable(getConnection()) { @Override - public RestoreSnapshotResponse call() throws ServiceException { + public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { return master.restoreSnapshot(null, request); } }); @@ -3179,7 +3179,7 @@ public class HBaseAdmin implements Abortable, Closeable { public List listSnapshots() throws IOException { return executeCallable(new MasterCallable>(getConnection()) { @Override - public List call() throws ServiceException { + public List call(int callTimeout) throws ServiceException { return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build()) .getSnapshotsList(); } @@ -3235,7 +3235,7 @@ public class HBaseAdmin implements Abortable, Closeable { // do the delete executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { master.deleteSnapshot(null, DeleteSnapshotRequest.newBuilder(). setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build()); @@ -3264,7 +3264,7 @@ public class HBaseAdmin implements Abortable, Closeable { // do the delete executeCallable(new MasterCallable(getConnection()) { @Override - public Void call() throws ServiceException { + public Void call(int callTimeout) throws ServiceException { this.master.deleteSnapshot(null, DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build()); return null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f137a01055b..a9b21aa2ed0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; @@ -713,16 +714,26 @@ public class HTable implements HTableInterface { */ @Override public Result getRowOrBefore(final byte[] row, final byte[] family) - throws IOException { + throws IOException { RegionServerCallable callable = new RegionServerCallable(this.connection, tableName, row) { - public Result call() throws IOException { - return ProtobufUtil.getRowOrBefore(getStub(), - getLocation().getRegionInfo().getRegionName(), row, family); + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest( + getLocation().getRegionInfo().getRegionName(), row, family); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (!response.hasResult()) return null; + return 
ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); - } + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + } /** * {@inheritDoc} @@ -771,11 +782,22 @@ public class HTable implements HTableInterface { public Result get(final Get get) throws IOException { RegionServerCallable callable = new RegionServerCallable(this.connection, getName(), get.getRow()) { - public Result call() throws IOException { - return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get); + public Result call(int callTimeout) throws IOException { + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) return null; + return ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** @@ -863,11 +885,15 @@ public class HTable implements HTableInterface { throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, tableName, delete.getRow()) { - public Boolean call() throws IOException { + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), delete); - MutateResponse response = getStub().mutate(null, request); + MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -999,16 +1025,17 @@ public class HTable implements HTableInterface { public void mutateRow(final RowMutations rm) throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, getName(), rm.getRow()) { - public Void call() throws IOException { + public Void call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); try { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( getLocation().getRegionInfo().getRegionName(), rm); regionMutationBuilder.setAtomic(true); MultiRequest request = MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); - pcrc.setPriority(tableName); - getStub().multi(null, request); + getStub().multi(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1032,15 +1059,16 @@ public class HTable implements HTableInterface { final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = new 
RegionServerCallable(this.connection, getName(), append.getRow()) { - public Result call() throws IOException { + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); - PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); - rpcController.setPriority(getTableName()); - MutateResponse response = getStub().mutate(rpcController, request); + MutateResponse response = getStub().mutate(controller, request); if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1062,14 +1090,15 @@ public class HTable implements HTableInterface { final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = new RegionServerCallable(this.connection, getName(), increment.getRow()) { - public Result call() throws IOException { + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); - PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); - rpcController.setPriority(getTableName()); - MutateResponse response = getStub().mutate(rpcController, request); - return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); + MutateResponse response = getStub().mutate(controller, request); + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1124,16 +1153,17 @@ public class HTable implements HTableInterface { final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { - public Long call() throws IOException { + public Long call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildIncrementRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount, durability, nonceGroup, nonce); - PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); - rpcController.setPriority(getTableName()); - MutateResponse response = getStub().mutate(rpcController, request); + MutateResponse response = getStub().mutate(controller, request); Result result = - ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); + ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1153,12 +1183,15 @@ public class HTable implements HTableInterface { throws IOException { RegionServerCallable 
callable = new RegionServerCallable(connection, getName(), row) { - public Boolean call() throws IOException { + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = getStub().mutate(null, request); + MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1178,13 +1211,16 @@ public class HTable implements HTableInterface { throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { - public Boolean call() throws IOException { + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), compareType, put); - MutateResponse response = getStub().mutate(null, request); + MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1204,12 +1240,15 @@ public class HTable implements HTableInterface { throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { - public Boolean call() throws IOException { + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = getStub().mutate(null, request); + MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1229,13 +1268,16 @@ public class HTable implements HTableInterface { throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { - public Boolean call() throws IOException { + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), compareType, delete); - MutateResponse response = getStub().mutate(null, request); + MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw 
ProtobufUtil.getRemoteException(se); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 94c8e286ea7..47d8d545b5f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -72,14 +71,14 @@ class MultiServerCallable extends RegionServerCallable { @Override public HRegionInfo getHRegionInfo() { throw new RuntimeException("Cannot get region info for multi-region request"); - }; + } MultiAction getMulti() { return this.multiAction; } @Override - public MultiResponse call() throws IOException { + public MultiResponse call(int callTimeout) throws IOException { int countOfActions = this.multiAction.size(); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); @@ -118,6 +117,7 @@ class MultiServerCallable extends RegionServerCallable { // optionally ferries cell response data back out again. PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); ClientProtos.MultiResponse responseProto; ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java index 876c033a9fe..3fa4f4b7508 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes; * Used to perform Put operations for a single row. *
<p>
* To perform a Put, instantiate a Put object with the row to insert to and - * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or + * for eachumn to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or * {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp. */ @InterfaceAudience.Public diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 74c1e4a8993..1740a2d86af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; /** - * Implementations call a RegionServer and implement {@link #call()}. + * Implementations call a RegionServer and implement {@link #call(int)}. * Passed to a {@link RpcRetryingCaller} so we retry on fail. * TODO: this class is actually tied to one region, because most of the paths make use of * the regioninfo part of location when building requests. The only reason it works for diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java index e8881284235..5bc6c1a57b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java @@ -25,22 +25,21 @@ import java.util.concurrent.Callable; import org.apache.hadoop.classification.InterfaceAudience; /** - * A Callable that will be retried. If {@link #call()} invocation throws exceptions, + * A Callable that will be retried. If {@link #call(int)} invocation throws exceptions, * we will call {@link #throwable(Throwable, boolean)} with whatever the exception was. * @param */ @InterfaceAudience.Private -public interface RetryingCallable extends Callable { +public interface RetryingCallable { /** - * Prepare by setting up any connections to servers, etc., ahead of {@link #call()} invocation. - * @param reload Set this to true if need to requery locations (usually set on second invocation - * to {@link #call()} or whatever + * Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation. + * @param reload Set this to true if need to requery locations * @throws IOException e */ void prepare(final boolean reload) throws IOException; /** - * Called when {@link #call()} throws an exception and we are going to retry; take action to + * Called when {@link #call(int)} throws an exception and we are going to retry; take action to * make it so we succeed on next call (clear caches, do relookup of locations, etc.). * @param t * @param retrying True if we are in retrying mode (we are not in retrying mode when max @@ -48,6 +47,15 @@ public interface RetryingCallable extends Callable { */ void throwable(final Throwable t, boolean retrying); + /** + * Computes a result, or throws an exception if unable to do so. + * + * @param callTimeout - the time available for this call. 0 for infinite. 
+ * @return computed result + * @throws Exception if unable to compute a result + */ + T call(int callTimeout) throws Exception; + /** * @return Some details from the implementation that we would like to add to a terminating * exception; i.e. a fatal exception is being thrown ending retries and we might like to add diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 6c51a702b68..68faba560ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; @@ -53,6 +52,10 @@ public class RpcRetryingCaller { * Timeout for the call including retries */ private int callTimeout; + /** + * The remaining time, for the call to come. Takes into account the tries already done. + */ + private int remainingTime; /** * When we started making calls. */ @@ -77,20 +80,20 @@ public class RpcRetryingCaller { } private void beforeCall() { - int remaining = (int)(callTimeout - - (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)); - if (remaining < MIN_RPC_TIMEOUT) { - // If there is no time left, we're trying anyway. It's too late. - // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. - remaining = MIN_RPC_TIMEOUT; + if (callTimeout > 0) { + remainingTime = (int) (callTimeout - + (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)); + if (remainingTime < MIN_RPC_TIMEOUT) { + // If there is no time left, we're trying anyway. It's too late. + // 0 means no timeout, and it's not the intent here. So we secure both cases by + // resetting to the minimum. 
+ remainingTime = MIN_RPC_TIMEOUT; + } + } else { + remainingTime = 0; } - RpcClient.setRpcTimeout(remaining); } - private void afterCall() { - RpcClient.resetRpcTimeout(); - } public synchronized T callWithRetries(RetryingCallable callable) throws IOException, RuntimeException { @@ -114,12 +117,13 @@ public class RpcRetryingCaller { new ArrayList(); this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); for (int tries = 0;; tries++) { - long expectedSleep = 0; + long expectedSleep; try { - beforeCall(); callable.prepare(tries != 0); // if called with false, check table status on ZK - return callable.call(); + beforeCall(); + return callable.call(remainingTime); } catch (Throwable t) { + ExceptionUtil.rethrowIfInterrupt(t); if (LOG.isTraceEnabled()) { LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" + (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t); @@ -131,7 +135,6 @@ public class RpcRetryingCaller { new RetriesExhaustedException.ThrowableWithExtraContext(t, EnvironmentEdgeManager.currentTimeMillis(), toString()); exceptions.add(qt); - ExceptionUtil.rethrowIfInterrupt(t); if (tries >= retries - 1) { throw new RetriesExhaustedException(tries, exceptions); } @@ -147,8 +150,6 @@ public class RpcRetryingCaller { ": " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); } - } finally { - afterCall(); } try { Thread.sleep(expectedSleep); @@ -159,7 +160,6 @@ public class RpcRetryingCaller { } /** - * @param expectedSleep * @return Calculate how long a single call took */ private long singleCallDuration(final long expectedSleep) { @@ -170,7 +170,7 @@ public class RpcRetryingCaller { /** * Call the server once only. * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you - * want to do a single call only (A call to {@link RetryingCallable#call()} will not likely + * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely * succeed). * @return an object of type T * @throws IOException if a remote or network exception occurs @@ -181,9 +181,8 @@ public class RpcRetryingCaller { // The code of this method should be shared with withRetries. 
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { - beforeCall(); callable.prepare(false); - return callable.call(); + return callable.call(callTimeout); } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); @@ -193,8 +192,6 @@ public class RpcRetryingCaller { } else { throw new RuntimeException(t2); } - } finally { - afterCall(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 1a3d7a722df..798352c5117 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -143,11 +143,10 @@ public class ScannerCallable extends RegionServerCallable { } } - /** - * @see java.util.concurrent.Callable#call() - */ + + @Override @SuppressWarnings("deprecation") - public Result [] call() throws IOException { + public Result [] call(int callTimeout) throws IOException { if (closed) { if (scannerId != -1) { close(); @@ -163,8 +162,9 @@ public class ScannerCallable extends RegionServerCallable { request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); ScanResponse response = null; PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); try { - controller.setPriority(getTableName()); response = getStub().scan(controller, request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call // from client to server will increment this number in both sides. Client passes this diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 1f9b4bb5d81..153ddde54eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -26,9 +26,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - /** * Optionally carries Cells across the proxy/service interface down into ipc. On its * way out it optionally carries a set of result Cell data. We stick the Cells here when we want @@ -36,7 +33,8 @@ import com.google.protobuf.RpcController; * service chasm. Used by client and server ipc'ing. */ @InterfaceAudience.Private -public class PayloadCarryingRpcController implements RpcController, CellScannable { +public class PayloadCarryingRpcController + extends TimeLimitedRpcController implements CellScannable { /** * Priority to set on this request. Set it here in controller so available composing the * request. This is the ordained way of setting priorities going forward. We will be @@ -46,8 +44,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl // priority. private int priority = 0; - // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException - /** * They are optionally set on construction, cleared after we make the call, and then optionally * set on response with the result. 
We use this lowest common denominator access to Cells because @@ -79,41 +75,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl this.cellScanner = cellScanner; } - @Override - public String errorText() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean failed() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isCanceled() { - throw new UnsupportedOperationException(); - } - - @Override - public void notifyOnCancel(RpcCallback arg0) { - throw new UnsupportedOperationException(); - } - - @Override - public void reset() { - throw new UnsupportedOperationException(); - } - - @Override - public void setFailed(String arg0) { - throw new UnsupportedOperationException(); - } - - @Override - public void startCancel() { - throw new UnsupportedOperationException(); - } - /** * @param priority Priority for this request; should fall roughly in the range * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 323834828cd..f28feac23eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -82,7 +82,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setRequest(request.toByteString()).build(); RegionServerCallable callable = new RegionServerCallable(connection, table, row) { - public CoprocessorServiceResponse call() throws Exception { + public CoprocessorServiceResponse call(int callTimeout) throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); return ProtobufUtil.execService(getStub(), call, regionName); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1e8c9d3049d..1abb2d5d218 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -87,7 +87,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; -import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -109,6 +108,7 @@ import java.util.concurrent.atomic.AtomicInteger; * Does RPC against a cluster. Manages connections per regionserver in the cluster. *
<p>
See HBaseServer */ +@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @InterfaceAudience.Private public class RpcClient { // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under @@ -131,15 +131,31 @@ public class RpcClient { private final IPCUtil ipcUtil; protected final SocketFactory socketFactory; // how to create sockets + private final int connectTO; + private final int readTO; + private final int writeTO; protected String clusterId; protected final SocketAddress localAddr; private final boolean fallbackAllowed; private UserProvider userProvider; - final private static String SOCKET_TIMEOUT = "ipc.socket.timeout"; - final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds - final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients. + final private static String SOCKET_TIMEOUT_CONNECT = "ipc.socket.timeout.connect"; + final static int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds + + /** + * How long we wait when we wait for an answer. It's not the operation time, it's the time + * we wait when we start to receive an answer, when the remote write starts to send the data. + */ + final private static String SOCKET_TIMEOUT_READ = "ipc.socket.timeout.read"; + final static int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds + + final private static String SOCKET_TIMEOUT_WRITE = "ipc.socket.timeout.write"; + final static int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds + + // Used by the server, for compatibility with old clients. + // The client in 0.99+ does not ping the server. + final static int PING_CALL_ID = -1; public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000; @@ -152,18 +168,6 @@ public class RpcClient { public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt"; - // thread-specific RPC timeout, which may override that of what was passed in. - // This is used to change dynamically the timeout (for read only) when retrying: if - // the time allowed for the operation is less than the usual socket timeout, then - // we lower the timeout. This is subject to race conditions, and should be used with - // extreme caution. - private static ThreadLocal rpcTimeout = new ThreadLocal() { - @Override - protected Integer initialValue() { - return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; - } - }; - /** * A class to manage a list of servers that failed recently. */ @@ -231,13 +235,6 @@ public class RpcClient { } } - /** - * @return the socket timeout - */ - static int getSocketTimeout(Configuration conf) { - return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); - } - /** A call waiting for a value. */ protected class Call { final int id; // call id @@ -254,15 +251,47 @@ public class RpcClient { volatile boolean done; // true when call is done long startTime; final MethodDescriptor md; + final int timeout; // timeout in millisecond for this call; 0 means infinite. protected Call(final MethodDescriptor md, Message param, final CellScanner cells, - final Message responseDefaultType) { + final Message responseDefaultType, int timeout) { this.param = param; this.md = md; this.cells = cells; this.startTime = EnvironmentEdgeManager.currentTimeMillis(); this.responseDefaultType = responseDefaultType; this.id = callIdCnt.getAndIncrement(); + this.timeout = timeout; + } + + + /** + * Check if the call did timeout. 
Set an exception (includes a notify) if it's the case. + * @return true if the call is on timeout, false otherwise. + */ + public boolean checkTimeout() { + if (timeout == 0){ + return false; + } + + long waitTime = EnvironmentEdgeManager.currentTimeMillis() - getStartTime(); + if (waitTime >= timeout) { + IOException ie = new CallTimeoutException("Call id=" + id + + ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired."); + setException(ie); // includes a notify + return true; + } else { + return false; + } + } + + public int remainingTime() { + if (timeout == 0) { + return Integer.MAX_VALUE; + } + + int remaining = timeout - (int) (EnvironmentEdgeManager.currentTimeMillis() - getStartTime()); + return remaining > 0 ? remaining : 0; } @Override @@ -345,6 +374,7 @@ public class RpcClient { /** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ + @SuppressWarnings("SynchronizeOnNonFinalField") protected class Connection extends Thread { private ConnectionHeader header; // connection header protected ConnectionId remoteId; @@ -414,8 +444,7 @@ public class RpcClient { setName(name + " - writer"); } - public void cancel(CallFuture cts){ - cts.call.done = true; + public void remove(CallFuture cts){ callsToWrite.remove(cts); calls.remove(cts.call.id); } @@ -442,15 +471,8 @@ public class RpcClient { continue; } - if (remoteId.rpcTimeout > 0) { - long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime(); - if (waitTime >= remoteId.rpcTimeout) { - IOException ie = new CallTimeoutException("Call id=" + cts.call.id + - ", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout + - ", expired before being sent to the server."); - cts.call.setException(ie); // includes a notify - continue; - } + if (cts.call.checkTimeout()) { + continue; } try { @@ -595,10 +617,8 @@ public class RpcClient { if (localAddr != null) { this.socket.bind(localAddr); } - // connection time out is 20s - NetUtils.connect(this.socket, remoteId.getAddress(), - getSocketTimeout(conf)); - this.socket.setSoTimeout(remoteId.rpcTimeout); + NetUtils.connect(this.socket, remoteId.getAddress(), connectTO); + this.socket.setSoTimeout(readTO); return; } catch (SocketTimeoutException toe) { /* The max number of retries is 45, @@ -883,10 +903,8 @@ public class RpcClient { while (true) { setupConnection(); InputStream inStream = NetUtils.getInputStream(socket); - // This creates a socket with a write timeout. This timeout cannot be changed, - // RpcClient allows to change the timeout dynamically, but we can only - // change the read timeout today. - OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout); + // This creates a socket with a write timeout. This timeout cannot be changed. + OutputStream outStream = NetUtils.getOutputStream(socket, writeTO); // Write out the preamble -- MAGIC, version, and auth to use. writeConnectionHeaderPreamble(outStream); if (useSasl) { @@ -1005,7 +1023,7 @@ public class RpcClient { LOG.debug(getName() + ": closing ipc connection to " + server); } - cleanupCalls(); + cleanupCalls(true); if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": ipc connection to " + server + " closed"); @@ -1025,8 +1043,6 @@ public class RpcClient { * Initiates a call by sending the parameter to the remote server. * Note: this is not called from the Connection thread, but by other * threads. 
- * @param call - * @param priority * @see #readResponse() */ private void writeRequest(Call call, final int priority, Span span) throws IOException { @@ -1143,7 +1159,7 @@ public class RpcClient { if (expectedCall) call.setResponse(value, cellBlockScanner); } } catch (IOException e) { - if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { + if (e instanceof SocketTimeoutException) { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. @@ -1152,7 +1168,7 @@ public class RpcClient { markClosed(e); } } finally { - cleanupCalls(remoteId.rpcTimeout); + cleanupCalls(false); } } @@ -1166,7 +1182,7 @@ public class RpcClient { } /** - * @param e + * @param e exception to be wrapped * @return RemoteException made from passed e */ private RemoteException createRemoteException(final ExceptionResponse e) { @@ -1195,46 +1211,32 @@ public class RpcClient { } } - /* Cleanup all calls and mark them as done */ - protected void cleanupCalls() { - cleanupCalls(-1); - } /** * Cleanup the calls older than a given timeout, in milli seconds. - * @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing. + * @param allCalls for all calls, */ - protected synchronized void cleanupCalls(long rpcTimeout) { - if (rpcTimeout == 0) return; - + protected synchronized void cleanupCalls(boolean allCalls) { Iterator> itor = calls.entrySet().iterator(); while (itor.hasNext()) { Call c = itor.next().getValue(); - long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime(); - if (rpcTimeout < 0) { + if (c.done) { + // To catch the calls without timeout that were cancelled. + itor.remove(); + } else if (allCalls) { + long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime(); IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime); c.setException(ie); itor.remove(); - } else if (waitTime >= rpcTimeout) { - IOException ie = new CallTimeoutException("Call id=" + c.id + - ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout); - c.setException(ie); + } else if (c.checkTimeout()) { itor.remove(); } else { - // This relies on the insertion order to be the call id order. This is not - // true under 'difficult' conditions (gc, ...). - rpcTimeout -= waitTime; + // We expect the call to be ordered by timeout. It may not be the case, but stopping + // at the first valid call allows to be sure that we still have something to do without + // spending too much time by reading the full list. break; } } - - if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) { - try { - socket.setSoTimeout((int)rpcTimeout); - } catch (SocketException e) { - LOG.warn("Couldn't change timeout, which may result in longer than expected calls"); - } - } } } @@ -1253,7 +1255,7 @@ public class RpcClient { /** * Construct an IPC cluster client whose values are of the {@link Message} class. * @param conf configuration - * @param clusterId + * @param clusterId the cluster id * @param factory socket factory */ RpcClient(Configuration conf, String clusterId, SocketFactory factory) { @@ -1263,7 +1265,7 @@ public class RpcClient { /** * Construct an IPC cluster client whose values are of the {@link Message} class. 
* @param conf configuration - * @param clusterId + * @param clusterId the cluster id * @param factory socket factory * @param localAddr client socket bind address */ @@ -1286,22 +1288,30 @@ public class RpcClient { IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.localAddr = localAddr; this.userProvider = UserProvider.instantiate(conf); + this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); + this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); + this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); + + // login the server principal (if using secure Hadoop) if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + - ", tcpKeepAlive=" + this.tcpKeepAlive + - ", tcpNoDelay=" + this.tcpNoDelay + - ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + - ", maxRetries=" + this.maxRetries + - ", fallbackAllowed=" + this.fallbackAllowed + - ", bind address=" + (this.localAddr != null ? this.localAddr : "null")); + ", tcpKeepAlive=" + this.tcpKeepAlive + + ", tcpNoDelay=" + this.tcpNoDelay + + ", connectTO=" + this.connectTO + + ", readTO=" + this.readTO + + ", writeTO=" + this.writeTO + + ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + + ", maxRetries=" + this.maxRetries + + ", fallbackAllowed=" + this.fallbackAllowed + + ", bind address=" + (this.localAddr != null ? this.localAddr : "null")); } } /** * Construct an IPC client for the cluster clusterId with the default SocketFactory * @param conf configuration - * @param clusterId + * @param clusterId the cluster id */ public RpcClient(Configuration conf, String clusterId) { this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); @@ -1310,7 +1320,7 @@ public class RpcClient { /** * Construct an IPC client for the cluster clusterId with the default SocketFactory * @param conf configuration - * @param clusterId + * @param clusterId the cluster id * @param localAddr client socket bind address. */ public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { @@ -1343,7 +1353,7 @@ public class RpcClient { /** * Encapsulate the ugly casting and RuntimeException conversion in private method. - * @param conf + * @param conf configuration * @return The compressor to use on this client. */ private static CompressionCodec getCompressor(final Configuration conf) { @@ -1380,21 +1390,13 @@ public class RpcClient { * Return the pool size specified in the configuration, which is applicable only if * the pool type is {@link PoolType#RoundRobin}. * - * @param config + * @param config configuration * @return the maximum pool size */ protected static int getPoolSize(Configuration config) { return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); } - /** Return the socket factory of this client - * - * @return this client's socket factory - */ - SocketFactory getSocketFactory() { - return socketFactory; - } - /** Stop all threads related to this client. No further calls may be made * using this client. */ public void stop() { @@ -1432,25 +1434,19 @@ public class RpcClient { * with the ticket credentials, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @param md - * @param param - * @param cells - * @param addr - * @param returnType * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. 
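
Instead of deriving socket timeouts from a per-connection rpcTimeout, the constructor now reads three fixed knobs: connectTO, readTO and writeTO. As background, this is how such values map onto standard java.net.Socket operations; the helper below is illustrative, not HBase code, and a write timeout has no plain Socket equivalent (in the patch it is enforced by NetUtils.getOutputStream(socket, writeTO)).

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    // Sketch: the socket-level knobs a connection fixes once at setup time.
    final class SocketTimeouts {
      static Socket open(InetSocketAddress server, int connectTO, int readTO) throws IOException {
        Socket socket = new Socket();
        socket.connect(server, connectTO);   // bounded time to establish the TCP connection
        socket.setSoTimeout(readTO);         // SO_TIMEOUT: bounded time any single read may block
        return socket;
      }
    }
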
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a * new Connection each time. - * @param rpcTimeout * @return A pair with the Message response and the Cell data (if any). * @throws InterruptedException * @throws IOException */ Pair call(MethodDescriptor md, Message param, CellScanner cells, - Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority) + Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws IOException, InterruptedException { - Call call = new Call(md, param, cells, returnType); + Call call = new Call(md, param, cells, returnType, callTimeout); Connection connection = - getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); + getConnection(ticket, call, addr, this.codec, this.compressor); CallFuture cts = null; if (connection.callSender != null){ @@ -1460,16 +1456,17 @@ public class RpcClient { } while (!call.done) { + if (call.checkTimeout()) { + if (cts != null) connection.callSender.remove(cts); + break; + } try { synchronized (call) { - call.wait(1000); // wait for the result. We will be notified by the reader. + call.wait(Math.min(call.remainingTime(), 1000) + 1); } } catch (InterruptedException e) { - if (cts != null) { - connection.callSender.cancel(cts); - } else { - call.done = true; - } + call.setException(new InterruptedIOException()); + if (cts != null) connection.callSender.remove(cts); throw e; } } @@ -1487,7 +1484,6 @@ public class RpcClient { } - /** * Take an IOException and the address we were trying to connect to * and return an IOException with the input exception as the cause. @@ -1523,7 +1519,7 @@ public class RpcClient { * process died) or no route to host: i.e. their next retries should be faster and with a * safe exception. */ - public void cancelConnections(String hostname, int port, IOException ioe) { + public void cancelConnections(String hostname, int port) { synchronized (connections) { for (Connection connection : connections.values()) { if (connection.isAlive() && @@ -1540,15 +1536,15 @@ public class RpcClient { /** * Get a connection from the pool, or create a new one and add it to the - * pool. Connections to a given host/port are reused. + * pool. Connections to a given host/port are reused. 
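
The new wait loop in call(...) re-checks the call's own deadline on every wake-up and never sleeps past it, while still waking at least once a second so a cancelled call or dead connection is noticed promptly. A self-contained sketch of the same idea over a plain monitor (names are mine; the reader thread would call markDone()):

    // Sketch: block until a response arrives or this call's own deadline passes.
    final class ResponseWaiter {
      private boolean done;
      private final long deadline;   // absolute millis; Long.MAX_VALUE means "no timeout"

      ResponseWaiter(long timeoutMs) {
        this.deadline = timeoutMs == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMs;
      }

      synchronized void markDone() { done = true; notifyAll(); }

      // Returns true if a response arrived, false if the deadline expired first.
      synchronized boolean await() throws InterruptedException {
        while (!done) {
          long remaining = deadline - System.currentTimeMillis();
          if (remaining <= 0) return false;   // give up client-side: the call timed out
          // Never sleep past the deadline, but cap the nap at ~1s; the "+ 1" mirrors the
          // patch and guards against wait(0), which would mean "wait forever".
          wait(Math.min(remaining, 1000) + 1);
        }
        return true;
      }
    }
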
*/ protected Connection getConnection(User ticket, Call call, InetSocketAddress addr, - int rpcTimeout, final Codec codec, final CompressionCodec compressor) + final Codec codec, final CompressionCodec compressor) throws IOException { if (!running.get()) throw new StoppedRpcClientException(); Connection connection; ConnectionId remoteId = - new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout); + new ConnectionId(ticket, call.md.getService().getName(), addr); synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { @@ -1576,17 +1572,12 @@ public class RpcClient { protected static class ConnectionId { final InetSocketAddress address; final User ticket; - final int rpcTimeout; private static final int PRIME = 16777619; final String serviceName; - ConnectionId(User ticket, - String serviceName, - InetSocketAddress address, - int rpcTimeout) { + ConnectionId(User ticket, String serviceName, InetSocketAddress address) { this.address = address; this.ticket = ticket; - this.rpcTimeout = rpcTimeout; this.serviceName = serviceName; } @@ -1604,8 +1595,7 @@ public class RpcClient { @Override public String toString() { - return this.address.toString() + "/" + this.serviceName + "/" + this.ticket + "/" + - this.rpcTimeout; + return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; } @Override @@ -1614,7 +1604,7 @@ public class RpcClient { ConnectionId id = (ConnectionId) obj; return address.equals(id.address) && ((ticket != null && ticket.equals(id.ticket)) || - (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout && + (ticket == id.ticket)) && this.serviceName == id.serviceName; } return false; @@ -1624,28 +1614,11 @@ public class RpcClient { public int hashCode() { int hashcode = (address.hashCode() + PRIME * (PRIME * this.serviceName.hashCode() ^ - (ticket == null ? 0 : ticket.hashCode()) )) ^ - rpcTimeout; + (ticket == null ? 0 : ticket.hashCode()) )); return hashcode; } } - public static void setRpcTimeout(int t) { - rpcTimeout.set(t); - } - - /** - * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given - * default timeout. - */ - public static int getRpcTimeout(int defaultTimeout) { - return Math.min(defaultTimeout, rpcTimeout.get()); - } - - public static void resetRpcTimeout() { - rpcTimeout.remove(); - } - /** * Make a blocking call. Throws exceptions if there are network problems or if the remote code * threw an exception. @@ -1654,24 +1627,24 @@ public class RpcClient { * new Connection each time. * @return A pair with the Message response and the Cell data (if any). */ - Message callBlockingMethod(MethodDescriptor md, RpcController controller, - Message param, Message returnType, final User ticket, final InetSocketAddress isa, - final int rpcTimeout) + Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc, + Message param, Message returnType, final User ticket, final InetSocketAddress isa) throws ServiceException { long startTime = 0; if (LOG.isTraceEnabled()) { startTime = EnvironmentEdgeManager.currentTimeMillis(); } - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; + int callTimeout = 0; CellScanner cells = null; if (pcrc != null) { + callTimeout = pcrc.getCallTimeout(); cells = pcrc.cellScanner(); // Clear it here so we don't by mistake try and these cells processing results. 
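
Dropping rpcTimeout from ConnectionId is what lets calls with different timeouts share a single TCP connection: the pool key is now only (address, service, user). A generic sketch of such a key, with names of my own choosing, to make the equals/hashCode contract explicit:

    import java.net.InetSocketAddress;
    import java.util.Objects;

    // Sketch: a connection-pool key that deliberately excludes any timeout, so calls with
    // different deadlines are multiplexed over the same underlying connection.
    final class PoolKey {
      final InetSocketAddress address;
      final String serviceName;
      final String user;            // stands in for the User ticket

      PoolKey(InetSocketAddress address, String serviceName, String user) {
        this.address = address;
        this.serviceName = serviceName;
        this.user = user;
      }

      @Override public boolean equals(Object o) {
        if (!(o instanceof PoolKey)) return false;
        PoolKey other = (PoolKey) o;
        return address.equals(other.address)
            && serviceName.equals(other.serviceName)
            && Objects.equals(user, other.user);
      }

      @Override public int hashCode() {
        return Objects.hash(address, serviceName, user);
      }
    }
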
pcrc.setCellScanner(null); } Pair val; try { - val = call(md, param, cells, returnType, ticket, isa, rpcTimeout, + val = call(md, param, cells, returnType, ticket, isa, callTimeout, pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS); if (pcrc != null) { // Shove the results into controller so can be carried across the proxy/pb service void. @@ -1696,8 +1669,8 @@ public class RpcClient { * @return A blocking rpc channel that goes via this rpc client instance. */ public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, - final User ticket, final int rpcTimeout) { - return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout); + final User ticket, int defaultOperationTimeout) { + return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); } /** @@ -1707,25 +1680,36 @@ public class RpcClient { public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { private final InetSocketAddress isa; private final RpcClient rpcClient; - private final int rpcTimeout; private final User ticket; + private final int defaultOperationTimeout; + /** + * @param defaultOperationTimeout - the default timeout when no timeout is given + * by the caller. + */ protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn, - final User ticket, final int rpcTimeout) { + final User ticket, int defaultOperationTimeout) { this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); this.rpcClient = rpcClient; - // Set the rpc timeout to be the minimum of configured timeout and whatever the current - // thread local setting is. - this.rpcTimeout = getRpcTimeout(rpcTimeout); this.ticket = ticket; + this.defaultOperationTimeout = defaultOperationTimeout; } @Override public Message callBlockingMethod(MethodDescriptor md, RpcController controller, - Message param, Message returnType) - throws ServiceException { - return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket, - this.isa, this.rpcTimeout); + Message param, Message returnType) throws ServiceException { + PayloadCarryingRpcController pcrc; + if (controller != null) { + pcrc = (PayloadCarryingRpcController) controller; + if (!pcrc.hasCallTimeout()){ + pcrc.setCallTimeout(defaultOperationTimeout); + } + } else { + pcrc = new PayloadCarryingRpcController(); + pcrc.setCallTimeout(defaultOperationTimeout); + } + + return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 55337357a13..1a0525e9b89 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1428,28 +1428,6 @@ public final class ProtobufUtil { // Start helpers for Client - /** - * A helper to invoke a Get using client protocol. 
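
BlockingRpcChannelImplementation now injects its defaultOperationTimeout only when the caller has not already set a call timeout on the controller. A minimal sketch of that fill-in-the-default step, using the controller methods visible in the hunk (hasCallTimeout, setCallTimeout); treat it as illustrative rather than the exact code path:

    import com.google.protobuf.RpcController;
    import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;

    final class ControllerDefaults {
      // Keep the caller's timeout if one was set; otherwise fall back to the channel-wide
      // default chosen when the blocking channel was created.
      static PayloadCarryingRpcController withDefaultTimeout(RpcController controller,
          int defaultCallTimeoutMs) {
        PayloadCarryingRpcController pcrc = (controller != null)
            ? (PayloadCarryingRpcController) controller
            : new PayloadCarryingRpcController();
        if (!pcrc.hasCallTimeout()) {
          pcrc.setCallTimeout(defaultCallTimeoutMs);
        }
        return pcrc;
      }
    }
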
- * - * @param client - * @param regionName - * @param get - * @return the result of the Get - * @throws IOException - */ - public static Result get(final ClientService.BlockingInterface client, - final byte[] regionName, final Get get) throws IOException { - GetRequest request = - RequestConverter.buildGetRequest(regionName, get); - try { - GetResponse response = client.get(null, request); - if (response == null) return null; - return toResult(response.getResult()); - } catch (ServiceException se) { - throw getRemoteException(se); - } - } - /** * A helper to get a row of the closet one before using client protocol. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2f9bb6c11fa..4a2c9f089b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -2042,8 +2042,7 @@ public class RpcServer implements RpcServerInterface { status.getClient(), startTime, processingTime, qTime, responseSize); } - return new Pair(result, - controller != null? controller.cellScanner(): null); + return new Pair(result, controller.cellScanner()); } catch (Throwable e) { // The above callBlockingMethod will always return a SE. Strip the SE wrapper before // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't @@ -2239,7 +2238,7 @@ public class RpcServer implements RpcServerInterface { /** * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)} - * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only + * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only * one of readCh or writeCh should be non-null. 
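
The ReplicationProtbufUtil change above (and the matching one in WALEditsReplaySink further down) follows one idiom: build the PayloadCarryingRpcController, with its cell payload, outside the try block, then translate the protobuf ServiceException thrown by the blocking stub back into an IOException at the boundary. A hedged, generic sketch of that caller-side idiom; the single-method interface and names are mine, and I assume the controller's CellScanner constructor used in the hunk:

    import java.io.IOException;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import com.google.protobuf.ServiceException;

    final class RpcIdiom {
      // One protobuf stub invocation, e.g. admin.replicateWALEntry(controller, request).
      interface PbCall {
        void run(PayloadCarryingRpcController controller) throws ServiceException;
      }

      static void callWithCells(CellScanner cells, PbCall call) throws IOException {
        PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
        try {
          call.run(controller);
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);  // unwrap to the remote IOException
        }
      }
    }
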
* * @param readCh read channel @@ -2248,7 +2247,7 @@ public class RpcServer implements RpcServerInterface { * @return bytes written * @throws java.io.IOException e * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) - * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer) + * @see #channelWrite(GatheringByteChannel, BufferChain) */ private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 6f8396ed190..e666e5b2aa5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -596,7 +596,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { final RegionServerCallable svrCallable = new RegionServerCallable(conn, tableName, first) { @Override - public Boolean call() throws Exception { + public Boolean call(int callTimeout) throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index d156b715ed7..2dc2388ce62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -60,8 +60,8 @@ public class ReplicationProtbufUtil { final HLog.Entry[] entries) throws IOException { Pair p = buildReplicateWALEntryRequest(entries); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); try { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cb31a9694d0..369d3fe1db5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -437,7 +437,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // A sleeper that sleeps for msgInterval. 
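
Renaming the HRegionServer field from rpcTimeout to operationTimeout matches its new role: it is the default call timeout handed to createBlockingRpcChannel for the region server status channel. A short sketch of reading that value the same way the hunk does; the constants are real HConstants, the rest is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    final class OperationTimeoutConfig {
      // The short-operation timeout used as the default call timeout for status RPCs.
      static int shortOperationTimeout(Configuration conf) {
        return conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
            HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
      }

      public static void main(String[] args) {
        System.out.println("default operation timeout (ms): "
            + shortOperationTimeout(HBaseConfiguration.create()));
      }
    }
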
private final Sleeper sleeper; - private final int rpcTimeout; + private final int operationTimeout; private final RegionServerAccounting regionServerAccounting; @@ -555,7 +555,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.numRegionsToReport = conf.getInt( "hbase.regionserver.numregionstoreport", 10); - this.rpcTimeout = conf.getInt( + this.operationTimeout = conf.getInt( HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); @@ -1972,7 +1972,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa new InetSocketAddress(sn.getHostname(), sn.getPort()); try { BlockingRpcChannel channel = - this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout); + this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout); intf = RegionServerStatusService.newBlockingStub(channel); break; } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 77265351a5d..e01bb00db70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -423,7 +423,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
* At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) * @param numTasks current total number of available tasks - * @return */ private int calculateAvailableSplitters(int numTasks) { // at lease one RS(itself) available diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index c7f79cf0603..15986d16279 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -1861,7 +1861,6 @@ public class HLogSplitter { * Tag original sequence number for each edit to be replayed * @param entry * @param cell - * @return */ private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) { // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index 917a7c949e9..0b3be7ac099 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -191,7 +191,7 @@ public class WALEditsReplaySink { } @Override - public ReplicateWALEntryResponse call() throws IOException { + public ReplicateWALEntryResponse call(int callTimeout) throws IOException { try { replayToServer(this.regionInfo, this.entries); } catch (ServiceException se) { @@ -210,8 +210,8 @@ public class WALEditsReplaySink { Pair p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); try { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); remoteSvr.replay(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 2ef135c3fa9..115a49eba5b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -40,20 +40,26 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import 
org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; @@ -100,6 +106,25 @@ public class TestHCM { private static final byte[] ROW_X = Bytes.toBytes("xxx"); private static Random _randy = new Random(); +/** +* This copro sleeps 20 second. The first call it fails. The second time, it works. +*/ + public static class SleepAndFailFirstTime extends BaseRegionObserver { + static final AtomicLong ct = new AtomicLong(0); + + public SleepAndFailFirstTime() { + } + + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + Threads.sleep(20000); + if (ct.incrementAndGet() == 1){ + throw new IOException("first call I fail"); + } + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); @@ -165,7 +190,7 @@ public class TestHCM { t.close(); con1.close(); - // if the pool was created on demand it should be closed upon connectin close + // if the pool was created on demand it should be closed upon connection close assertTrue(pool.isShutdown()); con2.close(); @@ -244,6 +269,41 @@ public class TestHCM { testConnectionClose(false); } + /** + * Test that an operation can fail if we read the global operation timeout, even if the + * individual timeout is fine. We do that with: + * - client side: an operation timeout of 30 seconds + * - server side: we sleep 20 second at each attempt. The first work fails, the second one + * succeeds. But the client won't wait that much, because 20 + 20 > 30, so the client + * timeouted when the server answers. + */ + @Test + public void testOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.get(new Get(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.get(new Get(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (SocketTimeoutException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } + } + + private void testConnectionClose(boolean allowsInterrupt) throws Exception { String tableName = "HCM-testConnectionClose" + allowsInterrupt; TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close(); @@ -302,12 +362,12 @@ public class TestHCM { LOG.info("Going to cancel connections. 
connection=" + conn.toString() + ", sn=" + sn); for (int i = 0; i < 5000; i++) { - rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), null); + rpcClient.cancelConnections(sn.getHostname(), sn.getPort()); Thread.sleep(5); } step.compareAndSet(1, 2); - // The test may fail here if the thread doing the gets is stuck. The wait to find + // The test may fail here if the thread doing the gets is stuck. The way to find // out what's happening is to look for the thread named 'testConnectionCloseThread' TEST_UTIL.waitFor(20000, new Waiter.Predicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index dc0d8920815..438ed1e05a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -155,7 +156,9 @@ public class TestEndToEndSplitTransaction { byte[] regionName = con.relocateRegion(tableName, row).getRegionInfo() .getRegionName(); // get and scan should now succeed without exception - ProtobufUtil.get(server, regionName, new Get(row)); + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(regionName, new Get(row)); + server.get(null, request); ScanRequest scanRequest = RequestConverter.buildScanRequest( regionName, new Scan(row), 1, true); try { @@ -165,6 +168,8 @@ public class TestEndToEndSplitTransaction { } } catch (IOException x) { return false; + } catch (ServiceException e) { + return false; } return true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 4949dcb7d5c..89ce874495f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -158,7 +158,7 @@ public class TestHRegionServerBulkLoad { RegionServerCallable callable = new RegionServerCallable(conn, tbl, Bytes.toBytes("aaa")) { @Override - public Void call() throws Exception { + public Void call(int callTimeout) throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); byte[] regionName = getLocation().getRegionInfo().getRegionName(); @@ -177,7 +177,7 @@ public class TestHRegionServerBulkLoad { // 10 * 50 = 500 open file handles! callable = new RegionServerCallable(conn, tbl, Bytes.toBytes("aaa")) { @Override - public Void call() throws Exception { + public Void call(int callTimeout) throws Exception { LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server =