HBASE-7756 Strange code in ServerCallable#shouldRetry

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1442560 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-02-05 12:29:48 +00:00
parent 7e52247cfb
commit 39c81b04e7
1 changed files with 27 additions and 20 deletions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -59,7 +60,9 @@ public abstract class ServerCallable<T> implements Callable<T> {
protected HRegionLocation location; protected HRegionLocation location;
protected ClientProtocol server; protected ClientProtocol server;
protected int callTimeout; protected int callTimeout;
protected long globalStartTime;
protected long startTime, endTime; protected long startTime, endTime;
protected final static int MIN_RPC_TIMEOUT = 2000;
/** /**
* @param connection Connection to use. * @param connection Connection to use.
@ -112,27 +115,20 @@ public abstract class ServerCallable<T> implements Callable<T> {
} }
public void beforeCall() { public void beforeCall() {
HBaseClientRPC.setRpcTimeout(this.callTimeout); this.startTime = EnvironmentEdgeManager.currentTimeMillis();
this.startTime = System.currentTimeMillis(); int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
if (remaining < MIN_RPC_TIMEOUT) {
// If there is no time left, we're trying anyway. It's too late.
// 0 means no timeout, and it's not the intent here. So we secure both cases by
// resetting to the minimum.
remaining = MIN_RPC_TIMEOUT;
}
HBaseClientRPC.setRpcTimeout(remaining);
} }
public void afterCall() { public void afterCall() {
HBaseClientRPC.resetRpcTimeout(); HBaseClientRPC.resetRpcTimeout();
this.endTime = System.currentTimeMillis(); this.endTime = EnvironmentEdgeManager.currentTimeMillis();
}
public void shouldRetry(Throwable throwable) throws IOException {
if (this.callTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
if (throwable instanceof SocketTimeoutException
|| (this.endTime - this.startTime > this.callTimeout)) {
throw (SocketTimeoutException) (SocketTimeoutException) new SocketTimeoutException(
"Call to access row '" + Bytes.toString(row) + "' on table '"
+ Bytes.toString(tableName)
+ "' failed on socket timeout exception: " + throwable)
.initCause(throwable);
} else {
this.callTimeout = ((int) (this.endTime - this.startTime));
}
} }
/** /**
@ -159,13 +155,13 @@ public abstract class ServerCallable<T> implements Callable<T> {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>(); new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0; tries < numRetries; tries++) {
try { try {
beforeCall(); beforeCall();
connect(tries != 0); connect(tries != 0);
return call(); return call();
} catch (Throwable t) { } catch (Throwable t) {
shouldRetry(t);
t = translateException(t); t = translateException(t);
if (t instanceof SocketTimeoutException || if (t instanceof SocketTimeoutException ||
t instanceof ConnectException || t instanceof ConnectException ||
@ -180,11 +176,21 @@ public abstract class ServerCallable<T> implements Callable<T> {
} }
RetriesExhaustedException.ThrowableWithExtraContext qt = RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t, new RetriesExhaustedException.ThrowableWithExtraContext(t,
System.currentTimeMillis(), toString()); EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt); exceptions.add(qt);
if (tries == numRetries - 1) { if (tries == numRetries - 1) {
throw new RetriesExhaustedException(tries, exceptions); throw new RetriesExhaustedException(tries, exceptions);
} }
long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
// If, after the planned sleep, there won't be enough time left, we stop now.
if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) >
this.callTimeout) {
throw (SocketTimeoutException) new SocketTimeoutException(
"Call to access row '" + Bytes.toString(row) + "' on table '"
+ Bytes.toString(tableName)
+ "' failed on timeout. " + " callTimeout=" + this.callTimeout +
", time=" + (this.endTime - this.startTime)).initCause(t);
}
} finally { } finally {
afterCall(); afterCall();
} }
@ -192,7 +198,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
Thread.sleep(ConnectionUtils.getPauseTime(pause, tries)); Thread.sleep(ConnectionUtils.getPauseTime(pause, tries));
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IOException("Giving up after tries=" + tries, e); throw new IOException("Interrupted after tries=" + tries, e);
} }
} }
return null; return null;
@ -206,6 +212,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
*/ */
public T withoutRetries() public T withoutRetries()
throws IOException, RuntimeException { throws IOException, RuntimeException {
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
beforeCall(); beforeCall();
connect(false); connect(false);