From 50df1e2fd7feb0d8a4efddcfcba5646f8918bd20 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 26 Jul 2013 22:38:15 +0000 Subject: [PATCH] HBASE-8764 Some MasterMonitorCallable should retry git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1507495 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/AsyncProcess.java | 20 +- .../hadoop/hbase/client/ClientScanner.java | 11 +- .../hadoop/hbase/client/ConnectionUtils.java | 3 +- .../hadoop/hbase/client/HBaseAdmin.java | 149 +++++---- .../hadoop/hbase/client/HConnection.java | 34 +- .../hbase/client/HConnectionManager.java | 12 - .../hbase/client/HConnectionWrapper.java | 12 - .../apache/hadoop/hbase/client/HTable.java | 249 ++++++++------- .../hbase/client/MultiServerCallable.java | 25 +- .../hbase/client/RegionOfflineException.java | 4 +- .../hbase/client/RegionServerCallable.java | 145 +++++++++ .../client/RetriesExhaustedException.java | 3 +- .../hadoop/hbase/client/RetryingCallable.java | 66 ++++ .../hbase/client/RpcRetryingCaller.java | 228 +++++++++++++ .../hadoop/hbase/client/ScannerCallable.java | 27 +- .../hadoop/hbase/client/ScannerCaller.java | 21 ++ .../hadoop/hbase/client/ServerCallable.java | 299 ------------------ .../exceptions/DoNotRetryIOException.java | 2 +- .../exceptions/HBaseSnapshotException.java | 3 +- .../InvalidFamilyOperationException.java | 2 +- .../exceptions/LockTimeoutException.java | 7 +- .../hbase/exceptions/RegionException.java | 3 +- .../exceptions/TableExistsException.java | 6 +- .../exceptions/TableNotDisabledException.java | 4 +- .../exceptions/TableNotEnabledException.java | 5 +- .../exceptions/TableNotFoundException.java | 4 +- .../ipc/RegionCoprocessorRpcChannel.java | 14 +- .../hadoop/hbase/client/TestAsyncProcess.java | 49 ++- .../org/apache/hadoop/hbase/util/Threads.java | 3 + .../mapreduce/LoadIncrementalHFiles.java | 22 +- .../regionserver/wal/WALEditsReplaySink.java | 26 +- .../client/HConnectionTestingUtility.java | 8 +- .../hbase/client/TestHBaseAdminNoCluster.java | 90 ++++++ .../TestHRegionServerBulkLoad.java | 35 +- .../hbase/regionserver/wal/TestHLog.java | 24 +- 35 files changed, 933 insertions(+), 682 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCaller.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 58bdd2650d4..2aa31d2571e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -84,7 +84,6 @@ import java.util.concurrent.atomic.AtomicLong; */ class AsyncProcess { private static final Log LOG = LogFactory.getLog(AsyncProcess.class); - protected final HConnection hConnection; protected final byte[] tableName; protected final ExecutorService pool; @@ -406,9 +405,9 @@ class AsyncProcess { public void run() { MultiResponse res; try { - ServerCallable callable = createCallable(loc, multi); + MultiServerCallable callable = createCallable(loc, multi); try { - res = callable.withoutRetries(); + res = createCaller(callable).callWithoutRetries(callable); } catch (IOException e) { LOG.warn("The call to the RS failed, we don't know where we stand. location=" + loc, e); @@ -441,10 +440,19 @@ class AsyncProcess { /** * Create a callable. Isolated to be easily overridden in the tests. */ - protected ServerCallable createCallable( - final HRegionLocation loc, final MultiAction multi) { + protected MultiServerCallable createCallable(final HRegionLocation location, + final MultiAction multi) { + return new MultiServerCallable(hConnection, tableName, location, multi); + } - return new MultiServerCallable(hConnection, tableName, loc, multi); + /** + * For tests. + * @param callable + * @return Returns a caller. + */ + protected RpcRetryingCaller createCaller(MultiServerCallable callable) { + // callable is unused. + return new RpcRetryingCaller(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 8d5d1ac6501..2746f4e06b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -66,6 +66,7 @@ public class ClientScanner extends AbstractClientScanner { private final byte[] tableName; private final int scannerTimeout; private boolean scanMetricsPublished = false; + private ScannerCaller caller = new ScannerCaller(); /** * Create a new ClientScanner for the specified table. An HConnection will be @@ -179,7 +180,7 @@ public class ClientScanner extends AbstractClientScanner { // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); - callable.withRetries(); + this.caller.callWithRetries(callable, getConnection().getConfiguration()); this.callable = null; } @@ -216,7 +217,7 @@ public class ClientScanner extends AbstractClientScanner { callable = getScannerCallable(localStartKey); // Open a scanner on the region server starting at the // beginning of the region - callable.withRetries(); + this.caller.callWithRetries(callable, getConnection().getConfiguration()); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); @@ -276,10 +277,10 @@ public class ClientScanner extends AbstractClientScanner { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. - values = callable.withRetries(); + values = this.caller.callWithRetries(callable, getConnection().getConfiguration()); if (skipFirst && values != null && values.length == 1) { skipFirst = false; // Already skipped, unset it before scanning again - values = callable.withRetries(); + values = this.caller.callWithRetries(callable, getConnection().getConfiguration()); } retryAfterOutOfOrderException = true; } catch (DoNotRetryIOException e) { @@ -402,7 +403,7 @@ public class ClientScanner extends AbstractClientScanner { if (callable != null) { callable.setClose(); try { - callable.withRetries(); + this.caller.callWithRetries(callable, getConnection().getConfiguration()); } catch (IOException e) { // We used to catch this error, interpret, and rethrow. However, we // have since decided that it's not nice for a scanner's close to diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index b97608ad39d..c49ec9d99be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,8 +24,7 @@ import org.apache.hadoop.hbase.HConstants; import java.util.Random; /** - * Utility used by client connections such as {@link HConnection} and - * {@link ServerCallable} + * Utility used by client connections. */ @InterfaceAudience.Public @InterfaceStability.Evolving diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 55d4b855f49..96a83987da6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -25,7 +25,6 @@ import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -84,7 +83,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest; @@ -522,7 +520,7 @@ public class HBaseAdmin implements Abortable, Closeable { } } - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys); @@ -555,7 +553,7 @@ public class HBaseAdmin implements Abortable, Closeable { HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); boolean tableExists = true; - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName); @@ -753,7 +751,7 @@ public class HBaseAdmin implements Abortable, Closeable { public void enableTableAsync(final byte [] tableName) throws IOException { HTableDescriptor.isLegalTableName(tableName); - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { LOG.info("Started enable of " + Bytes.toString(tableName)); @@ -824,7 +822,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void disableTableAsync(final byte [] tableName) throws IOException { HTableDescriptor.isLegalTableName(tableName); - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { LOG.info("Started disable of " + Bytes.toString(tableName)); @@ -1031,7 +1029,7 @@ public class HBaseAdmin implements Abortable, Closeable { public Pair getAlterStatus(final byte[] tableName) throws IOException { HTableDescriptor.isLegalTableName(tableName); - return execute(new MasterMonitorCallable>() { + return executeCallable(new MasterMonitorCallable>(getConnection()) { @Override public Pair call() throws ServiceException { GetSchemaAlterStatusRequest req = RequestConverter @@ -1067,7 +1065,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void addColumn(final byte [] tableName, final HColumnDescriptor column) throws IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column); @@ -1100,7 +1098,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void deleteColumn(final byte [] tableName, final byte [] columnName) throws IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName); @@ -1135,7 +1133,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void modifyColumn(final byte [] tableName, final HColumnDescriptor descriptor) throws IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor); @@ -1542,7 +1540,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void assign(final byte[] regionName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { AssignRegionRequest request = RequestConverter.buildAssignRegionRequest(regionName); @@ -1568,7 +1566,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void unassign(final byte [] regionName, final boolean force) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { UnassignRegionRequest request = @@ -1817,7 +1815,7 @@ public class HBaseAdmin implements Abortable, Closeable { "' doesn't match with the HTD one: " + htd.getNameAsString()); } - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd); @@ -1890,7 +1888,7 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ public synchronized void shutdown() throws IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { masterAdmin.shutdown(null,ShutdownRequest.newBuilder().build()); @@ -1906,7 +1904,7 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ public synchronized void stopMaster() throws IOException { - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { masterAdmin.stopMaster(null,StopMasterRequest.newBuilder().build()); @@ -1942,7 +1940,7 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ public ClusterStatus getClusterStatus() throws IOException { - return execute(new MasterMonitorCallable() { + return executeCallable(new MasterMonitorCallable(getConnection()) { @Override public ClusterStatus call() throws ServiceException { GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); @@ -2301,7 +2299,7 @@ public class HBaseAdmin implements Abortable, Closeable { Thread.currentThread().interrupt(); } LOG.debug("Getting current status of snapshot from master..."); - done = execute(new MasterAdminCallable() { + done = executeCallable(new MasterAdminCallable(getConnection()) { @Override public IsSnapshotDoneResponse call() throws ServiceException { return masterAdmin.isSnapshotDone(null, request); @@ -2330,7 +2328,7 @@ public class HBaseAdmin implements Abortable, Closeable { final TakeSnapshotRequest request = TakeSnapshotRequest.newBuilder().setSnapshot(snapshot) .build(); // run the snapshot on the master - return execute(new MasterAdminCallable() { + return executeCallable(new MasterAdminCallable(getConnection()) { @Override public TakeSnapshotResponse call() throws ServiceException { return masterAdmin.snapshot(null, request); @@ -2361,7 +2359,7 @@ public class HBaseAdmin implements Abortable, Closeable { public boolean isSnapshotFinished(final SnapshotDescription snapshot) throws IOException, HBaseSnapshotException, UnknownSnapshotException { - return execute(new MasterAdminCallable() { + return executeCallable(new MasterAdminCallable(getConnection()) { @Override public IsSnapshotDoneResponse call() throws ServiceException { return masterAdmin.isSnapshotDone(null, @@ -2503,7 +2501,8 @@ public class HBaseAdmin implements Abortable, Closeable { Thread.currentThread().interrupt(); } LOG.debug("Getting current status of snapshot restore from master..."); - done = execute(new MasterAdminCallable() { + done = executeCallable(new MasterAdminCallable( + getConnection()) { @Override public IsRestoreSnapshotDoneResponse call() throws ServiceException { return masterAdmin.isRestoreSnapshotDone(null, request); @@ -2533,7 +2532,7 @@ public class HBaseAdmin implements Abortable, Closeable { .build(); // run the snapshot restore on the master - return execute(new MasterAdminCallable() { + return executeCallable(new MasterAdminCallable(getConnection()) { @Override public RestoreSnapshotResponse call() throws ServiceException { return masterAdmin.restoreSnapshot(null, request); @@ -2547,7 +2546,7 @@ public class HBaseAdmin implements Abortable, Closeable { * @throws IOException if a network error occurs */ public List listSnapshots() throws IOException { - return execute(new MasterAdminCallable>() { + return executeCallable(new MasterAdminCallable>(getConnection()) { @Override public List call() throws ServiceException { return masterAdmin.getCompletedSnapshots(null, ListSnapshotRequest.newBuilder().build()) @@ -2603,13 +2602,12 @@ public class HBaseAdmin implements Abortable, Closeable { // make sure the snapshot is possibly valid HTableDescriptor.isLegalTableName(Bytes.toBytes(snapshotName)); // do the delete - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { - masterAdmin.deleteSnapshot( - null, - DeleteSnapshotRequest.newBuilder() - .setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build()); + masterAdmin.deleteSnapshot(null, + DeleteSnapshotRequest.newBuilder(). + setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build()); return null; } }); @@ -2633,75 +2631,88 @@ public class HBaseAdmin implements Abortable, Closeable { List snapshots = listSnapshots(pattern); for (final SnapshotDescription snapshot : snapshots) { // do the delete - execute(new MasterAdminCallable() { + executeCallable(new MasterAdminCallable(getConnection()) { @Override public Void call() throws ServiceException { - masterAdmin.deleteSnapshot( - null, + this.masterAdmin.deleteSnapshot(null, DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build()); return null; } }); } } - + /** * @see {@link #execute(MasterAdminCallable)} */ - private abstract static class MasterAdminCallable implements Callable{ + abstract static class MasterAdminCallable extends MasterCallable { protected MasterAdminKeepAliveConnection masterAdmin; + private final HConnection connection; + + public MasterAdminCallable(final HConnection connection) { + this.connection = connection; + } + + @Override + public void prepare(boolean reload) throws IOException { + this.masterAdmin = this.connection.getKeepAliveMasterAdminService(); + } + + @Override + public void close() throws IOException { + this.masterAdmin.close(); + } } /** * @see {@link #execute(MasterMonitorCallable)} */ - private abstract static class MasterMonitorCallable implements Callable { + abstract static class MasterMonitorCallable extends MasterCallable { protected MasterMonitorKeepAliveConnection masterMonitor; - } + private final HConnection connection; - /** - * This method allows to execute a function requiring a connection to - * master without having to manage the connection creation/close. - * Create a {@link MasterAdminCallable} to use it. - */ - private V execute(MasterAdminCallable function) throws IOException { - function.masterAdmin = connection.getKeepAliveMasterAdminService(); - try { - return executeCallable(function); - } finally { - function.masterAdmin.close(); + public MasterMonitorCallable(final HConnection connection) { + this.connection = connection; + } + + @Override + public void prepare(boolean reload) throws IOException { + this.masterMonitor = this.connection.getKeepAliveMasterMonitorService(); + } + + @Override + public void close() throws IOException { + this.masterMonitor.close(); } } /** - * This method allows to execute a function requiring a connection to - * master without having to manage the connection creation/close. - * Create a {@link MasterAdminCallable} to use it. + * Parent of {@link MasterMonitorCallable} and {@link MasterAdminCallable}. + * Has common methods. + * @param */ - private V execute(MasterMonitorCallable function) throws IOException { - function.masterMonitor = connection.getKeepAliveMasterMonitorService(); - try { - return executeCallable(function); - } finally { - function.masterMonitor.close(); + abstract static class MasterCallable implements RetryingCallable, Closeable { + @Override + public void throwable(Throwable t, boolean retrying) { + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return ""; + } + + @Override + public long sleep(long pause, int tries) { + return ConnectionUtils.getPauseTime(pause, tries); } } - /** - * Helper function called by other execute functions. - */ - private V executeCallable(Callable function) throws IOException { + private V executeCallable(MasterCallable callable) throws IOException { + RpcRetryingCaller caller = new RpcRetryingCaller(); try { - return function.call(); - } catch (RemoteException re) { - throw re.unwrapRemoteException(); - } catch (IOException e) { - throw e; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } catch (Exception e) { - // This should not happen... - throw new IOException("Unexpected exception when calling master", e); + return caller.callWithRetries(callable, getConfiguration()); + } finally { + callable.close(); } } @@ -2729,4 +2740,4 @@ public class HBaseAdmin implements Abortable, Closeable { public CoprocessorRpcChannel coprocessorService() { return new MasterCoprocessorRpcChannel(connection); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index 1e6356f1490..1e7c52a2433 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -59,6 +59,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMoni @InterfaceAudience.Public @InterfaceStability.Stable public interface HConnection extends Abortable, Closeable { + /** + * Key for configuration in Configuration whose value is the class we implement making a + * new HConnection instance. + */ + public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl"; + /** * @return Configuration instance being used by this HConnection instance. */ @@ -269,34 +275,6 @@ public interface HConnection extends Abortable, Closeable { boolean reload) throws IOException; - /** - * Pass in a ServerCallable with your particular bit of logic defined and - * this method will manage the process of doing retries with timed waits - * and refinds of missing regions. - * - * @param the type of the return value - * @param callable callable to run - * @return an object of type T - * @throws IOException if a remote or network exception occurs - * @throws RuntimeException other unspecified error - */ - @Deprecated - T getRegionServerWithRetries(ServerCallable callable) - throws IOException, RuntimeException; - - /** - * Pass in a ServerCallable with your particular bit of logic defined and - * this method will pass it to the defined region server. - * @param the type of the return value - * @param callable callable to run - * @return an object of type T - * @throws IOException if a remote or network exception occurs - * @throws RuntimeException other unspecified error - */ - @Deprecated - T getRegionServerWithoutRetries(ServerCallable callable) - throws IOException, RuntimeException; - /** * Process a mixed batch of Get, Put and Delete actions. All actions for a * RegionServer are forwarded in one RPC call. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 1ce14d7ece2..da71697ffa3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1954,18 +1954,6 @@ public class HConnectionManager { } } - @Override - public T getRegionServerWithRetries(ServerCallable callable) - throws IOException, RuntimeException { - return callable.withRetries(); - } - - @Override - public T getRegionServerWithoutRetries(ServerCallable callable) - throws IOException, RuntimeException { - return callable.withoutRetries(); - } - void updateCachedLocation(HRegionInfo hri, HRegionLocation source, ServerName serverName, long seqNum) { HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java index eeb94c5e256..f74cc47da9c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java @@ -222,18 +222,6 @@ public class HConnectionWrapper implements HConnection { return hconnection.getRegionLocation(tableName, row, reload); } - @Override - public T getRegionServerWithRetries(ServerCallable callable) - throws IOException, RuntimeException { - return hconnection.getRegionServerWithRetries(callable); - } - - @Override - public T getRegionServerWithoutRetries(ServerCallable callable) - throws IOException, RuntimeException { - return hconnection.getRegionServerWithoutRetries(callable); - } - @Override public void processBatch(List actions, byte[] tableName, ExecutorService pool, Object[] results) throws IOException, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 6a2a2e085a6..e4e4dd57773 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -592,12 +592,15 @@ public class HTable implements HTableInterface { @Override public Result getRowOrBefore(final byte[] row, final byte[] family) throws IOException { - return new ServerCallable(connection, tableName, row, operationTimeout) { + RegionServerCallable callable = new RegionServerCallable(this.connection, + tableName, row) { public Result call() throws IOException { - return ProtobufUtil.getRowOrBefore(stub, - location.getRegionInfo().getRegionName(), row, family); + return ProtobufUtil.getRowOrBefore(getStub(), + getLocation().getRegionInfo().getRegionName(), row, family); } - }.withRetries(); + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -637,12 +640,14 @@ public class HTable implements HTableInterface { */ @Override public Result get(final Get get) throws IOException { - return new ServerCallable(connection, tableName, get.getRow(), operationTimeout) { - public Result call() throws IOException { - return ProtobufUtil.get(stub, - location.getRegionInfo().getRegionName(), get); - } - }.withRetries(); + RegionServerCallable callable = new RegionServerCallable(this.connection, + getTableName(), get.getRow()) { + public Result call() throws IOException { + return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get); + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -704,18 +709,21 @@ public class HTable implements HTableInterface { @Override public void delete(final Delete delete) throws IOException { - new ServerCallable(connection, tableName, delete.getRow(), operationTimeout) { - public Boolean call() throws IOException { - try { - MutateRequest request = RequestConverter.buildMutateRequest( - location.getRegionInfo().getRegionName(), delete); - MutateResponse response = stub.mutate(null, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }.withRetries(); + RegionServerCallable callable = new RegionServerCallable(connection, + tableName, delete.getRow()) { + public Boolean call() throws IOException { + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), delete); + MutateResponse response = getStub().mutate(null, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -838,19 +846,21 @@ public class HTable implements HTableInterface { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - new ServerCallable(connection, tableName, rm.getRow(), - operationTimeout) { + RegionServerCallable callable = + new RegionServerCallable(connection, getTableName(), rm.getRow()) { public Void call() throws IOException { try { MultiRequest request = RequestConverter.buildMultiRequest( - location.getRegionInfo().getRegionName(), rm); - stub.multi(null, request); + getLocation().getRegionInfo().getRegionName(), rm); + getStub().multi(null, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } return null; } - }.withRetries(); + }; + new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -862,21 +872,23 @@ public class HTable implements HTableInterface { throw new IOException( "Invalid arguments to append, no columns specified"); } - return new ServerCallable(connection, tableName, append.getRow(), operationTimeout) { - public Result call() throws IOException { - try { - MutateRequest request = RequestConverter.buildMutateRequest( - location.getRegionInfo().getRegionName(), append); - PayloadCarryingRpcController rpcController = - new PayloadCarryingRpcController(); - MutateResponse response = stub.mutate(rpcController, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + RegionServerCallable callable = + new RegionServerCallable(this.connection, getTableName(), append.getRow()) { + public Result call() throws IOException { + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append); + PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); + MutateResponse response = getStub().mutate(rpcController, request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - }.withRetries(); + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -888,19 +900,22 @@ public class HTable implements HTableInterface { throw new IOException( "Invalid arguments to increment, no columns specified"); } - return new ServerCallable(connection, tableName, increment.getRow(), operationTimeout) { - public Result call() throws IOException { - try { - MutateRequest request = RequestConverter.buildMutateRequest( - location.getRegionInfo().getRegionName(), increment); - PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController(); - MutateResponse response = stub.mutate(rpcContoller, request); - return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + RegionServerCallable callable = new RegionServerCallable(this.connection, + getTableName(), increment.getRow()) { + public Result call() throws IOException { + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment); + PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController(); + MutateResponse response = getStub().mutate(rpcContoller, request); + return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - }.withRetries(); + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -932,22 +947,26 @@ public class HTable implements HTableInterface { throw new IOException( "Invalid arguments to incrementColumnValue", npe); } - return new ServerCallable(connection, tableName, row, operationTimeout) { - public Long call() throws IOException { - try { - MutateRequest request = RequestConverter.buildMutateRequest( - location.getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability); - PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); - MutateResponse response = stub.mutate(rpcController, request); - Result result = - ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + + RegionServerCallable callable = + new RegionServerCallable(connection, getTableName(), row) { + public Long call() throws IOException { + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability); + PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController(); + MutateResponse response = getStub().mutate(rpcController, request); + Result result = + ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner()); + return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - }.withRetries(); + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -958,19 +977,22 @@ public class HTable implements HTableInterface { final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - return new ServerCallable(connection, tableName, row, operationTimeout) { - public Boolean call() throws IOException { - try { - MutateRequest request = RequestConverter.buildMutateRequest( - location.getRegionInfo().getRegionName(), row, family, qualifier, + RegionServerCallable callable = + new RegionServerCallable(connection, getTableName(), row) { + public Boolean call() throws IOException { + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = stub.mutate(null, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + MutateResponse response = getStub().mutate(null, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - }.withRetries(); + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } @@ -982,19 +1004,22 @@ public class HTable implements HTableInterface { final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete) throws IOException { - return new ServerCallable(connection, tableName, row, operationTimeout) { - public Boolean call() throws IOException { - try { - MutateRequest request = RequestConverter.buildMutateRequest( - location.getRegionInfo().getRegionName(), row, family, qualifier, + RegionServerCallable callable = + new RegionServerCallable(connection, getTableName(), row) { + public Boolean call() throws IOException { + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = stub.mutate(null, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + MutateResponse response = getStub().mutate(null, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - }.withRetries(); + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -1002,19 +1027,21 @@ public class HTable implements HTableInterface { */ @Override public boolean exists(final Get get) throws IOException { - return new ServerCallable(connection, tableName, get.getRow(), operationTimeout) { - public Boolean call() throws IOException { - try { - GetRequest request = RequestConverter.buildGetRequest( - location.getRegionInfo().getRegionName(), get, true); - - GetResponse response = stub.get(null, request); - return response.getExists(); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }.withRetries(); + RegionServerCallable callable = + new RegionServerCallable(connection, getTableName(), get.getRow()) { + public Boolean call() throws IOException { + try { + GetRequest request = RequestConverter.buildGetRequest( + getLocation().getRegionInfo().getRegionName(), get, true); + GetResponse response = getStub().get(null, request); + return response.getExists(); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return new RpcRetryingCaller(). + callWithRetries(callable, getConfiguration(), this.operationTimeout); } /** @@ -1105,19 +1132,23 @@ public class HTable implements HTableInterface { for (final Map.Entry> getsByRegionEntry : getsByRegion.entrySet()) { Callable> callable = new Callable>() { public List call() throws Exception { - return new ServerCallable>(connection, tableName, getsByRegionEntry.getValue() - .get(0).getRow(), operationTimeout) { + RegionServerCallable> callable = + new RegionServerCallable>(connection, getTableName(), + getsByRegionEntry.getValue().get(0).getRow()) { public List call() throws IOException { try { - MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location - .getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false); - MultiGetResponse responses = stub.multiGet(null, requests); + MultiGetRequest requests = RequestConverter.buildMultiGetRequest( + getLocation().getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), + true, false); + MultiGetResponse responses = getStub().multiGet(null, requests); return responses.getExistsList(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } - }.withRetries(); + }; + return new RpcRetryingCaller>(). + callWithRetries(callable, getConfiguration(), operationTimeout); } }; futures.put(getsByRegionEntry.getKey(), pool.submit(callable)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 45157ccfbe8..8505c1e3383 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.HRegionLocation; @@ -36,19 +35,22 @@ import com.google.protobuf.ServiceException; /** * Callable that handles the multi method call going against a single - * regionserver; i.e. A {@link ServerCallable} for the multi call (It is not a - * {@link Callable} that goes against multiple regions. + * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a + * {@link RegionServerCallable} that goes against multiple regions. * @param */ -class MultiServerCallable extends ServerCallable { +class MultiServerCallable extends RegionServerCallable { private final MultiAction multi; - private final HRegionLocation loc; MultiServerCallable(final HConnection connection, final byte [] tableName, - final HRegionLocation loc, final MultiAction multi) { + final HRegionLocation location, final MultiAction multi) { super(connection, tableName, null); this.multi = multi; - this.loc = loc; + setLocation(location); + } + + MultiAction getMulti() { + return this.multi; } @Override @@ -74,7 +76,7 @@ class MultiServerCallable extends ServerCallable { RequestConverter.buildNoDataMultiRequest(regionName, rms, cells); // Carry the cells over the proxy/pb Service interface using the payload carrying // rpc controller. - stub.multi(new PayloadCarryingRpcController(cells), multiRequest); + getStub().multi(new PayloadCarryingRpcController(cells), multiRequest); // This multi call does not return results. response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT); } catch (ServiceException se) { @@ -99,7 +101,7 @@ class MultiServerCallable extends ServerCallable { // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); - ClientProtos.MultiResponse responseProto = stub.multi(controller, multiRequest); + ClientProtos.MultiResponse responseProto = getStub().multi(controller, multiRequest); results = ResponseConverter.getResults(responseProto, controller.cellScanner()); } catch (ServiceException se) { ex = ProtobufUtil.getRemoteException(se); @@ -115,6 +117,7 @@ class MultiServerCallable extends ServerCallable { @Override public void prepare(boolean reload) throws IOException { - stub = connection.getClient(loc.getServerName()); + // Use the location we were given in the constructor rather than go look it up. + setStub(getConnection().getClient(getLocation().getServerName())); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionOfflineException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionOfflineException.java index e1cbe00ce51..cca8741726c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionOfflineException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionOfflineException.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.exceptions.RegionException; /** Thrown when a table can not be located */ @InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceStability.Evolving public class RegionOfflineException extends RegionException { private static final long serialVersionUID = 466008402L; /** default constructor */ @@ -36,4 +36,4 @@ public class RegionOfflineException extends RegionException { public RegionOfflineException(String s) { super(s); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java new file mode 100644 index 00000000000..b965dbd2071 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -0,0 +1,145 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.exceptions.NotServingRegionException; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Implementations call a RegionServer and implement {@link #call()}. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * @param the class that the ServerCallable handles + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class RegionServerCallable implements RetryingCallable { + // Public because used outside of this package over in ipc. + static final Log LOG = LogFactory.getLog(RegionServerCallable.class); + private final HConnection connection; + private final byte [] tableName; + private final byte [] row; + private HRegionLocation location; + private ClientService.BlockingInterface stub; + + protected final static int MIN_WAIT_DEAD_SERVER = 10000; + + /** + * @param connection Connection to use. + * @param tableName Table name to which row belongs. + * @param row The row we want in tableName. + */ + public RegionServerCallable(HConnection connection, byte [] tableName, byte [] row) { + this.connection = connection; + this.tableName = tableName; + this.row = row; + } + + /** + * Prepare for connection to the server hosting region with row from tablename. Does lookup + * to find region location and hosting server. + * @param reload Set this to true if connection should re-find the region + * @throws IOException e + */ + public void prepare(final boolean reload) throws IOException { + this.location = connection.getRegionLocation(tableName, row, reload); + if (this.location == null) { + throw new IOException("Failed to find location, tableName=" + Bytes.toString(tableName) + + ", row=" + Bytes.toString(row) + ", reload=" + reload); + } + setStub(getConnection().getClient(getLocation().getServerName())); + } + + /** + * @return {@link HConnection} instance used by this Callable. + */ + HConnection getConnection() { + return this.connection; + } + + protected ClientService.BlockingInterface getStub() { + return this.stub; + } + + void setStub(final ClientService.BlockingInterface stub) { + this.stub = stub; + } + + protected HRegionLocation getLocation() { + return this.location; + } + + protected void setLocation(final HRegionLocation location) { + this.location = location; + } + + public byte [] getTableName() { + return this.tableName; + } + + public byte [] getRow() { + return this.row; + } + + @Override + public void throwable(Throwable t, boolean retrying) { + if (t instanceof SocketTimeoutException || + t instanceof ConnectException || + t instanceof RetriesExhaustedException || + (location != null && getConnection().isDeadServer(location.getServerName()))) { + // if thrown these exceptions, we clear all the cache entries that + // map to that slow/dead server; otherwise, let cache miss and ask + // .META. again to find the new location + getConnection().clearCaches(location.getServerName()); + } else if (t instanceof RegionMovedException) { + getConnection().updateCachedLocations(tableName, row, t, location); + } else if (t instanceof NotServingRegionException && !retrying) { + // Purge cache entries for this specific region from META cache + // since we don't call connect(true) when number of retries is 1. + getConnection().deleteCachedRegionLocation(location); + } + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return "row '" + Bytes.toString(row) + "' on table '" + Bytes.toString(tableName); + } + + @Override + public long sleep(long pause, int tries) { + // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + long sleep = ConnectionUtils.getPauseTime(pause, tries + 1); + if (sleep < MIN_WAIT_DEAD_SERVER + && (location == null || getConnection().isDeadServer(location.getServerName()))) { + sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); + } + return sleep; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java index f28fcd58cff..2bf16eb6397 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceStability; import java.io.IOException; import java.util.Date; import java.util.List; +import java.util.concurrent.Callable; /** * Exception thrown by HTable methods when an attempt to do something (like @@ -61,7 +62,7 @@ public class RetriesExhaustedException extends IOException { /** * Create a new RetriesExhaustedException from the list of prior failures. - * @param callableVitals Details from the {@link ServerCallable} we were using + * @param callableVitals Details from the Callable we were using * when we got this exception. * @param numTries The number of tries we made * @param exceptions List of exceptions that failed before giving up diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java new file mode 100644 index 00000000000..e13a4ba3341 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.Callable; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A Callable that will be retried. If {@link #call()} invocation throws exceptions, + * we will call {@link #throwable(Throwable, boolean)} with whatever the exception was. + * @param + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface RetryingCallable extends Callable { + /** + * Prepare by setting up any connections to servers, etc., ahead of {@link #call()} invocation. + * @param reload Set this to true if need to requery locations (usually set on second invocation + * to {@link #call()} or whatever + * @throws IOException e + */ + void prepare(final boolean reload) throws IOException; + + /** + * Called when {@link #call()} throws an exception and we are going to retry; take action to + * make it so we succeed on next call (clear caches, do relookup of locations, etc.). + * @param t + * @param retrying True if we are in retrying mode (we are not in retrying mode when max + * retries == 1; we ARE in retrying mode if retries > 1 even when we are the last attempt) + */ + void throwable(final Throwable t, boolean retrying); + + /** + * @return Some details from the implementation that we would like to add to a terminating + * exception; i.e. a fatal exception is being thrown ending retries and we might like to add + * more implementation-specific detail on to the exception being thrown. + */ + String getExceptionMessageAdditionalDetail(); + + /** + * @param pause + * @param tries + * @return Suggestion on how much to sleep between retries + */ + long sleep(final long pause, final int tries); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java new file mode 100644 index 00000000000..b6d022883bf --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -0,0 +1,228 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.protobuf.ServiceException; + +/** + * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client + * threadlocal outstanding timeouts as so we don't persist too much. + * Dynamic rather than static so can set the generic appropriately. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class RpcRetryingCaller { + static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); + /** + * Timeout for the call including retries + */ + private int callTimeout; + /** + * When we started making calls. + */ + private long globalStartTime; + /** + * Start and end times for a single call. + */ + private long startTime, endTime; + private final static int MIN_RPC_TIMEOUT = 2000; + + public RpcRetryingCaller() { + super(); + } + + private void beforeCall() { + this.startTime = EnvironmentEdgeManager.currentTimeMillis(); + int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime)); + if (remaining < MIN_RPC_TIMEOUT) { + // If there is no time left, we're trying anyway. It's too late. + // 0 means no timeout, and it's not the intent here. So we secure both cases by + // resetting to the minimum. + remaining = MIN_RPC_TIMEOUT; + } + RpcClient.setRpcTimeout(remaining); + } + + private void afterCall() { + RpcClient.resetRpcTimeout(); + this.endTime = EnvironmentEdgeManager.currentTimeMillis(); + } + + public synchronized T callWithRetries(RetryingCallable callable, final Configuration conf) + throws IOException, RuntimeException { + return callWithRetries(callable, conf, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + + public synchronized T callWithRetries(RetryingCallable callable, final Configuration conf, + final int callTimeout) + throws IOException, RuntimeException { + final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + final int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + return callWithRetries(callable, callTimeout, pause, numRetries); + } + + /** + * Retries if invocation fails. + * @param conf + * @param callTimeout Timeout for this call + * @param callable The {@link RetryingCallable} to run. + * @return an object of type T + * @throws IOException if a remote or network exception occurs + * @throws RuntimeException other unspecified error + */ + synchronized T callWithRetries(RetryingCallable callable, int callTimeout, final long pause, + final int retries) + throws IOException, RuntimeException { + this.callTimeout = callTimeout; + List exceptions = + new ArrayList(); + this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); + for (int tries = 0;; tries++) { + long expectedSleep = 0; + try { + beforeCall(); + callable.prepare(tries != 0); // if called with false, check table status on ZK + return callable.call(); + } catch (Throwable t) { + LOG.warn("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" + + (this.globalStartTime - System.currentTimeMillis()) + "ms", t); + // translateException throws exception when should not retry: i.e. when request is bad. + t = translateException(t); + callable.throwable(t, retries != 1); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(t, + EnvironmentEdgeManager.currentTimeMillis(), toString()); + exceptions.add(qt); + if (tries >= retries - 1) { + throw new RetriesExhaustedException(tries, exceptions); + } + // If the server is dead, we need to wait a little before retrying, to give + // a chance to the regions to be + // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + expectedSleep = callable.sleep(pause, tries + 1); + + // If, after the planned sleep, there won't be enough time left, we stop now. + long duration = singleCallDuration(expectedSleep); + if (duration > this.callTimeout) { + String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration + + ": " + callable.getExceptionMessageAdditionalDetail(); + throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); + } + } finally { + afterCall(); + } + try { + Thread.sleep(expectedSleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); + } + } + } + + /** + * @param expectedSleep + * @return Calculate how long a single call took + */ + private long singleCallDuration(final long expectedSleep) { + return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep; + } + + /** + * Call the server once only. + * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you + * want to do a single call only (A call to {@link RetryingCallable#call()} will not likely + * succeed). + * @return an object of type T + * @throws IOException if a remote or network exception occurs + * @throws RuntimeException other unspecified error + */ + public T callWithoutRetries(RetryingCallable callable) + throws IOException, RuntimeException { + // The code of this method should be shared with withRetries. + this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); + try { + beforeCall(); + callable.prepare(false); + return callable.call(); + } catch (Throwable t) { + Throwable t2 = translateException(t); + // It would be nice to clear the location cache here. + if (t2 instanceof IOException) { + throw (IOException)t2; + } else { + throw new RuntimeException(t2); + } + } finally { + afterCall(); + } + } + + /** + * Get the good or the remote exception if any, throws the DoNotRetryIOException. + * @param t the throwable to analyze + * @return the translated exception, if it's not a DoNotRetryIOException + * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. + */ + static Throwable translateException(Throwable t) throws DoNotRetryIOException { + if (t instanceof UndeclaredThrowableException) { + if (t.getCause() != null) { + t = t.getCause(); + } + } + if (t instanceof RemoteException) { + t = ((RemoteException)t).unwrapRemoteException(); + } + if (t instanceof ServiceException) { + ServiceException se = (ServiceException)t; + Throwable cause = se.getCause(); + if (cause != null && cause instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)cause; + } + // Don't let ServiceException out; its rpc specific. + t = cause; + // t could be a RemoteException so go aaround again. + translateException(t); + } else if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)t; + } + return t; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index dad125acec0..26bbded9384 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -49,12 +49,13 @@ import java.io.IOException; import java.net.UnknownHostException; /** - * Retries scanner operations such as create, next, etc. - * Used by {@link ResultScanner}s made by {@link HTable}. + * Scanner operations such as create, next, etc. + * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as + * {@link RpcRetryingCaller} so fails are retried. */ @InterfaceAudience.Public @InterfaceStability.Stable -public class ScannerCallable extends ServerCallable { +public class ScannerCallable extends RegionServerCallable { public static final String LOG_SCANNER_LATENCY_CUTOFF = "hbase.client.log.scanner.latency.cutoff"; public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; @@ -126,7 +127,7 @@ public class ScannerCallable extends ServerCallable { * to decide if hbase client connects to a remote region server */ private void checkIfRegionServerIsRemote() { - if (this.location.getHostname().equalsIgnoreCase(myAddress)) { + if (getLocation().getHostname().equalsIgnoreCase(myAddress)) { isRegionServerRemote = false; } else { isRegionServerRemote = true; @@ -154,7 +155,7 @@ public class ScannerCallable extends ServerCallable { ScanResponse response = null; PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); try { - response = stub.scan(controller, request); + response = getStub().scan(controller, request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call // from client to server will increment this number in both sides. Client passes this // number along with the request and at RS side both the incoming nextCallSeq and its @@ -198,7 +199,7 @@ public class ScannerCallable extends ServerCallable { if (logScannerActivity && (ioe instanceof UnknownScannerException)) { try { HRegionLocation location = - connection.relocateRegion(tableName, scan.getStartRow()); + getConnection().relocateRegion(getTableName(), scan.getStartRow()); LOG.info("Scanner=" + scannerId + " expired, current region location is " + location.toString() + " ip:" + location.getHostnamePort()); @@ -270,7 +271,7 @@ public class ScannerCallable extends ServerCallable { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true); try { - stub.scan(null, request); + getStub().scan(null, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -284,15 +285,15 @@ public class ScannerCallable extends ServerCallable { incRPCcallsMetrics(); ScanRequest request = RequestConverter.buildScanRequest( - this.location.getRegionInfo().getRegionName(), + getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = stub.scan(null, request); + ScanResponse response = getStub().scan(null, request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() - + " on region " + this.location.toString() + " ip:" - + this.location.getHostnamePort()); + + " on region " + getLocation().toString() + " ip:" + + getLocation().getHostnamePort()); } return id; } catch (ServiceException se) { @@ -318,7 +319,7 @@ public class ScannerCallable extends ServerCallable { if (!instantiated) { return null; } - return location.getRegionInfo(); + return getLocation().getRegionInfo(); } /** @@ -336,4 +337,4 @@ public class ScannerCallable extends ServerCallable { public void setCaching(int caching) { this.caching = caching; } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCaller.java new file mode 100644 index 00000000000..7cc058d72c5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCaller.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + + +class ScannerCaller extends RpcRetryingCaller {} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java deleted file mode 100644 index e9bfff99d19..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java +++ /dev/null @@ -1,299 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; -import org.apache.hadoop.hbase.exceptions.NotServingRegionException; -import org.apache.hadoop.hbase.exceptions.RegionMovedException; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.ipc.RemoteException; - -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; - -/** - * Abstract class that implements {@link Callable}. Implementation stipulates - * return type and method we actually invoke on remote Server. Usually - * used inside a try/catch that fields usual connection failures all wrapped - * up in a retry loop. - *

Call {@link #prepare(boolean)} to connect to server hosting region - * that contains the passed row in the passed table before invoking - * {@link #call()}. - * @see HConnection#getRegionServerWithoutRetries(ServerCallable) - * @param the class that the ServerCallable handles - */ -@InterfaceAudience.Public -@InterfaceStability.Stable -public abstract class ServerCallable implements Callable { - static final Log LOG = LogFactory.getLog(ServerCallable.class); - - protected final HConnection connection; - protected final byte [] tableName; - protected final byte [] row; - protected HRegionLocation location; - protected ClientService.BlockingInterface stub; - protected int callTimeout; - protected long globalStartTime; - protected long startTime, endTime; - protected final static int MIN_RPC_TIMEOUT = 2000; - protected final static int MIN_WAIT_DEAD_SERVER = 10000; - - /** - * @param connection Connection to use. - * @param tableName Table name to which row belongs. - * @param row The row we want in tableName. - */ - public ServerCallable(HConnection connection, byte [] tableName, byte [] row) { - this(connection, tableName, row, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - } - - public ServerCallable(HConnection connection, byte [] tableName, byte [] row, int callTimeout) { - this.connection = connection; - this.tableName = tableName; - this.row = row; - this.callTimeout = callTimeout; - } - - /** - * Prepare for connection to the server hosting region with row from tablename. Does lookup - * to find region location and hosting server. - * @param reload Set this to true if connection should re-find the region - * @throws IOException e - */ - public void prepare(final boolean reload) throws IOException { - this.location = connection.getRegionLocation(tableName, row, reload); - if (this.location == null) { - throw new IOException("Failed to find location, tableName=" + Bytes.toString(tableName) + ", row=" + - Bytes.toString(row) + ", reload=" + reload); - } - this.stub = connection.getClient(location.getServerName()); - } - - /** @return the server name - * @deprecated Just use {@link #toString()} instead. - */ - public String getServerName() { - if (location == null) return null; - return location.getHostnamePort(); - } - - /** @return the region name - * @deprecated Just use {@link #toString()} instead. - */ - public byte[] getRegionName() { - if (location == null) return null; - return location.getRegionInfo().getRegionName(); - } - - /** @return the row - * @deprecated Just use {@link #toString()} instead. - */ - public byte [] getRow() { - return row; - } - - public void beforeCall() { - this.startTime = EnvironmentEdgeManager.currentTimeMillis(); - int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime)); - if (remaining < MIN_RPC_TIMEOUT) { - // If there is no time left, we're trying anyway. It's too late. - // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. - remaining = MIN_RPC_TIMEOUT; - } - RpcClient.setRpcTimeout(remaining); - } - - public void afterCall() { - RpcClient.resetRpcTimeout(); - this.endTime = EnvironmentEdgeManager.currentTimeMillis(); - } - - /** - * @return {@link HConnection} instance used by this Callable. - */ - HConnection getConnection() { - return this.connection; - } - - /** - * Run this instance with retries, timed waits, - * and refinds of missing regions. - * - * @return an object of type T - * @throws IOException if a remote or network exception occurs - * @throws RuntimeException other unspecified error - */ - public T withRetries() - throws IOException, RuntimeException { - Configuration c = getConnection().getConfiguration(); - final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - List exceptions = - new ArrayList(); - this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); - for (int tries = 0;; tries++) { - long expectedSleep = 0; - try { - beforeCall(); - prepare(tries != 0); // if called with false, check table status on ZK - return call(); - } catch (Throwable t) { - LOG.warn("Call exception, tries=" + tries + ", numRetries=" + numRetries + ", retryTime=" + - (this.globalStartTime - System.currentTimeMillis()) + "ms", t); - - t = translateException(t); - // translateException throws an exception when we should not retry, i.e. when it's the - // request that is bad. - - if (t instanceof SocketTimeoutException || - t instanceof ConnectException || - t instanceof RetriesExhaustedException || - (location != null && getConnection().isDeadServer(location.getServerName()))) { - // if thrown these exceptions, we clear all the cache entries that - // map to that slow/dead server; otherwise, let cache miss and ask - // .META. again to find the new location - getConnection().clearCaches(location.getServerName()); - } else if (t instanceof RegionMovedException) { - getConnection().updateCachedLocations(tableName, row, t, location); - } else if (t instanceof NotServingRegionException && numRetries == 1) { - // Purge cache entries for this specific region from META cache - // since we don't call connect(true) when number of retries is 1. - getConnection().deleteCachedRegionLocation(location); - } - - RetriesExhaustedException.ThrowableWithExtraContext qt = - new RetriesExhaustedException.ThrowableWithExtraContext(t, - EnvironmentEdgeManager.currentTimeMillis(), toString()); - exceptions.add(qt); - if (tries >= numRetries - 1) { - throw new RetriesExhaustedException(tries, exceptions); - } - - // If the server is dead, we need to wait a little before retrying, to give - // a chance to the regions to be - // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time - expectedSleep = ConnectionUtils.getPauseTime(pause, tries + 1); - if (expectedSleep < MIN_WAIT_DEAD_SERVER - && (location == null || getConnection().isDeadServer(location.getServerName()))) { - expectedSleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); - } - - // If, after the planned sleep, there won't be enough time left, we stop now. - long duration = singleCallDuration(expectedSleep); - if (duration > this.callTimeout) { - throw (SocketTimeoutException) new SocketTimeoutException( - "Call to access row '" + Bytes.toString(row) + "' on table '" - + Bytes.toString(tableName) - + "' failed on timeout. " + " callTimeout=" + this.callTimeout + - ", callDuration=" + duration).initCause(t); - } - } finally { - afterCall(); - } - try { - Thread.sleep(expectedSleep); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted after " + tries + " tries on " + numRetries, e); - } - } - } - - /** - * @param expectedSleep - * @return Calculate how long a single call took - */ - private long singleCallDuration(final long expectedSleep) { - return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep; - } - - /** - * Run this instance against the server once. - * @return an object of type T - * @throws IOException if a remote or network exception occurs - * @throws RuntimeException other unspecified error - */ - public T withoutRetries() - throws IOException, RuntimeException { - // The code of this method should be shared with withRetries. - this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); - try { - beforeCall(); - prepare(false); - return call(); - } catch (Throwable t) { - Throwable t2 = translateException(t); - // It would be nice to clear the location cache here. - if (t2 instanceof IOException) { - throw (IOException)t2; - } else { - throw new RuntimeException(t2); - } - } finally { - afterCall(); - } - } - - /** - * Get the good or the remote exception if any, throws the DoNotRetryIOException. - * @param t the throwable to analyze - * @return the translated exception, if it's not a DoNotRetryIOException - * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. - */ - protected static Throwable translateException(Throwable t) throws DoNotRetryIOException { - if (t instanceof UndeclaredThrowableException) { - if(t.getCause() != null) { - t = t.getCause(); - } - } - if (t instanceof RemoteException) { - t = ((RemoteException)t).unwrapRemoteException(); - } - if (t instanceof ServiceException) { - ServiceException se = (ServiceException)t; - Throwable cause = se.getCause(); - if (cause != null && cause instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException)cause; - } - } else if (t instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException)t; - } - return t; - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DoNotRetryIOException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DoNotRetryIOException.java index 6eb29c1b3b2..4a3044c4dfe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DoNotRetryIOException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DoNotRetryIOException.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.HBaseIOException; @InterfaceAudience.Public @InterfaceStability.Stable public class DoNotRetryIOException extends HBaseIOException { - + // TODO: This would be more useful as a marker interface than as a class. private static final long serialVersionUID = 1197446454511704139L; /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/HBaseSnapshotException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/HBaseSnapshotException.java index 60cbf7ceefd..a3f340bd605 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/HBaseSnapshotException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/HBaseSnapshotException.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; /** @@ -28,7 +27,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio @SuppressWarnings("serial") @InterfaceAudience.Public @InterfaceStability.Evolving -public class HBaseSnapshotException extends HBaseIOException { +public class HBaseSnapshotException extends DoNotRetryIOException { private SnapshotDescription description; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/InvalidFamilyOperationException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/InvalidFamilyOperationException.java index 7fa1ada4f65..05ecfb47707 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/InvalidFamilyOperationException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/InvalidFamilyOperationException.java @@ -29,7 +29,7 @@ import java.io.IOException; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class InvalidFamilyOperationException extends IOException { +public class InvalidFamilyOperationException extends DoNotRetryIOException { private static final long serialVersionUID = 1L << 22 - 1L; /** default constructor */ public InvalidFamilyOperationException() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/LockTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/LockTimeoutException.java index 9013312a68a..76684f93801 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/LockTimeoutException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/LockTimeoutException.java @@ -19,9 +19,7 @@ */ package org.apache.hadoop.hbase.exceptions; -import java.io.IOException; - -public class LockTimeoutException extends IOException { +public class LockTimeoutException extends DoNotRetryIOException { private static final long serialVersionUID = -1770764924258999825L; @@ -33,5 +31,4 @@ public class LockTimeoutException extends IOException { public LockTimeoutException(String s) { super(s); } - -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionException.java index 4d5912c0641..c1ca1d03281 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/RegionException.java @@ -43,5 +43,4 @@ public class RegionException extends HBaseIOException { public RegionException(String s) { super(s); } - -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableExistsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableExistsException.java index 26b952dd6ac..12512c7aff4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableExistsException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableExistsException.java @@ -17,14 +17,12 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import java.io.IOException; - /** * Thrown when a table exists but should not */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TableExistsException extends IOException { +public class TableExistsException extends DoNotRetryIOException { private static final long serialVersionUID = 1L << 7 - 1L; /** default constructor */ public TableExistsException() { @@ -39,4 +37,4 @@ public class TableExistsException extends IOException { public TableExistsException(String s) { super(s); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotDisabledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotDisabledException.java index 180c69db351..addc2479d80 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotDisabledException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotDisabledException.java @@ -22,14 +22,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes; -import java.io.IOException; - /** * Thrown if a table should be offline but is not */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TableNotDisabledException extends IOException { +public class TableNotDisabledException extends DoNotRetryIOException { private static final long serialVersionUID = 1L << 19 - 1L; /** default constructor */ public TableNotDisabledException() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotEnabledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotEnabledException.java index 3c90fde8c74..f60999f740e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotEnabledException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotEnabledException.java @@ -22,14 +22,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes; -import java.io.IOException; /** * Thrown if a table should be enabled but is not */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TableNotEnabledException extends IOException { +public class TableNotEnabledException extends DoNotRetryIOException { private static final long serialVersionUID = 262144L; /** default constructor */ public TableNotEnabledException() { @@ -50,4 +49,4 @@ public class TableNotEnabledException extends IOException { public TableNotEnabledException(byte[] tableName) { this(Bytes.toString(tableName)); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotFoundException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotFoundException.java index 3e6e22c9058..a0683899f60 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotFoundException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/TableNotFoundException.java @@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability; /** Thrown when a table can not be located */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TableNotFoundException extends RegionException { +public class TableNotFoundException extends DoNotRetryIOException { private static final long serialVersionUID = 993179627856392526L; /** default constructor */ @@ -36,4 +36,4 @@ public class TableNotFoundException extends RegionException { public TableNotFoundException(String s) { super(s); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 82974508050..f683c331502 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -24,7 +24,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.client.RegionServerCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; @@ -75,14 +76,15 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setServiceName(method.getService().getFullName()) .setMethodName(method.getName()) .setRequest(request.toByteString()).build(); - ServerCallable callable = - new ServerCallable(connection, table, row) { + RegionServerCallable callable = + new RegionServerCallable(connection, table, row) { public CoprocessorServiceResponse call() throws Exception { - byte[] regionName = location.getRegionInfo().getRegionName(); - return ProtobufUtil.execService(stub, call, regionName); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + return ProtobufUtil.execService(getStub(), call, regionName); } }; - CoprocessorServiceResponse result = callable.withRetries(); + CoprocessorServiceResponse result = new RpcRetryingCaller(). + callWithRetries(callable, this.connection.getConfiguration()); Message response = null; if (result.getValue().hasValue()) { response = responsePrototype.newBuilderForType() diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index bd277af067c..e9daba45684 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -@SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Category(MediumTests.class) public class TestAsyncProcess { private static final byte[] DUMMY_TABLE = "DUMMY_TABLE".getBytes(); @@ -65,42 +64,40 @@ public class TestAsyncProcess { private static Exception failure = new Exception("failure"); static class MyAsyncProcess extends AsyncProcess { - - public MyAsyncProcess(HConnection hc, - AsyncProcessCallback callback, Configuration conf) { + public MyAsyncProcess(HConnection hc, AsyncProcessCallback callback, Configuration conf) { super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, - new SynchronousQueue(), - Threads.newDaemonThreadFactory("test-TestAsyncProcess")) - , callback, conf); + new SynchronousQueue(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")), + callback, conf); } - /** - * Do not call a server, fails if the rowkey of the operation is{@link #FAILS} - */ @Override - protected ServerCallable createCallable( - final HRegionLocation loc, final MultiAction multi) { - - final MultiResponse mr = new MultiResponse(); - for (Map.Entry>> entry : multi.actions.entrySet()) { - for (Action a : entry.getValue()) { - if (Arrays.equals(FAILS, a.getAction().getRow())) { - mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure); - } else { - mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success); - } - } - } - - return new MultiServerCallable(hConnection, tableName, null, null) { + protected RpcRetryingCaller createCaller(MultiServerCallable callable) { + final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti()); + return new RpcRetryingCaller() { @Override - public MultiResponse withoutRetries() { + public MultiResponse callWithoutRetries( RetryingCallable callable) + throws IOException, RuntimeException { return mr; } }; } } + static MultiResponse createMultiResponse(final HRegionLocation loc, + final MultiAction multi) { + final MultiResponse mr = new MultiResponse(); + for (Map.Entry>> entry : multi.actions.entrySet()) { + for (Action a : entry.getValue()) { + if (Arrays.equals(FAILS, a.getAction().getRow())) { + mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure); + } else { + mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success); + } + } + } + return mr; + } + /** * Returns our async process. */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index d27dce2e244..7ace6b76254 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.util; +import java.io.InterruptedIOException; import java.io.PrintWriter; import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.LinkedBlockingQueue; @@ -129,6 +130,7 @@ public class Threads { } /** + * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns * @param millis How long to sleep for in milliseconds. */ public static void sleep(long millis) { @@ -136,6 +138,7 @@ public class Threads { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); + Thread.currentThread().interrupt(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 493abd1aef9..486ebefb37e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -60,7 +60,8 @@ import org.apache.hadoop.hbase.exceptions.TableNotFoundException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.client.RegionServerCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; @@ -536,23 +537,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool { famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); } - final ServerCallable svrCallable = new ServerCallable(conn, - tableName, first) { + final RegionServerCallable svrCallable = + new RegionServerCallable(conn, tableName, first) { @Override public Boolean call() throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; try { - LOG.debug("Going to connect to server " + location + " for row " - + Bytes.toStringBinary(row)); - byte[] regionName = location.getRegionInfo().getRegionName(); + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); if(!useSecure) { - success = ProtobufUtil.bulkLoadHFile(stub, famPaths, regionName, assignSeqIds); + success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); } else { - HTable table = new HTable(conn.getConfiguration(), tableName); + HTable table = new HTable(conn.getConfiguration(), getTableName()); secureClient = new SecureBulkLoadClient(table); - success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey()); + success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, + getLocation().getRegionInfo().getStartKey()); } return success; } finally { @@ -586,7 +588,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { List toRetry = new ArrayList(); - boolean success = svrCallable.withRetries(); + boolean success = new RpcRetryingCaller().callWithRetries(svrCallable, getConf()); if (!success) { LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) + " into table " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index 37f8dcc3da8..1614b125839 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -34,15 +34,11 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.ServerCallable; -import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; -import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; @@ -166,7 +162,7 @@ public class WALEditsReplaySink { try { ReplayServerCallable callable = new ReplayServerCallable( this.conn, this.tableName, regionLoc, regionInfo, actions); - callable.withRetries(); + new RpcRetryingCaller().callWithRetries(callable, conf, this.replayTimeout); } catch (IOException ie) { if (skipErrors) { LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS @@ -181,19 +177,17 @@ public class WALEditsReplaySink { * Callable that handles the replay method call going against a single regionserver * @param */ - class ReplayServerCallable extends ServerCallable { + class ReplayServerCallable extends RegionServerCallable { private HRegionInfo regionInfo; private List> actions; - private Map>>> retryActions = null; - ReplayServerCallable(final HConnection connection, final byte [] tableName, final HRegionLocation regionLoc, final HRegionInfo regionInfo, final List> actions) { - super(connection, tableName, null, replayTimeout); + super(connection, tableName, null); this.actions = actions; this.regionInfo = regionInfo; - this.location = regionLoc; + setLocation(regionLoc); } @Override @@ -208,7 +202,7 @@ public class WALEditsReplaySink { private void replayToServer(HRegionInfo regionInfo, List> actions) throws IOException, ServiceException { - AdminService.BlockingInterface remoteSvr = connection.getAdmin(location.getServerName()); + AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName()); MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(), actions); MultiResponse protoResults = remoteSvr.replay(null, request); @@ -235,16 +229,14 @@ public class WALEditsReplaySink { @Override public void prepare(boolean reload) throws IOException { if (!reload) return; - // relocate regions in case we have a new dead server or network hiccup // if not due to connection issue, the following code should run fast because it uses // cached location for (Action action : actions) { // use first row to relocate region because all actions are for one region - this.location = this.connection.locateRegion(tableName, action.getAction().getRow()); + setLocation(conn.locateRegion(tableName, action.getAction().getRow())); break; } } } - -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 7982423668b..0f88cab780f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -43,7 +43,7 @@ public class HConnectionTestingUtility { * configuration instance. Minimally the mock will return * conf when {@link HConnection#getConfiguration()} is invoked. * Be sure to shutdown the connection when done by calling - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it + * {@link HConnectionManager#deleteConnection(Configuration)} else it * will stick around; this is probably not what you want. * @param conf configuration * @return HConnection object for conf @@ -69,7 +69,7 @@ public class HConnectionTestingUtility { * more of the popular {@link HConnection} methods so they do 'normal' * operation (see return doc below for list). Be sure to shutdown the * connection when done by calling - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it + * {@link HConnectionManager#deleteConnection(Configuration)} else it * will stick around; this is probably not what you want. * * @param conf Configuration to use @@ -88,7 +88,7 @@ public class HConnectionTestingUtility { * {@link HConnection#getAdmin(ServerName)} is called, returns the passed * {@link ClientProtos.ClientService.BlockingInterface} instance when * {@link HConnection#getClient(ServerName)} is called (Be sure to call - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} + * {@link HConnectionManager#deleteConnection(Configuration)} * when done with this mocked Connection. * @throws IOException */ @@ -123,7 +123,7 @@ public class HConnectionTestingUtility { * Get a Mockito spied-upon {@link HConnection} that goes with the passed * conf configuration instance. * Be sure to shutdown the connection when done by calling - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it + * {@link HConnectionManager#deleteConnection(Configuration)} else it * will stick around; this is probably not what you want. * @param conf configuration * @return HConnection object for conf diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java new file mode 100644 index 00000000000..0470ce4b02d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; +import org.apache.hadoop.hbase.exceptions.PleaseHoldException; +import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mortbay.log.Log; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +@Category(SmallTests.class) +public class TestHBaseAdminNoCluster { + /** + * Verify that PleaseHoldException gets retried. + * HBASE-8764 + * @throws IOException + * @throws ZooKeeperConnectionException + * @throws MasterNotRunningException + * @throws ServiceException + */ + @Test + public void testMasterMonitorCollableRetries() + throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException { + Configuration configuration = HBaseConfiguration.create(); + // Set the pause and retry count way down. + configuration.setLong(HConstants.HBASE_CLIENT_PAUSE, 1); + final int count = 10; + configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); + // Get mocked connection. Getting the connection will register it so when HBaseAdmin is + // constructed with same configuration, it will find this mocked connection. + HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); + // Mock so we get back the master interface. Make it so when createTable is called, we throw + // the PleaseHoldException. + MasterAdminKeepAliveConnection masterAdmin = + Mockito.mock(MasterAdminKeepAliveConnection.class); + Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(), + (MasterAdminProtos.CreateTableRequest)Mockito.any())). + thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test"))); + Mockito.when(connection.getKeepAliveMasterAdminService()).thenReturn(masterAdmin); + // Mock up our admin Interfaces + HBaseAdmin admin = new HBaseAdmin(configuration); + try { + HTableDescriptor htd = new HTableDescriptor("testMasterMonitorCollableRetries"); + // Pass any old htable descriptor; not important + try { + admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + fail(); + } catch (RetriesExhaustedException e) { + Log.info("Expected fail", e); + } + // Assert we were called 'count' times. + Mockito.verify(masterAdmin, Mockito.atLeast(count)).createTable((RpcController)Mockito.any(), + (MasterAdminProtos.CreateTableRequest)Mockito.any()); + } finally { + admin.close(); + if (connection != null)HConnectionManager.deleteConnection(configuration); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index f61dd8ed212..412f8184678 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -32,10 +32,11 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ServerCallable; import org.apache.hadoop.hbase.exceptions.TableExistsException; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -141,41 +142,43 @@ public class TestHRegionServerBulkLoad { } // bulk load HFiles - HConnection conn = UTIL.getHBaseAdmin().getConnection(); + final HConnection conn = UTIL.getHBaseAdmin().getConnection(); byte[] tbl = Bytes.toBytes(tableName); - new ServerCallable(conn, tbl, Bytes - .toBytes("aaa")) { + RegionServerCallable callable = + new RegionServerCallable(conn, tbl, Bytes.toBytes("aaa")) { @Override public Void call() throws Exception { - LOG.debug("Going to connect to server " + location + " for row " - + Bytes.toStringBinary(row)); - byte[] regionName = location.getRegionInfo().getRegionName(); + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true); - stub.bulkLoadHFile(null, request); + getStub().bulkLoadHFile(null, request); return null; } - }.withRetries(); + }; + RpcRetryingCaller caller = new RpcRetryingCaller(); + caller.callWithRetries(callable, UTIL.getConfiguration()); // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 10 == 0) { // 10 * 50 = 500 open file handles! - new ServerCallable(conn, tbl, - Bytes.toBytes("aaa")) { + callable = new RegionServerCallable(conn, tbl, Bytes.toBytes("aaa")) { @Override public Void call() throws Exception { - LOG.debug("compacting " + location + " for row " - + Bytes.toStringBinary(row)); + LOG.debug("compacting " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server = - connection.getAdmin(location.getServerName()); + conn.getAdmin(getLocation().getServerName()); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest( - location.getRegionInfo().getRegionName(), true, null); + getLocation().getRegionInfo().getRegionName(), true, null); server.compactRegion(null, request); numCompactions.incrementAndGet(); return null; } - }.withRetries(); + }; + caller.callWithRetries(callable, UTIL.getConfiguration()); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 9dc0edf2c33..6ecb95d0146 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -22,6 +22,7 @@ import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Method; +import java.net.BindException; import java.util.TreeMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; @@ -377,7 +379,7 @@ public class TestHLog { * [FSNamesystem.nextGenerationStampForBlock]) * 3. HDFS-142 (on restart, maintain pendingCreates) */ - @Test + @Test (timeout=300000) public void testAppendClose() throws Exception { byte [] tableName = Bytes.toBytes(getName()); HRegionInfo regioninfo = new HRegionInfo(tableName, @@ -422,16 +424,16 @@ public class TestHLog { Thread.sleep(1000); } assertFalse(cluster.isClusterUp()); - - // Workaround a strange issue with Hadoop's RPC system - if we don't - // sleep here, the new datanodes will pick up a cached IPC connection to - // the old (dead) NN and fail to start. Sleeping 2 seconds goes past - // the idle time threshold configured in the conf above - Thread.sleep(2000); - - LOG.info("Waiting a few seconds before re-starting HDFS"); - Thread.sleep(5000); - cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort); + cluster = null; + for (int i = 0; i < 100; i++) { + try { + cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort); + break; + } catch (BindException e) { + LOG.info("Sleeping. BindException bringing up new cluster"); + Threads.sleep(1000); + } + } cluster.waitActive(); fs = cluster.getFileSystem(); LOG.info("STARTED second instance.");