HBASE-10490 Simplify RpcClient code

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1567919 13f79535-47bb-0310-9956-ffa450edef68
nkeywal 2014-02-13 13:51:38 +00:00
parent d5047681e0
commit 0bafd16057
4 changed files with 173 additions and 308 deletions


@@ -19,37 +19,14 @@
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.net.SocketFactory;
-import javax.security.sasl.SaslException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -94,14 +71,34 @@ import org.apache.hadoop.security.token.TokenSelector;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import javax.security.sasl.SaslException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -115,16 +112,15 @@ public class RpcClient {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
   protected final PoolMap<ConnectionId, Connection> connections;
 
-  protected int counter; // counter for call ids
+  protected final AtomicInteger callIdCnt = new AtomicInteger();
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
-  final protected int maxIdleTime; // connections will be culled if it was idle for
-                                   // maxIdleTime microsecs
+  final protected int minIdleTimeBeforeClose; // if the connection is idle for more than this
+                                              // time (in ms), it will be closed at any moment.
   final protected int maxRetries; //the max. no. of retries for socket connections
   final protected long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
-  protected int pingInterval; // how often sends ping to the server in msecs
   protected FailedServers failedServers;
   private final Codec codec;
   private final CompressionCodec compressor;
@@ -137,11 +133,9 @@ public class RpcClient {
   private final boolean fallbackAllowed;
   private UserProvider userProvider;
 
-  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
   final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
-  final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
   final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
-  final static int PING_CALL_ID = -1;
+  final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
 
   public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
   public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@@ -213,39 +207,22 @@ public class RpcClient {
     }
   }
 
+  /**
+   * Indicates that we're trying to connect to a already known as dead server. We will want to
+   * retry: we're getting this because the region location was wrong, or because
+   * the server just died, in which case the retry loop will help us to wait for the
+   * regions to recover.
+   */
   @SuppressWarnings("serial")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  // Shouldn't this be a DoNotRetryException? St.Ack 10/2/2013
   public static class FailedServerException extends HBaseIOException {
     public FailedServerException(String s) {
       super(s);
     }
   }
 
-  /**
-   * set the ping interval value in configuration
-   *
-   * @param conf Configuration
-   * @param pingInterval the ping interval
-   */
-  // Any reason we couldn't just do tcp keepalive instead of this pingery?
-  // St.Ack 20130121
-  public static void setPingInterval(Configuration conf, int pingInterval) {
-    conf.setInt(PING_INTERVAL_NAME, pingInterval);
-  }
-
-  /**
-   * Get the ping interval from configuration;
-   * If not set in the configuration, return the default value.
-   *
-   * @param conf Configuration
-   * @return the ping interval
-   */
-  static int getPingInterval(Configuration conf) {
-    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
-  }
-
   /**
    * Set the socket timeout
    * @param conf Configuration
@@ -286,9 +263,7 @@ public class RpcClient {
       this.cells = cells;
       this.startTime = System.currentTimeMillis();
       this.responseDefaultType = responseDefaultType;
-      synchronized (RpcClient.this) {
-        this.id = counter++;
-      }
+      this.id = callIdCnt.getAndIncrement();
     }
 
     @Override
@@ -358,7 +333,7 @@ public class RpcClient {
     protected ConnectionId remoteId;
     protected Socket socket = null; // connected socket
     protected DataInputStream in;
-    protected DataOutputStream out;
+    protected DataOutputStream out; // Warning: can be locked inside a class level lock.
     private InetSocketAddress server; // server ip:port
     private String serverPrincipal; // server's krb5 principal name
     private AuthMethod authMethod; // authentication method
@@ -372,11 +347,8 @@ public class RpcClient {
     // currently active calls
     protected final ConcurrentSkipListMap<Integer, Call> calls =
       new ConcurrentSkipListMap<Integer, Call>();
-    protected final AtomicLong lastActivity =
-      new AtomicLong(); // last I/O activity time
-    protected final AtomicBoolean shouldCloseConnection =
-      new AtomicBoolean(); // indicate if the connection is closed
-    protected IOException closeException; // close reason
+    protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
 
     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
     throws IOException {
@@ -470,97 +442,6 @@
       return userInfoPB.build();
     }
 
-    /** Update lastActivity with the current time. */
-    protected void touch() {
-      lastActivity.set(System.currentTimeMillis());
-    }
-
-    /**
-     * Add a call to this connection's call queue and notify
-     * a listener; synchronized. If the connection is dead, the call is not added, and the
-     * caller is notified.
-     * This function can return a connection that is already marked as 'shouldCloseConnection'
-     * It is up to the user code to check this status.
-     * @param call to add
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-        justification="Notify because new call available for processing")
-    protected synchronized void addCall(Call call) {
-      // If the connection is about to close, we manage this as if the call was already added
-      // to the connection calls list. If not, the connection creations are serialized, as
-      // mentioned in HBASE-6364
-      if (this.shouldCloseConnection.get()) {
-        if (this.closeException == null) {
-          call.setException(new IOException(
-              "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
-        } else {
-          call.setException(this.closeException);
-        }
-        synchronized (call) {
-          call.notifyAll();
-        }
-      } else {
-        calls.put(call.id, call);
-        synchronized (call) {
-          notify();
-        }
-      }
-    }
-
-    /** This class sends a ping to the remote side when timeout on
-     * reading. If no failure is detected, it retries until at least
-     * a byte is read.
-     */
-    protected class PingInputStream extends FilterInputStream {
-      /* constructor */
-      protected PingInputStream(InputStream in) {
-        super(in);
-      }
-
-      /* Process timeout exception
-       * if the connection is not going to be closed, send a ping.
-       * otherwise, throw the timeout exception.
-       */
-      private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) {
-          throw e;
-        }
-        sendPing();
-      }
-
-      /** Read a byte from the stream.
-       * Send a ping if timeout on read. Retries if no failure is detected
-       * until a byte is read.
-       * @throws IOException for any IO problem other than socket timeout
-       */
-      @Override
-      public int read() throws IOException {
-        do {
-          try {
-            return super.read();
-          } catch (SocketTimeoutException e) {
-            handleTimeout(e);
-          }
-        } while (true);
-      }
-
-      /** Read bytes into a buffer starting from offset <code>off</code>
-       * Send a ping if timeout on read. Retries if no failure is detected
-       * until a byte is read.
-       *
-       * @return the total number of bytes read; -1 if the connection is closed.
-       */
-      @Override
-      public int read(byte[] buf, int off, int len) throws IOException {
-        do {
-          try {
-            return super.read(buf, off, len);
-          } catch (SocketTimeoutException e) {
-            handleTimeout(e);
-          }
-        } while (true);
-      }
-    }
-
     protected synchronized void setupConnection() throws IOException {
       short ioFailures = 0;
@@ -576,10 +457,7 @@
         // connection time out is 20s
         NetUtils.connect(this.socket, remoteId.getAddress(),
             getSocketTimeout(conf));
-        if (remoteId.rpcTimeout > 0) {
-          pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
-        }
-        this.socket.setSoTimeout(pingInterval);
+        this.socket.setSoTimeout(remoteId.rpcTimeout);
         return;
       } catch (SocketTimeoutException toe) {
         /* The max number of retries is 45,
@@ -663,30 +541,32 @@
           " time(s).");
     }
 
+    /**
+     * @throws IOException if the connection is not open.
+     */
+    private void checkIsOpen() throws IOException {
+      if (shouldCloseConnection.get()) {
+        throw new IOException(getName() + " is closing");
+      }
+    }
+
     /* wait till someone signals us to start reading RPC response or
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
      * Return true if it is time to read a response; false otherwise.
     */
-    protected synchronized boolean waitForWork() {
-      if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
-        long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
-        if (timeout>0) {
-          try {
-            wait(timeout);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-        }
+    protected synchronized boolean waitForWork() throws InterruptedException{
+      while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) {
+        wait(minIdleTimeBeforeClose);
       }
       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
         return true;
       } else if (shouldCloseConnection.get()) {
         return false;
-      } else if (calls.isEmpty()) { // idle connection closed or stopped
-        markClosed(null);
+      } else if (calls.isEmpty()) {
+        markClosed(new IOException("idle connection closed or stopped"));
         return false;
       } else { // get stopped but there are still pending requests
         markClosed((IOException)new IOException().initCause(
@@ -699,22 +579,6 @@
       return remoteId.getAddress();
     }
 
-    /* Send a ping to the server if the time elapsed
-     * since last I/O activity is equal to or greater than the ping interval
-     */
-    protected synchronized void sendPing() throws IOException {
-      // Can we do tcp keepalive instead of this pinging?
-      long curTime = System.currentTimeMillis();
-      if ( curTime - lastActivity.get() >= pingInterval) {
-        lastActivity.set(curTime);
-        //noinspection SynchronizeOnNonFinalField
-        synchronized (this.out) {
-          out.writeInt(PING_CALL_ID);
-          out.flush();
-        }
-      }
-    }
-
     @Override
     public void run() {
       if (LOG.isDebugEnabled()) {
@@ -836,8 +700,7 @@
       });
     }
 
-    protected synchronized void setupIOstreams()
-        throws IOException, InterruptedException {
+    protected synchronized void setupIOstreams() throws IOException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -867,7 +730,7 @@
         // This creates a socket with a write timeout. This timeout cannot be changed,
         // RpcClient allows to change the timeout dynamically, but we can only
         // change the read timeout today.
-        OutputStream outStream = NetUtils.getOutputStream(socket, pingInterval);
+        OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
         // Write out the preamble -- MAGIC, version, and auth to use.
         writeConnectionHeaderPreamble(outStream);
         if (useSasl) {
@@ -879,7 +742,7 @@
             ticket = ticket.getRealUser();
           }
         }
-        boolean continueSasl = false;
+        boolean continueSasl;
         if (ticket == null) throw new FatalConnectionException("ticket/user is null");
         try {
           continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
@@ -905,28 +768,24 @@
           useSasl = false;
         }
       }
-      this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+      this.in = new DataInputStream(new BufferedInputStream(inStream));
       this.out = new DataOutputStream(new BufferedOutputStream(outStream));
       // Now write out the connection header
       writeConnectionHeader();
-      // update last activity time
-      touch();
       // start the receiver thread after the socket connection has been set up
       start();
       return;
       }
     } catch (Throwable t) {
       failedServers.addToFailedServers(remoteId.address);
-      IOException e = null;
+      IOException e;
       if (t instanceof IOException) {
         e = (IOException)t;
-        markClosed(e);
       } else {
         e = new IOException("Could not set up IO Streams", t);
-        markClosed(e);
       }
+      markClosed(e);
       close();
       throw e;
     }
@@ -986,28 +845,15 @@
       this.in = null;
       disposeSasl();
 
-      // clean up all calls
-      if (closeException == null) {
-        if (!calls.isEmpty()) {
-          LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
-              "#Calls: " + calls.size());
-          // clean up calls anyway
-          closeException = new IOException("Unexpected closed connection");
-          cleanupCalls();
-        }
-      } else {
-        // log the info
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
-              closeException.getMessage(), closeException);
-        }
-        // cleanup calls
-        cleanupCalls();
+      // log the info
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": closing ipc connection to " + server);
       }
+      cleanupCalls();
+
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": closed");
+        LOG.debug(getName() + ": ipc connection closed");
     }
 
     /**
@@ -1018,36 +864,56 @@
      * @param priority
      * @see #readResponse()
      */
-    protected void writeRequest(Call call, final int priority) {
-      if (shouldCloseConnection.get()) return;
-      try {
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+        justification = "on close the reader thread must stop")
+    protected void writeRequest(Call call, final int priority) throws IOException {
       RequestHeader.Builder builder = RequestHeader.newBuilder();
       builder.setCallId(call.id);
       if (Trace.isTracing()) {
         Span s = Trace.currentSpan();
         builder.setTraceInfo(RPCTInfo.newBuilder().
             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
       }
       builder.setMethodName(call.md.getName());
       builder.setRequestParam(call.param != null);
       ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
       if (cellBlock != null) {
         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
         cellBlockBuilder.setLength(cellBlock.limit());
         builder.setCellBlockMeta(cellBlockBuilder.build());
       }
       // Only pass priority if there one. Let zero be same as no priority.
       if (priority != 0) builder.setPriority(priority);
-      //noinspection SynchronizeOnNonFinalField
       RequestHeader header = builder.build();
+
+      // Now we're going to write the call. We take the lock, then check that the connection
+      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+      // know where we stand, we have to close the connection.
+      checkIsOpen();
+      calls.put(call.id, call); // On error, the call will be removed by the timeout.
+      try {
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          IPCUtil.write(this.out, header, call.param, cellBlock);
+          if (Thread.interrupted()) throw new InterruptedIOException();
+          checkIsOpen();
+          try {
+            IPCUtil.write(this.out, header, call.param, cellBlock);
+          } catch (IOException e) {
+            // We set the value inside the synchronized block, this way the next in line
+            // won't even try to write
+            shouldCloseConnection.set(true);
+            throw e;
+          }
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
-        }
-      } catch(IOException e) {
-        markClosed(e);
+      } finally {
+        synchronized (this) {
+          // We added a call, and may start the connection close. In both cases, we
+          // need to notify the reader.
+          notifyAll();
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
       }
     }
@@ -1056,7 +922,6 @@
      */
     protected void readResponse() {
       if (shouldCloseConnection.get()) return;
-      touch();
       int totalSize = -1;
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
@@ -1070,7 +935,7 @@
           LOG.debug(getName() + ": got response header " +
             TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
         }
-        Call call = calls.get(id);
+        Call call = calls.remove(id);
         if (call == null) {
           // So we got a response for which we have no corresponding 'call' here on the client-side.
           // We probably timed out waiting, cleaned up all references, and now the server decides
@@ -1110,13 +975,11 @@
           // timeout, so check if it still exists before setting the value.
           if (call != null) call.setResponse(value, cellBlockScanner);
         }
-        if (call != null) calls.remove(id);
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
-          closeException = e;
         } else {
           // Treat this as a fatal condition and close this connection
           markClosed(e);
@@ -1152,10 +1015,18 @@
           e.getStackTrace(), doNotRetry);
     }
 
-    protected synchronized void markClosed(IOException e) {
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+        justification = "on close the reader thread must stop")
+    protected void markClosed(IOException e) {
+      if (e == null) throw new NullPointerException();
       if (shouldCloseConnection.compareAndSet(false, true)) {
-        closeException = e;
-        notifyAll();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage());
+        }
+        synchronized (this) {
+          notifyAll();
+        }
       }
     }
@@ -1165,46 +1036,38 @@
     }
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-        justification="Notify because timedout")
+        justification="Notify because timeout")
     protected void cleanupCalls(long rpcTimeout) {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
         long waitTime = System.currentTimeMillis() - c.getStartTime();
         if (waitTime >= rpcTimeout) {
-          if (this.closeException == null) {
-            // There may be no exception in the case that there are many calls
-            // being multiplexed over this connection and these are succeeding
-            // fine while this Call object is taking a long time to finish
-            // over on the server; e.g. I just asked the regionserver to bulk
-            // open 3k regions or its a big fat multiput into a heavily-loaded
-            // server (Perhaps this only happens at the extremes?)
-            this.closeException = new CallTimeoutException("Call id=" + c.id +
-                ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
-          }
-          c.setException(this.closeException);
-          synchronized (c) {
-            c.notifyAll();
-          }
+          IOException ie = new CallTimeoutException("Call id=" + c.id +
+              ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
+          c.setException(ie);
           itor.remove();
         } else {
+          // This relies on the insertion order to be the call id order. This is not
+          // true under 'difficult' conditions (gc, ...).
          break;
        }
      }
-      try {
       if (!calls.isEmpty()) {
         Call firstCall = calls.get(calls.firstKey());
         long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
         if (maxWaitTime < rpcTimeout) {
           rpcTimeout -= maxWaitTime;
-          }
         }
+      }
+
+      try {
         if (!shouldCloseConnection.get()) {
-          closeException = null;
           setSocketTimeout(socket, (int) rpcTimeout);
         }
       } catch (SocketException e) {
-        LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
+        LOG.warn("Couldn't lower timeout, which may result in longer than expected calls");
       }
     }
   }
@@ -1247,13 +1110,13 @@
    * @param localAddr client socket bind address
    */
   RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
-    this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
+    this.minIdleTimeBeforeClose =
+        conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
-    this.pingInterval = getPingInterval(conf);
     this.ipcUtil = new IPCUtil(conf);
     this.conf = conf;
     this.codec = getCodec();
@@ -1271,10 +1134,9 @@
       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
           ", tcpKeepAlive=" + this.tcpKeepAlive +
           ", tcpNoDelay=" + this.tcpNoDelay +
-          ", maxIdleTime=" + this.maxIdleTime +
+          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
           ", maxRetries=" + this.maxRetries +
           ", fallbackAllowed=" + this.fallbackAllowed +
-          ", ping interval=" + this.pingInterval + "ms" +
           ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
     }
   }
@@ -1393,7 +1255,11 @@
     while (!connections.isEmpty()) {
       try {
         Thread.sleep(100);
-      } catch (InterruptedException ignored) {
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
+            " connections.");
+        Thread.currentThread().interrupt();
+        return;
       }
     }
   }
@@ -1429,14 +1295,12 @@
       Call call = new Call(md, param, cells, returnType);
       Connection connection =
         getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
-      connection.writeRequest(call, priority); // send the parameter
+      connection.writeRequest(call, priority);
       //noinspection SynchronizationOnLocalVariableOrMethodParameter
       synchronized (call) {
         while (!call.done) {
-          if (connection.shouldCloseConnection.get()) {
-            throw new IOException("Unexpected closed connection");
-          }
           call.wait(1000); // wait for the result
         }
@@ -1452,6 +1316,8 @@
       }
     }
 
   /**
    * Take an IOException and the address we were trying to connect to
    * and return an IOException with the input exception as the cause.
@@ -1484,7 +1350,7 @@
    * is known as actually dead. This will not prevent current operation to be retried, and,
    * depending on their own behavior, they may retry on the same server. This can be a feature,
    * for example at startup. In any case, they're likely to get connection refused (if the
-   * process died) or no route to host: i.e. there next retries should be faster and with a
+   * process died) or no route to host: i.e. their next retries should be faster and with a
    * safe exception.
    */
   public void cancelConnections(String hostname, int port, IOException ioe) {
@@ -1503,11 +1369,13 @@
       }
     }
 
-  /* Get a connection from the pool, or create a new one and add it to the
-   * pool. Connections to a given host/port are reused. */
+  /**
+   * Get a connection from the pool, or create a new one and add it to the
+   * pool. Connections to a given host/port are reused.
+   */
   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
       int rpcTimeout, final Codec codec, final CompressionCodec compressor)
-  throws IOException, InterruptedException {
+  throws IOException {
     if (!running.get()) throw new StoppedRpcClientException();
     Connection connection;
     ConnectionId remoteId =
@@ -1519,7 +1387,6 @@
         connections.put(remoteId, connection);
       }
     }
-    connection.addCall(call);
 
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
@@ -1529,6 +1396,7 @@
     // waiting here; as setupIOstreams is synchronized. If the connection fails with a
     // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
     connection.setupIOstreams();
     return connection;
   }
@@ -1644,7 +1512,7 @@
       // Clear it here so we don't by mistake try and these cells processing results.
       pcrc.setCellScanner(null);
     }
-    Pair<Message, CellScanner> val = null;
+    Pair<Message, CellScanner> val;
     try {
       val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
         pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);

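Two changes above carry most of the simplification: call ids now come from an AtomicInteger rather than a block synchronized on the whole client, and the ping machinery is gone, with the socket read timeout (remoteId.rpcTimeout) doing that job instead. Below is a minimal, standalone sketch of the call-id side only; the stripped-down Call class and the CallIdSketch class name are invented here for illustration and are not HBase types.

import java.util.concurrent.atomic.AtomicInteger;

public class CallIdSketch {
  // Old style: a plain int field bumped inside "synchronized (RpcClient.this)".
  // New style: an AtomicInteger, so constructing a Call never takes the client-wide lock.
  private static final AtomicInteger CALL_ID_CNT = new AtomicInteger();

  // Hypothetical stand-in for the real Call class, reduced to its id field.
  static final class Call {
    final int id;
    Call() {
      this.id = CALL_ID_CNT.getAndIncrement(); // lock-free, unique per client instance
    }
  }

  public static void main(String[] args) {
    System.out.println(new Call().id); // 0
    System.out.println(new Call().id); // 1
  }
}

The trade is one CAS per call in place of a client-wide lock; the rest of the call life cycle still lives in the calls map keyed by this id.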

@@ -1552,7 +1552,6 @@ public class TestAdmin {
     TEST_UTIL.getConfiguration().setInt(
         "hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);


@@ -60,7 +60,6 @@ public class TestLogRollAbort {
     // Tweak default timeout values down for faster recovery
     TEST_UTIL.getConfiguration().setInt(
         "hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);


@@ -124,7 +124,6 @@ public class TestLogRolling {
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);