HBASE-10606 Bad timeout in RpcRetryingCaller#callWithRetries w/o parameters

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1572124 13f79535-47bb-0310-9956-ffa450edef68
nkeywal 2014-02-26 16:08:04 +00:00
parent 10c40aa50b
commit 43b5767f62
13 changed files with 61 additions and 65 deletions
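
In short: RpcRetryingCaller#callWithRetries(callable), the overload without a timeout argument, ignored whatever timeout the caller had configured and fell back to the hard-coded HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT. That overload is removed, and every call site now passes its own timeout explicitly. A minimal sketch of the resulting call pattern, where conf and callable stand in for the surrounding client code and are not part of the patch itself:

    int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
    RpcRetryingCaller<Result> caller =
        RpcRetryingCallerFactory.instantiate(conf).<Result> newCaller();
    // The timeout covers the whole call, retries included, and is supplied per call
    // instead of being read from configuration (or defaulted) inside the caller.
    Result r = caller.callWithRetries(callable, operationTimeout);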

View File: AsyncProcess.java

@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -49,7 +48,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.cloudera.htrace.Trace;
import com.google.common.annotations.VisibleForTesting;
@ -118,8 +116,6 @@ class AsyncProcess {
public void waitUntilDone() throws InterruptedIOException {}
};
// TODO: many of the fields should be made private
protected final long id;
protected final ClusterConnection hConnection;
@ -156,6 +152,7 @@ class AsyncProcess {
protected final long pause;
protected int numTries;
protected int serverTrackerTimeout;
protected int operationTimeout;
// End configuration settings.
protected static class BatchErrors {
@ -206,6 +203,8 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@ -303,14 +302,12 @@ class AsyncProcess {
Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) {
Row r = it.next();
HRegionLocation loc = null;
HRegionLocation loc;
try {
loc = findDestLocation(tableName, r);
} catch (IOException ex) {
if (locationErrors == null) {
locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>();
}
locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>();
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Add it to retained but do not add to submit list.
// We will then add it to ars in an already-failed state.
@ -600,7 +597,7 @@ class AsyncProcess {
try {
MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
try {
res = createCaller(callable).callWithoutRetries(callable);
res = createCaller(callable).callWithoutRetries(callable, operationTimeout);
} catch (IOException e) {
// The service itself failed. It may be an error coming from the communication
// layer, but it may also be a functional error raised by the server.
@ -1010,7 +1007,7 @@ class AsyncProcess {
* failed operations themselves.
* @param failedRows an optional list into which the rows that failed since the last time
* {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
* @returns all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
* @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
* was called, or AP was created.
*/
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
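
Condensing the AsyncProcess hunks above: the operation timeout is read once from the configuration in the constructor and then bounds every per-server multi call, instead of the call silently inheriting a default. The MultiResponse type for res is inferred from the test hunk later in this commit; everything else appears in the hunks above:

    // Constructor: cache the configured operation timeout alongside pause/numTries.
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

    // Send path: the per-server multi call is bounded by that cached timeout.
    MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
    MultiResponse res = createCaller(callable).callWithoutRetries(callable, operationTimeout);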

View File: ClientScanner.java

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ExceptionUtil;
/**
* Implements the scanner interface for the HBase client.
@ -63,7 +64,7 @@ public class ClientScanner extends AbstractClientScanner {
protected final long maxScannerResultSize;
private final HConnection connection;
private final TableName tableName;
private final int scannerTimeout;
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result []> caller;
@ -224,7 +225,7 @@ public class ClientScanner extends AbstractClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
this.caller.callWithRetries(callable);
this.caller.callWithRetries(callable, scannerTimeout);
this.callable = null;
}
@ -261,7 +262,7 @@ public class ClientScanner extends AbstractClientScanner {
callable = getScannerCallable(localStartKey, nbRows);
// Open a scanner on the region server starting at the
// beginning of the region
this.caller.callWithRetries(callable);
this.caller.callWithRetries(callable, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -326,17 +327,17 @@ public class ClientScanner extends AbstractClientScanner {
// Skip only the first row (which was the last row of the last
// already-processed batch).
callable.setCaching(1);
values = this.caller.callWithRetries(callable);
values = this.caller.callWithRetries(callable, scannerTimeout);
callable.setCaching(this.caching);
skipFirst = false;
}
// Server returns null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = this.caller.callWithRetries(callable);
values = this.caller.callWithRetries(callable, scannerTimeout);
if (skipFirst && values != null && values.length == 1) {
skipFirst = false; // Already skipped, unset it before scanning again
values = this.caller.callWithRetries(callable);
values = this.caller.callWithRetries(callable, scannerTimeout);
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
@ -428,7 +429,7 @@ public class ClientScanner extends AbstractClientScanner {
if (callable != null) {
callable.setClose();
try {
this.caller.callWithRetries(callable);
this.caller.callWithRetries(callable, scannerTimeout);
} catch (UnknownScannerException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
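
In ClientScanner, scannerTimeout becomes protected so the reversed and small scanners can reuse it, and it is threaded through every RPC the scanner makes; previously the no-argument overload would have bounded these calls with the default operation timeout rather than the scanner timeout. A condensed sketch of the three call sites, using only fields shown in the hunks above:

    // Open a scanner at the start of a region; the scanner timeout, not the generic
    // operation timeout, bounds the call.
    this.caller.callWithRetries(callable, scannerTimeout);
    this.currentRegion = callable.getHRegionInfo();

    // Fetch the next batch. A null return means the scan is finished; an empty array
    // means the current region is exhausted and the next one should be opened.
    Result[] values = this.caller.callWithRetries(callable, scannerTimeout);

    // Close the open scanner with the same timeout budget.
    this.callable.setClose();
    this.caller.callWithRetries(callable, scannerTimeout);
    this.callable = null;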

View File: ClientSmallScanner.java

@ -200,7 +200,7 @@ public class ClientSmallScanner extends ClientScanner {
// Server returns null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = this.caller.callWithRetries(smallScanCallable);
values = this.caller.callWithRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {

View File: HBaseAdmin.java

@ -175,6 +175,7 @@ public class HBaseAdmin implements Abortable, Closeable {
private boolean aborted;
private boolean cleanupConnectionOnClose = false; // close the connection in close()
private boolean closed = false;
private int operationTimeout;
private RpcRetryingCallerFactory rpcCallerFactory;
@ -192,6 +193,11 @@ public class HBaseAdmin implements Abortable, Closeable {
this.cleanupConnectionOnClose = true;
}
public int getOperationTimeout() {
return operationTimeout;
}
/**
* Constructor for externally managed HConnections.
* The connection to master will be created when required by admin functions.
@ -217,6 +223,9 @@ public class HBaseAdmin implements Abortable, Closeable {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.retryLongerMultiplier = this.conf.getInt(
"hbase.client.retries.longer.multiplier", 10);
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
}
@ -3315,7 +3324,7 @@ public class HBaseAdmin implements Abortable, Closeable {
private <V> V executeCallable(MasterCallable<V> callable) throws IOException {
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
try {
return caller.callWithRetries(callable);
return caller.callWithRetries(callable, operationTimeout);
} finally {
callable.close();
}
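
HBaseAdmin follows the same pattern: the operation timeout is read once in the constructor, exposed through the new getOperationTimeout(), and applied to every master RPC funneled through executeCallable. A hypothetical usage sketch, with exception handling elided; HBaseConfiguration.create() and the 60-second value are illustrative and not part of the patch:

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000); // example: 60s
    HBaseAdmin admin = new HBaseAdmin(conf);
    // The configured value is what executeCallable(...) now passes to callWithRetries.
    int timeout = admin.getOperationTimeout(); // 60000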

View File: MetaScanner.java

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ExceptionUtil;
/**
* Scanner class that contains the <code>hbase:meta</code> table scanning logic.
@ -189,6 +190,7 @@ public class MetaScanner {
try {
scanner.close();
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
LOG.debug("Got exception in closing the result scanner", t);
}
}
@ -196,6 +198,7 @@ public class MetaScanner {
try {
visitor.close();
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
LOG.debug("Got exception in closing the meta scanner visitor", t);
}
}
@ -203,6 +206,7 @@ public class MetaScanner {
try {
metaTable.close();
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
LOG.debug("Got exception in closing the meta table", t);
}
}
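
The MetaScanner changes are about interrupt handling rather than timeouts: each cleanup block used to swallow every Throwable behind a debug log, including thread interruption. ExceptionUtil.rethrowIfInterrupt now propagates an interrupt (as an InterruptedIOException) before the catch falls through to logging. The pattern, condensed from the hunks above with comments added:

    try {
      scanner.close();
    } catch (Throwable t) {
      // If t is (or wraps) a thread interruption, rethrow it so callers see the
      // interrupt; any other failure during close is only worth a debug log.
      ExceptionUtil.rethrowIfInterrupt(t);
      LOG.debug("Got exception in closing the result scanner", t);
    }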

View File: ReversedClientScanner.java

@ -60,7 +60,7 @@ public class ReversedClientScanner extends ClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
this.caller.callWithRetries(callable);
this.caller.callWithRetries(callable, scannerTimeout);
this.callable = null;
}
@ -108,7 +108,7 @@ public class ReversedClientScanner extends ClientScanner {
callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
// Open a scanner on the region server starting at the
// beginning of the region
this.caller.callWithRetries(callable);
this.caller.callWithRetries(callable, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();

View File: RpcRetryingCaller.java

@ -42,20 +42,14 @@ import com.google.protobuf.ServiceException;
* Runs an rpc'ing {@link RetryingCallable}. Sets outstanding timeouts into the rpc client
* threadlocal so we don't persist too much. Dynamic rather than static so we can set the
* generic appropriately.
*
* This object has state. It should not be used in parallel by different threads.
* Reusing it is possible, even between multiple threads, but the user will have to
* manage the synchronization on their side: there is no synchronization inside the class.
*/
@InterfaceAudience.Private
@edu.umd.cs.findbugs.annotations.SuppressWarnings
(value = "IS2_INCONSISTENT_SYNC", justification = "na")
public class RpcRetryingCaller<T> {
static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
/**
* Timeout for the call including retries
*/
private int callTimeout;
/**
* The remaining time, for the call to come. Takes into account the tries already done.
*/
private int remainingTime;
/**
* When we started making calls.
*/
@ -70,18 +64,17 @@ public class RpcRetryingCaller<T> {
public RpcRetryingCaller(Configuration conf) {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.retries =
conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.callTimeout = conf.getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
private void beforeCall() {
if (callTimeout > 0) {
remainingTime = (int) (callTimeout -
private int getRemainingTime(int callTimeout) {
if (callTimeout <= 0) {
return 0;
} else {
int remainingTime = (int) (callTimeout -
(EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
if (remainingTime < MIN_RPC_TIMEOUT) {
// If there is no time left, we're trying anyway. It's too late.
@ -89,17 +82,10 @@ public class RpcRetryingCaller<T> {
// resetting to the minimum.
remainingTime = MIN_RPC_TIMEOUT;
}
} else {
remainingTime = 0;
return remainingTime;
}
}
public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
RuntimeException {
return callWithRetries(callable, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
/**
* Retries if invocation fails.
* @param callTimeout Timeout for this call
@ -108,11 +94,8 @@ public class RpcRetryingCaller<T> {
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings
(value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "na")
public synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout)
public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
this.callTimeout = callTimeout;
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
@ -120,8 +103,7 @@ public class RpcRetryingCaller<T> {
long expectedSleep;
try {
callable.prepare(tries != 0); // if called with false, check table status on ZK
beforeCall();
return callable.call(remainingTime);
return callable.call(getRemainingTime(callTimeout));
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
if (LOG.isTraceEnabled()) {
@ -145,8 +127,8 @@ public class RpcRetryingCaller<T> {
// If, after the planned sleep, there won't be enough time left, we stop now.
long duration = singleCallDuration(expectedSleep);
if (duration > this.callTimeout) {
String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration +
if (duration > callTimeout) {
String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
": " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
}
@ -163,8 +145,7 @@ public class RpcRetryingCaller<T> {
* @return Calculate how long a single call took
*/
private long singleCallDuration(final long expectedSleep) {
return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)
+ MIN_RPC_TIMEOUT + expectedSleep;
return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + expectedSleep;
}
/**
@ -176,7 +157,7 @@ public class RpcRetryingCaller<T> {
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
public T callWithoutRetries(RetryingCallable<T> callable)
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
// The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
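
The core refactor lands here: RpcRetryingCaller previously kept callTimeout and remainingTime as fields, synchronized callWithRetries around that shared state, and let the no-argument overload quietly substitute DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT. Now the timeout is a plain parameter, the synchronization and findbugs suppressions go away, and each try derives its remaining budget on the fly. A sketch of that derivation, assuming MIN_RPC_TIMEOUT and globalStartTime as in the hunks above:

    private int getRemainingTime(int callTimeout) {
      if (callTimeout <= 0) {
        return 0; // zero means no per-rpc limit
      }
      long elapsed = EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime;
      int remainingTime = (int) (callTimeout - elapsed);
      if (remainingTime < MIN_RPC_TIMEOUT) {
        // If there is no time left we still try once, clamped to the minimum rpc
        // timeout, rather than giving up without having attempted the call.
        remainingTime = MIN_RPC_TIMEOUT;
      }
      return remainingTime;
    }

singleCallDuration likewise stops padding with MIN_RPC_TIMEOUT, so the give-up check compares elapsed time plus the planned sleep directly against the caller-supplied callTimeout.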

View File: RegionCoprocessorRpcChannel.java

@ -24,6 +24,7 @@ import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
@ -52,6 +53,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
private final TableName table;
private final byte[] row;
private byte[] lastRegion;
private int operationTimeout;
private RpcRetryingCallerFactory rpcFactory;
@ -60,6 +62,9 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
this.table = table;
this.row = row;
this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
@Override
@ -88,7 +93,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
}
};
CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
.callWithRetries(callable);
.callWithRetries(callable, operationTimeout);
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()

View File: TestAsyncProcess.java

@ -149,7 +149,8 @@ public class TestAsyncProcess {
callable.getMulti(), nbMultiResponse, nbActions);
return new RpcRetryingCaller<MultiResponse>(conf) {
@Override
public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
try {
// sleep one second in order for threadpool to start another thread instead of reusing

View File: LoadIncrementalHFiles.java

@ -630,7 +630,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
Configuration conf = getConf();
boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller()
.callWithRetries(svrCallable);
.callWithRetries(svrCallable, Integer.MAX_VALUE);
if (!success) {
LOG.warn("Attempt to bulk load region containing "
+ Bytes.toStringBinary(first) + " into table "
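
With the no-argument overload gone, call sites that really do want to wait indefinitely now have to say so. The bulk-load paths here and in TestHRegionServerBulkLoad pass Integer.MAX_VALUE, which reads as a deliberate "no practical limit" rather than an accidental default. Condensed from the hunk above:

    // Bulk loads are expected to run long, so the call is made explicitly (effectively)
    // unbounded rather than silently inheriting a default timeout.
    boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller()
        .callWithRetries(svrCallable, Integer.MAX_VALUE);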

View File: TestAdmin.java

@ -1552,7 +1552,6 @@ public class TestAdmin {
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.logroll.errors.tolerated", 2);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
// For less frequently updated regions flush after every 2 flushes

View File: TestHRegionServerBulkLoad.java

@ -170,7 +170,7 @@ public class TestHRegionServerBulkLoad {
};
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable);
caller.callWithRetries(callable, Integer.MAX_VALUE);
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 10 == 0) {
@ -190,7 +190,7 @@ public class TestHRegionServerBulkLoad {
return null;
}
};
caller.callWithRetries(callable);
caller.callWithRetries(callable, Integer.MAX_VALUE);
}
}
}

View File: TestLogRollAbort.java

@ -60,7 +60,6 @@ public class TestLogRollAbort {
// Tweak default timeout values down for faster recovery
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.logroll.errors.tolerated", 2);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
// Increase the amount of time between client retries