HBASE-4003 Cleanup Calls Conservatively On Timeout (Karthick)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153602 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-08-03 18:26:30 +00:00
parent 671c7beca4
commit e2f64664a6
2 changed files with 49 additions and 14 deletions

View File

@ -183,6 +183,7 @@ Release 0.91.0 - Unreleased
by using /hbase as the base node.(ramkrishna.s.vasudevan)
HBASE-4032 HBASE-451 improperly breaks public API HRegionInfo#getTableDesc
HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh)
HBASE-4003 Cleanup Calls Conservatively On Timeout (Karthick)
IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)

View File

@ -30,6 +30,7 @@ import java.io.InputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Hashtable;
@ -37,6 +38,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -167,9 +169,11 @@ public class HBaseClient {
Writable value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
protected Call(Writable param) {
this.param = param;
this.startTime = System.currentTimeMillis();
synchronized (HBaseClient.this) {
this.id = counter++;
}
@ -201,6 +205,10 @@ public class HBaseClient {
this.value = value;
callComplete();
}
public long getStartTime() {
return this.startTime;
}
}
/** Thread that reads responses and notifies callers. Each connection owns a
@ -214,7 +222,7 @@ public class HBaseClient {
private DataOutputStream out;
// currently active calls
private final Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
private final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
@ -568,20 +576,21 @@ public class HBaseClient {
}
calls.remove(id);
}
} catch (SocketTimeoutException ste) {
if (remoteId.rpcTimeout > 0) {
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
// Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
closeException = ste;
cleanupCalls();
closeException = e;
} else {
// Since the server did not respond within the default ping interval
// time, treat this as a fatal condition and close this connection
markClosed(ste);
markClosed(e);
}
} finally {
if (remoteId.rpcTimeout > 0) {
cleanupCalls(remoteId.rpcTimeout);
}
} catch (IOException e) {
markClosed(e);
}
}
@ -635,15 +644,40 @@ public class HBaseClient {
/* Cleanup all calls and mark them as done */
private void cleanupCalls() {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
cleanupCalls(0);
}
private void cleanupCalls(long rpcTimeout) {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) {
Call c = itor.next().getValue();
c.setException(closeException); // local exception
// Notify the open calls, so they are aware of what just happened
synchronized (c) {
c.notifyAll();
long waitTime = System.currentTimeMillis() - c.getStartTime();
if (waitTime >= rpcTimeout) {
c.setException(closeException); // local exception
synchronized (c) {
c.notifyAll();
}
itor.remove();
} else {
break;
}
itor.remove();
}
try {
if (!calls.isEmpty()) {
Call firstCall = calls.get(calls.firstKey());
long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
if (maxWaitTime < rpcTimeout) {
rpcTimeout -= maxWaitTime;
}
}
if (!shouldCloseConnection.get()) {
closeException = null;
if (socket != null) {
socket.setSoTimeout((int) rpcTimeout);
}
}
} catch (SocketException e) {
LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
}
}
}