HBASE-10525 Allow the client to use a different thread for writing to ease interrupt

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1571210 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2014-02-24 10:05:39 +00:00
parent 92a625fc68
commit 7e1ac02210
5 changed files with 468 additions and 152 deletions

View File

@ -114,6 +114,14 @@
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/> <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.hbase.ipc.RpcClient$Connection"/>
<Or>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
<Bug pattern="NN_NAKED_NOTIFY"/>
</Or>
</Match>
<Match> <Match>
<Class name="org.apache.hadoop.hbase.regionserver.HRegion"/> <Class name="org.apache.hadoop.hbase.regionserver.HRegion"/>
<Or> <Or>

View File

@ -753,12 +753,21 @@ class ConnectionManager {
* @param rpcClient Client we should use instead. * @param rpcClient Client we should use instead.
* @return Previous rpcClient * @return Previous rpcClient
*/ */
@VisibleForTesting
RpcClient setRpcClient(final RpcClient rpcClient) { RpcClient setRpcClient(final RpcClient rpcClient) {
RpcClient oldRpcClient = this.rpcClient; RpcClient oldRpcClient = this.rpcClient;
this.rpcClient = rpcClient; this.rpcClient = rpcClient;
return oldRpcClient; return oldRpcClient;
} }
/**
* For tests only.
*/
@VisibleForTesting
RpcClient getRpcClient() {
return rpcClient;
}
/** /**
* An identifier that will remain the same for a given connection. * An identifier that will remain the same for a given connection.
* @return * @return

View File

@ -70,11 +70,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.cloudera.htrace.Span; import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace; import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import javax.security.sasl.SaslException; import javax.security.sasl.SaslException;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -96,6 +98,8 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -115,13 +119,13 @@ public class RpcClient {
protected final AtomicInteger callIdCnt = new AtomicInteger(); protected final AtomicInteger callIdCnt = new AtomicInteger();
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf; final protected Configuration conf;
final protected int minIdleTimeBeforeClose; // if the connection is iddle for more than this protected final int minIdleTimeBeforeClose; // if the connection is iddle for more than this
// time (in ms), it will be closed at any moment. // time (in ms), it will be closed at any moment.
final protected int maxRetries; //the max. no. of retries for socket connections final protected int maxRetries; //the max. no. of retries for socket connections
final protected long failureSleep; // Time to sleep before retry on failure. final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives protected final boolean tcpKeepAlive; // if T then use keepalives
protected FailedServers failedServers; protected final FailedServers failedServers;
private final Codec codec; private final Codec codec;
private final CompressionCodec compressor; private final CompressionCodec compressor;
private final IPCUtil ipcUtil; private final IPCUtil ipcUtil;
@ -140,10 +144,14 @@ public class RpcClient {
public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000; public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
public final static String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose";
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
"hbase.ipc.client.fallback-to-simple-auth-allowed"; "hbase.ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
// thread-specific RPC timeout, which may override that of what was passed in. // thread-specific RPC timeout, which may override that of what was passed in.
// This is used to change dynamically the timeout (for read only) when retrying: if // This is used to change dynamically the timeout (for read only) when retrying: if
// the time allowed for the operation is less than the usual socket timeout, then // the time allowed for the operation is less than the usual socket timeout, then
@ -223,15 +231,6 @@ public class RpcClient {
} }
} }
/**
* Set the socket timeout
* @param conf Configuration
* @param socketTimeout the socket timeout
*/
public static void setSocketTimeout(Configuration conf, int socketTimeout) {
conf.setInt(SOCKET_TIMEOUT, socketTimeout);
}
/** /**
* @return the socket timeout * @return the socket timeout
*/ */
@ -252,7 +251,7 @@ public class RpcClient {
// The return type. Used to create shell into which we deserialize the response if any. // The return type. Used to create shell into which we deserialize the response if any.
Message responseDefaultType; Message responseDefaultType;
IOException error; // exception, null if value IOException error; // exception, null if value
boolean done; // true when call is done volatile boolean done; // true when call is done
long startTime; long startTime;
final MethodDescriptor md; final MethodDescriptor md;
@ -261,7 +260,7 @@ public class RpcClient {
this.param = param; this.param = param;
this.md = md; this.md = md;
this.cells = cells; this.cells = cells;
this.startTime = System.currentTimeMillis(); this.startTime = EnvironmentEdgeManager.currentTimeMillis();
this.responseDefaultType = responseDefaultType; this.responseDefaultType = responseDefaultType;
this.id = callIdCnt.getAndIncrement(); this.id = callIdCnt.getAndIncrement();
} }
@ -325,6 +324,24 @@ public class RpcClient {
return new Connection(remoteId, codec, compressor); return new Connection(remoteId, codec, compressor);
} }
/**
* see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender}
*/
private static class CallFuture {
Call call;
int priority;
Span span;
// We will use this to stop the writer
final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
CallFuture(Call call, int priority, Span span) {
this.call = call;
this.priority = priority;
this.span = span;
}
}
/** Thread that reads responses and notifies callers. Each connection owns a /** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this * socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */ * socket: responses may be delivered out of order. */
@ -349,6 +366,123 @@ public class RpcClient {
new ConcurrentSkipListMap<Integer, Call>(); new ConcurrentSkipListMap<Integer, Call>();
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
protected final CallSender callSender;
/**
* If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
* it gets into a java issue: an interruption during a write closes the socket/channel.
* A way to avoid this is to use a different thread for writing. This way, on interruptions,
* we either cancel the writes or ignore the answer if the write is already done, but we
* don't stop the write in the middle.
* This adds a thread per region server in the client, so it's kept as an option.
* <p>
* The implementation is simple: the client threads adds their call to the queue, and then
* wait for an answer. The CallSender blocks on the queue, and writes the calls one
* after the other. On interruption, the client cancels its call. The CallSender checks that
* the call has not been canceled before writing it.
* </p>
* When the connection closes, all the calls not yet sent are dismissed. The client thread
* is notified with an appropriate exception, as if the call was already sent but the answer
* not yet received.
* </p>
*/
private class CallSender extends Thread implements Closeable {
protected final BlockingQueue<CallFuture> callsToWrite;
public CallFuture sendCall(Call call, int priority, Span span)
throws InterruptedException, IOException {
CallFuture cts = new CallFuture(call, priority, span);
callsToWrite.add(cts);
checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
// in the list while the cleanup was already done.
return cts;
}
public void close(){
assert shouldCloseConnection.get();
callsToWrite.offer(CallFuture.DEATH_PILL);
// We don't care if we can't add the death pill to the queue: the writer
// won't be blocked in the 'take', as its queue is full.
}
CallSender(String name, Configuration conf) {
int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
setDaemon(true);
setName(name + " - writer");
}
public void cancel(CallFuture cts){
cts.call.done = true;
callsToWrite.remove(cts);
calls.remove(cts.call.id);
}
/**
* Reads the call from the queue, write them on the socket.
*/
@Override
public void run() {
while (!shouldCloseConnection.get()) {
CallFuture cts = null;
try {
cts = callsToWrite.take();
} catch (InterruptedException e) {
markClosed(new InterruptedIOException());
}
if (cts == null || cts == CallFuture.DEATH_PILL){
assert shouldCloseConnection.get();
break;
}
if (cts.call.done) {
continue;
}
if (remoteId.rpcTimeout > 0) {
long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime();
if (waitTime >= remoteId.rpcTimeout) {
IOException ie = new CallTimeoutException("Call id=" + cts.call.id +
", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout +
", expired before being sent to the server.");
cts.call.setException(ie); // includes a notify
continue;
}
}
try {
Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
} catch (IOException e) {
LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
cts.call.setException(e);
markClosed(e);
}
}
cleanup();
}
/**
* Cleans the call not yet sent when we finish.
*/
private void cleanup() {
assert shouldCloseConnection.get();
IOException ie = new IOException("Connection to " + server + " is closing.");
while (true) {
CallFuture cts = callsToWrite.poll();
if (cts == null) {
break;
}
if (cts.call != null && !cts.call.done) {
cts.call.setException(ie);
}
}
}
}
Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor) Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
throws IOException { throws IOException {
@ -421,6 +555,13 @@ public class RpcClient {
((ticket==null)?" from an unknown user": (" from " ((ticket==null)?" from an unknown user": (" from "
+ ticket.getUserName()))); + ticket.getUserName())));
this.setDaemon(true); this.setDaemon(true);
if (conf.getBoolean(ALLOWS_INTERRUPTS, false)) {
callSender = new CallSender(getName(), conf);
callSender.start();
} else {
callSender = null;
}
} }
private UserInformation getUserInfo(UserGroupInformation ugi) { private UserInformation getUserInfo(UserGroupInformation ugi) {
@ -470,7 +611,7 @@ public class RpcClient {
} }
} }
protected void closeConnection() { protected synchronized void closeConnection() {
if (socket == null) { if (socket == null) {
return; return;
} }
@ -556,23 +697,38 @@ public class RpcClient {
* *
* Return true if it is time to read a response; false otherwise. * Return true if it is time to read a response; false otherwise.
*/ */
protected synchronized boolean waitForWork() throws InterruptedException{ protected synchronized boolean waitForWork() throws InterruptedException {
while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) { // beware of the concurrent access to the calls list: we can add calls, but as well
wait(minIdleTimeBeforeClose); // remove them.
long waitUntil = EnvironmentEdgeManager.currentTimeMillis() + minIdleTimeBeforeClose;
while (!shouldCloseConnection.get() && running.get() &&
EnvironmentEdgeManager.currentTimeMillis() < waitUntil && calls.isEmpty()) {
wait(Math.min(minIdleTimeBeforeClose, 1000));
} }
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { if (shouldCloseConnection.get()) {
return true;
} else if (shouldCloseConnection.get()) {
return false;
} else if (calls.isEmpty()) {
markClosed(new IOException("idle connection closed or stopped"));
return false;
} else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(
new InterruptedException()));
return false; return false;
} }
if (!running.get()) {
markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
return false;
}
if (!calls.isEmpty()) {
// shouldCloseConnection can be set to true by a parallel thread here. The caller
// will need to check anyway.
return true;
}
// Connection is idle.
// We expect the number of calls to be zero here, but actually someone can
// adds a call at the any moment, as there is no synchronization between this task
// and adding new calls. It's not a big issue, but it will get an exception.
markClosed(new IOException(
"idle connection closed with " + calls.size() + " pending request(s)"));
return false;
} }
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
@ -590,7 +746,7 @@ public class RpcClient {
readResponse(); readResponse();
} }
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn(getName() + ": unexpected exception receiving call responses", t); LOG.debug(getName() + ": unexpected exception receiving call responses", t);
markClosed(new IOException("Unexpected exception receiving call responses", t)); markClosed(new IOException("Unexpected exception receiving call responses", t));
} }
@ -811,9 +967,8 @@ public class RpcClient {
/** /**
* Write the connection header. * Write the connection header.
* Out is not synchronized because only the first thread does this.
*/ */
private void writeConnectionHeader() throws IOException { private synchronized void writeConnectionHeader() throws IOException {
synchronized (this.out) { synchronized (this.out) {
this.out.writeInt(this.header.getSerializedSize()); this.out.writeInt(this.header.getSerializedSize());
this.header.writeTo(this.out); this.header.writeTo(this.out);
@ -852,8 +1007,18 @@ public class RpcClient {
cleanupCalls(); cleanupCalls();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": ipc connection closed"); LOG.debug(getName() + ": ipc connection to " + server + " closed");
}
}
protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
TraceScope ts = Trace.continueSpan(span);
try {
writeRequest(call, priority, span);
} finally {
ts.close();
}
} }
/** /**
@ -864,15 +1029,12 @@ public class RpcClient {
* @param priority * @param priority
* @see #readResponse() * @see #readResponse()
*/ */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", private void writeRequest(Call call, final int priority, Span span) throws IOException {
justification = "on close the reader thread must stop")
protected void writeRequest(Call call, final int priority) throws IOException {
RequestHeader.Builder builder = RequestHeader.newBuilder(); RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setCallId(call.id); builder.setCallId(call.id);
if (Trace.isTracing()) { if (span != null) {
Span s = Trace.currentSpan(); builder.setTraceInfo(
builder.setTraceInfo(RPCTInfo.newBuilder(). RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
} }
builder.setMethodName(call.md.getName()); builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null); builder.setRequestParam(call.param != null);
@ -890,11 +1052,12 @@ public class RpcClient {
// is still valid, and, if so we do the write to the socket. If the write fails, we don't // is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection. // know where we stand, we have to close the connection.
checkIsOpen(); checkIsOpen();
calls.put(call.id, call); // On error, the call will be removed by the timeout. IOException writeException = null;
try { synchronized (this.out) {
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
if (Thread.interrupted()) throw new InterruptedIOException(); if (Thread.interrupted()) throw new InterruptedIOException();
checkIsOpen();
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
checkIsOpen(); // Now we're checking that it didn't became idle in between.
try { try {
IPCUtil.write(this.out, header, call.param, cellBlock); IPCUtil.write(this.out, header, call.param, cellBlock);
@ -902,16 +1065,19 @@ public class RpcClient {
// We set the value inside the synchronized block, this way the next in line // We set the value inside the synchronized block, this way the next in line
// won't even try to write // won't even try to write
shouldCloseConnection.set(true); shouldCloseConnection.set(true);
throw e; writeException = e;
} }
} }
} finally {
synchronized (this) { // We added a call, and may be started the connection close. In both cases, we
// We added a call, and may start the connection clode. In both cases, we
// need to notify the reader. // need to notify the reader.
synchronized (this) {
notifyAll(); notifyAll();
} }
}
// Now that we notified, we can rethrow the exception if any. Otherwise we're good.
if (writeException != null) throw writeException;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header)); LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
} }
@ -922,7 +1088,7 @@ public class RpcClient {
*/ */
protected void readResponse() { protected void readResponse() {
if (shouldCloseConnection.get()) return; if (shouldCloseConnection.get()) return;
int totalSize = -1; int totalSize;
try { try {
// See HBaseServer.Call.setResponse for where we write out the response. // See HBaseServer.Call.setResponse for where we write out the response.
// Total size of the response. Unused. But have to read it in anyways. // Total size of the response. Unused. But have to read it in anyways.
@ -936,7 +1102,8 @@ public class RpcClient {
TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes"); TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
} }
Call call = calls.remove(id); Call call = calls.remove(id);
if (call == null) { boolean expectedCall = (call != null && !call.done);
if (!expectedCall) {
// So we got a response for which we have no corresponding 'call' here on the client-side. // So we got a response for which we have no corresponding 'call' here on the client-side.
// We probably timed out waiting, cleaned up all references, and now the server decides // We probably timed out waiting, cleaned up all references, and now the server decides
// to return a response. There is nothing we can do w/ the response at this stage. Clean // to return a response. There is nothing we can do w/ the response at this stage. Clean
@ -954,12 +1121,12 @@ public class RpcClient {
if (isFatalConnectionException(exceptionResponse)) { if (isFatalConnectionException(exceptionResponse)) {
markClosed(re); markClosed(re);
} else { } else {
if (call != null) call.setException(re); if (expectedCall) call.setException(re);
} }
} else { } else {
Message value = null; Message value = null;
// Call may be null because it may have timedout and been cleaned up on this side already // Call may be null because it may have timedout and been cleaned up on this side already
if (call != null && call.responseDefaultType != null) { if (expectedCall && call.responseDefaultType != null) {
Builder builder = call.responseDefaultType.newBuilderForType(); Builder builder = call.responseDefaultType.newBuilderForType();
builder.mergeDelimitedFrom(in); builder.mergeDelimitedFrom(in);
value = builder.build(); value = builder.build();
@ -973,7 +1140,7 @@ public class RpcClient {
} }
// it's possible that this call may have been cleaned up due to a RPC // it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value. // timeout, so check if it still exists before setting the value.
if (call != null) call.setResponse(value, cellBlockScanner); if (expectedCall) call.setResponse(value, cellBlockScanner);
} }
} catch (IOException e) { } catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
@ -985,11 +1152,9 @@ public class RpcClient {
markClosed(e); markClosed(e);
} }
} finally { } finally {
if (remoteId.rpcTimeout > 0) {
cleanupCalls(remoteId.rpcTimeout); cleanupCalls(remoteId.rpcTimeout);
} }
} }
}
/** /**
* @param e * @param e
@ -1015,34 +1180,42 @@ public class RpcClient {
e.getStackTrace(), doNotRetry); e.getStackTrace(), doNotRetry);
} }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", protected synchronized void markClosed(IOException e) {
justification = "on close the reader thread must stop")
protected void markClosed(IOException e) {
if (e == null) throw new NullPointerException(); if (e == null) throw new NullPointerException();
if (shouldCloseConnection.compareAndSet(false, true)) { if (shouldCloseConnection.compareAndSet(false, true)) {
LOG.warn(getName() + ": marking at should close, reason =" + e.getMessage());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage()); LOG.debug(getName() + ": marking at should close, reason =" + e.getMessage());
}
if (callSender != null) {
callSender.close();
} }
synchronized (this) {
notifyAll(); notifyAll();
} }
} }
}
/* Cleanup all calls and mark them as done */ /* Cleanup all calls and mark them as done */
protected void cleanupCalls() { protected void cleanupCalls() {
cleanupCalls(0); cleanupCalls(-1);
} }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", /**
justification="Notify because timeout") * Cleanup the calls older than a given timeout, in milli seconds.
protected void cleanupCalls(long rpcTimeout) { * @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing.
*/
protected synchronized void cleanupCalls(long rpcTimeout) {
if (rpcTimeout == 0) return;
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator(); Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) { while (itor.hasNext()) {
Call c = itor.next().getValue(); Call c = itor.next().getValue();
long waitTime = System.currentTimeMillis() - c.getStartTime(); long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
if (waitTime >= rpcTimeout) { if (rpcTimeout < 0) {
IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
c.setException(ie);
itor.remove();
} else if (waitTime >= rpcTimeout) {
IOException ie = new CallTimeoutException("Call id=" + c.id + IOException ie = new CallTimeoutException("Call id=" + c.id +
", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout); ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
c.setException(ie); c.setException(ie);
@ -1050,34 +1223,19 @@ public class RpcClient {
} else { } else {
// This relies on the insertion order to be the call id order. This is not // This relies on the insertion order to be the call id order. This is not
// true under 'difficult' conditions (gc, ...). // true under 'difficult' conditions (gc, ...).
rpcTimeout -= waitTime;
break; break;
} }
} }
if (!calls.isEmpty()) { if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) {
Call firstCall = calls.get(calls.firstKey());
long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
if (maxWaitTime < rpcTimeout) {
rpcTimeout -= maxWaitTime;
}
}
try { try {
if (!shouldCloseConnection.get()) { socket.setSoTimeout((int)rpcTimeout);
setSocketTimeout(socket, (int) rpcTimeout);
}
} catch (SocketException e) { } catch (SocketException e) {
LOG.warn("Couldn't lower timeout, which may result in longer than expected calls"); LOG.warn("Couldn't change timeout, which may result in longer than expected calls");
} }
} }
} }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Presume sync not needed setting socket timeout")
private static void setSocketTimeout(final Socket socket, final int rpcTimeout)
throws java.net.SocketException {
if (socket == null) return;
socket.setSoTimeout(rpcTimeout);
} }
/** /**
@ -1110,8 +1268,7 @@ public class RpcClient {
* @param localAddr client socket bind address * @param localAddr client socket bind address
*/ */
RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) { RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
this.minIdleTimeBeforeClose = this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE); HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
@ -1289,19 +1446,32 @@ public class RpcClient {
* @throws IOException * @throws IOException
*/ */
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells, Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr, Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority)
int rpcTimeout, int priority) throws IOException, InterruptedException {
throws InterruptedException, IOException {
Call call = new Call(md, param, cells, returnType); Call call = new Call(md, param, cells, returnType);
Connection connection = Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
connection.writeRequest(call, priority); CallFuture cts = null;
if (connection.callSender != null){
cts = connection.callSender.sendCall(call, priority, Trace.currentSpan());
} else {
connection.tracedWriteRequest(call, priority, Trace.currentSpan());
}
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
while (!call.done) { while (!call.done) {
call.wait(1000); // wait for the result try {
synchronized (call) {
call.wait(1000); // wait for the result. We will be notified by the reader.
}
} catch (InterruptedException e) {
if (cts != null) {
connection.callSender.cancel(cts);
} else {
call.done = true;
}
throw e;
}
} }
if (call.error != null) { if (call.error != null) {
@ -1312,9 +1482,9 @@ public class RpcClient {
// local exception // local exception
throw wrapException(addr, call.error); throw wrapException(addr, call.error);
} }
return new Pair<Message, CellScanner>(call.response, call.cells); return new Pair<Message, CellScanner>(call.response, call.cells);
} }
}
@ -1361,9 +1531,8 @@ public class RpcClient {
connection.getRemoteAddress().getHostName().equals(hostname)) { connection.getRemoteAddress().getHostName().equals(hostname)) {
LOG.info("The server on " + hostname + ":" + port + LOG.info("The server on " + hostname + ":" + port +
" is dead - stopping the connection " + connection.remoteId); " is dead - stopping the connection " + connection.remoteId);
connection.closeConnection(); connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
// We could do a connection.interrupt(), but it's safer not to do it, as the // This will close the connection as well.
// interrupted exception behavior is not defined nor enforced enough.
} }
} }
} }
@ -1465,10 +1634,6 @@ public class RpcClient {
rpcTimeout.set(t); rpcTimeout.set(t);
} }
public static int getRpcTimeout() {
return rpcTimeout.get();
}
/** /**
* Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
* default timeout. * default timeout.
@ -1484,18 +1649,10 @@ public class RpcClient {
/** /**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code * Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
* @param md
* @param controller
* @param param
* @param returnType
* @param isa
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection. * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time. * new Connection each time.
* @param rpcTimeout
* @return A pair with the Message response and the Cell data (if any). * @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException
* @throws IOException
*/ */
Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType, final User ticket, final InetSocketAddress isa, Message param, Message returnType, final User ticket, final InetSocketAddress isa,
@ -1503,7 +1660,7 @@ public class RpcClient {
throws ServiceException { throws ServiceException {
long startTime = 0; long startTime = 0;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
startTime = System.currentTimeMillis(); startTime = EnvironmentEdgeManager.currentTimeMillis();
} }
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
CellScanner cells = null; CellScanner cells = null;
@ -1524,11 +1681,9 @@ public class RpcClient {
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
long callTime = System.currentTimeMillis() - startTime; long callTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
} }
}
return val.getFirst(); return val.getFirst();
} catch (Throwable e) { } catch (Throwable e) {
throw new ServiceException(e); throw new ServiceException(e);
@ -1538,9 +1693,6 @@ public class RpcClient {
/** /**
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
* protobuf blocking stubs. * protobuf blocking stubs.
* @param sn
* @param ticket
* @param rpcTimeout
* @return A blocking rpc channel that goes via this rpc client instance. * @return A blocking rpc channel that goes via this rpc client instance.
*/ */
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
@ -1551,10 +1703,10 @@ public class RpcClient {
/** /**
* Blocking rpc channel that goes via hbase rpc. * Blocking rpc channel that goes via hbase rpc.
*/ */
// Public so can be subclassed for tests. @VisibleForTesting
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
private final InetSocketAddress isa; private final InetSocketAddress isa;
private volatile RpcClient rpcClient; private final RpcClient rpcClient;
private final int rpcTimeout; private final int rpcTimeout;
private final User ticket; private final User ticket;

View File

@ -795,7 +795,7 @@ public class RpcServer implements RpcServerInterface {
} catch (InterruptedException ieo) { } catch (InterruptedException ieo) {
throw ieo; throw ieo;
} catch (Exception e) { } catch (Exception e) {
LOG.warn(getName() + ": count of bytes read: " + count, e); LOG.info(getName() + ": count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed count = -1; //so that the (count < 0) block is executed
} }
if (count < 0) { if (count < 0) {

View File

@ -39,6 +39,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -46,7 +48,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -228,6 +230,151 @@ public class TestHCM {
hci.getClient(sn); // will throw an exception: RegionServerStoppedException hci.getClient(sn); // will throw an exception: RegionServerStoppedException
} }
/**
* Test that we can handle connection close: it will trigger a retry, but the calls will
* finish.
*/
@Test
public void testConnectionCloseAllowsInterrupt() throws Exception {
testConnectionClose(true);
}
@Test
public void testConnectionNotAllowsInterrupt() throws Exception {
testConnectionClose(false);
}
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
String tableName = "HCM-testConnectionClose" + allowsInterrupt;
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
// We want to work on a separate connection.
c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE - 1); // retry a lot
c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries.
c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
c2.setBoolean(RpcClient.ALLOWS_INTERRUPTS, allowsInterrupt);
final HTable table = new HTable(c2, tableName.getBytes());
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
// 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
final AtomicInteger step = new AtomicInteger(0);
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(null);
Thread t = new Thread("testConnectionCloseThread") {
public void run() {
int done = 0;
try {
step.set(1);
while (step.get() == 1) {
Get get = new Get(ROW);
table.get(get);
done++;
if (done % 100 == 0)
LOG.info("done=" + done);
}
} catch (Throwable t) {
failed.set(t);
LOG.error(t);
}
step.set(3);
}
};
t.start();
TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return step.get() == 1;
}
});
ServerName sn = table.getRegionLocation(ROW).getServerName();
ConnectionManager.HConnectionImplementation conn =
(ConnectionManager.HConnectionImplementation) table.getConnection();
RpcClient rpcClient = conn.getRpcClient();
LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
for (int i = 0; i < 5000; i++) {
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), null);
Thread.sleep(5);
}
step.compareAndSet(1, 2);
// The test may fail here if the thread doing the gets is stuck. The wait to find
// out what's happening is to look for the thread named 'testConnectionCloseThread'
TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return step.get() == 3;
}
});
Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
}
/**
* Test that connection can become idle without breaking everything.
*/
@Test
public void testConnectionIdle() throws Exception {
String tableName = "HCM-testConnectionIdle";
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
int idleTime = 20000;
boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
// We want to work on a separate connection.
c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
c2.setInt(RpcClient.IDLE_TIME, idleTime);
final HTable table = new HTable(c2, tableName.getBytes());
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
mee.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(mee);
LOG.info("first get");
table.get(new Get(ROW));
LOG.info("first get - changing the time & sleeping");
mee.incValue(idleTime + 1000);
Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
// 1500 = sleep time in RpcClient#waitForWork + a margin
LOG.info("second get - connection has been marked idle in the middle");
// To check that the connection actually became idle would need to read some private
// fields of RpcClient.
table.get(new Get(ROW));
mee.incValue(idleTime + 1000);
LOG.info("third get - connection is idle, but the reader doesn't know yet");
// We're testing here a special case:
// time limit reached BUT connection not yet reclaimed AND a new call.
// in this situation, we don't close the connection, instead we use it immediately.
// If we're very unlucky we can have a race condition in the test: the connection is already
// under closing when we do the get, so we have an exception, and we don't retry as the
// retry number is 1. The probability is very very low, and seems acceptable for now. It's
// a test issue only.
table.get(new Get(ROW));
LOG.info("we're done - time will change back");
EnvironmentEdgeManager.reset();
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
}
/** /**
* Test that the connection to the dead server is cut immediately when we receive the * Test that the connection to the dead server is cut immediately when we receive the
* notification. * notification.