HBASE-13011 TestLoadIncrementalHFiles is flakey when using AsyncRpcClient as client implementation

Added comment to AsyncRpcChannel data members

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
zhangduo 2015-02-14 08:36:38 +08:00 committed by stack
parent 6d72a993ee
commit e99091e97c
4 changed files with 180 additions and 204 deletions

View File

@ -72,7 +72,7 @@ public class AsyncCall extends DefaultPromise<Message> {
this.responseDefaultType = responseDefaultType;
this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = controller.getCallTimeout();
this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
}
/**
@ -84,9 +84,10 @@ public class AsyncCall extends DefaultPromise<Message> {
return this.startTime;
}
@Override public String toString() {
return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" +
(this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
@Override
public String toString() {
return "callId: " + this.id + " methodName: " + this.method.getName() + " param {"
+ (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
}
/**

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
@ -31,6 +28,23 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.security.sasl.SaslException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
@ -56,18 +70,9 @@ import org.apache.hadoop.security.token.TokenSelector;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
/**
* Netty RPC channel
@ -97,8 +102,6 @@ public class AsyncRpcChannel {
final String serviceName;
final InetSocketAddress address;
ConcurrentSkipListMap<Integer, AsyncCall> calls = new ConcurrentSkipListMap<>();
private int ioFailureCounter = 0;
private int connectFailureCounter = 0;
@ -108,15 +111,18 @@ public class AsyncRpcChannel {
private Token<? extends TokenIdentifier> token;
private String serverPrincipal;
volatile boolean shouldCloseConnection = false;
private IOException closeException;
// NOTE: closed and connected flags below are only changed when a lock on pendingCalls
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
private boolean connected = false;
private boolean closed = false;
private Timeout cleanupTimer;
private final TimerTask timeoutTask = new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
cleanupTimer = null;
cleanupCalls(false);
@Override
public void run(Timeout timeout) throws Exception {
cleanupCalls();
}
};
@ -213,15 +219,20 @@ public class AsyncRpcChannel {
ch.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
try {
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
@Override public void operationComplete(ChannelFuture future) throws Exception {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
close(future.cause());
return;
}
for (AsyncCall call : calls.values()) {
List<AsyncCall> callsToWrite;
synchronized (pendingCalls) {
connected = true;
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
}
for (AsyncCall call : callsToWrite) {
writeRequest(call);
}
}
@ -240,17 +251,18 @@ public class AsyncRpcChannel {
*/
private SaslClientHandler getSaslHandler(final Bootstrap bootstrap) throws IOException {
return new SaslClientHandler(authMethod, token, serverPrincipal, client.fallbackAllowed,
client.conf.get("hbase.rpc.protection",
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
new SaslClientHandler.SaslExceptionHandler() {
@Override public void handle(int retryCount, Random random, Throwable cause) {
client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name()
.toLowerCase()), new SaslClientHandler.SaslExceptionHandler() {
@Override
public void handle(int retryCount, Random random, Throwable cause) {
try {
// Handle Sasl failure. Try to potentially get new credentials
handleSaslConnectionFailure(retryCount, cause, ticket.getUGI());
// Try to reconnect
AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
@Override
public void run(Timeout timeout) throws Exception {
connect(bootstrap);
}
}, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
@ -259,7 +271,8 @@ public class AsyncRpcChannel {
}
}
}, new SaslClientHandler.SaslSuccessfulConnectHandler() {
@Override public void onSuccess(Channel channel) {
@Override
public void onSuccess(Channel channel) {
startHBaseConnection(channel);
}
});
@ -295,66 +308,50 @@ public class AsyncRpcChannel {
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype) {
if (shouldCloseConnection) {
Promise<Message> promise = channel.eventLoop().newPromise();
promise.setFailure(new ConnectException());
return promise;
}
final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
method, request, controller, responsePrototype);
final AsyncCall call =
new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
controller, responsePrototype);
controller.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
calls.remove(call.id);
// TODO: do not need to call AsyncCall.setFailed?
synchronized (pendingCalls) {
pendingCalls.remove(call.id);
}
}
});
// TODO: this should be handled by PayloadCarryingRpcController.
if (controller.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.cancel(true);
return call;
}
calls.put(call.id, call);
// check again, see https://issues.apache.org/jira/browse/HBASE-12951
if (shouldCloseConnection) {
synchronized (pendingCalls) {
if (closed) {
Promise<Message> promise = channel.eventLoop().newPromise();
promise.setFailure(new ConnectException());
return promise;
}
pendingCalls.put(call.id, call);
// Add timeout for cleanup if none is present
if (cleanupTimer == null) {
cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
if (cleanupTimer == null && call.getRpcTimeout() > 0) {
cleanupTimer =
AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
TimeUnit.MILLISECONDS);
}
if(channel.isActive()) {
writeRequest(call);
if (!connected) {
return call;
}
}
writeRequest(call);
return call;
}
/**
* Calls method and returns a promise
* @param method to call
* @param controller to run call with
* @param request to send
* @param responsePrototype for response message
* @return Promise to listen to result
* @throws java.net.ConnectException on connection failures
*/
public Promise<Message> callMethodWithPromise(
final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller,
final Message request, final Message responsePrototype) throws ConnectException {
if (shouldCloseConnection || !channel.isOpen()) {
throw new ConnectException();
AsyncCall removePendingCall(int id) {
synchronized (pendingCalls) {
return pendingCalls.remove(id);
}
return this.callMethod(method, controller, request, responsePrototype);
}
/**
@ -400,10 +397,6 @@ public class AsyncRpcChannel {
*/
private void writeRequest(final AsyncCall call) {
try {
if (shouldCloseConnection) {
return;
}
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
.newBuilder();
requestHeaderBuilder.setCallId(call.id)
@ -439,24 +432,11 @@ public class AsyncRpcChannel {
IPCUtil.write(out, rh, call.param, cellBlock);
}
channel.writeAndFlush(b).addListener(new CallWriteListener(this,call));
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
} catch (IOException e) {
if (!shouldCloseConnection) {
close(e);
}
}
}
/**
* Fail a call
*
* @param call to fail
* @param cause of fail
*/
void failCall(AsyncCall call, IOException cause) {
calls.remove(call.id);
call.setFailed(cause);
}
/**
* Set up server authorization
@ -550,18 +530,22 @@ public class AsyncRpcChannel {
* @param e exception on close
*/
public void close(final Throwable e) {
client.removeConnection(ConnectionId.hashCode(ticket,serviceName,address));
client.removeConnection(this);
// Move closing from the requesting thread to the channel thread
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (shouldCloseConnection) {
List<AsyncCall> toCleanup;
synchronized (pendingCalls) {
if (closed) {
return;
}
shouldCloseConnection = true;
closed = true;
toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
pendingCalls.clear();
}
IOException closeException = null;
if (e != null) {
if (e instanceof IOException) {
closeException = (IOException) e;
@ -569,16 +553,19 @@ public class AsyncRpcChannel {
closeException = new IOException(e);
}
}
// log the info
if (LOG.isDebugEnabled() && closeException != null) {
LOG.debug(name + ": closing ipc connection to " + address + ": " +
closeException.getMessage());
LOG.debug(name + ": closing ipc connection to " + address, closeException);
}
if (cleanupTimer != null) {
cleanupTimer.cancel();
cleanupTimer = null;
}
for (AsyncCall call : toCleanup) {
call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
"Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
}
cleanupCalls(true);
channel.disconnect().addListener(ChannelFutureListener.CLOSE);
if (LOG.isDebugEnabled()) {
LOG.debug(name + ": closed");
}
@ -591,63 +578,36 @@ public class AsyncRpcChannel {
*
* @param cleanAll true if all calls should be cleaned, false for only the timed out calls
*/
public void cleanupCalls(boolean cleanAll) {
// Cancel outstanding timers
if (cleanupTimer != null) {
cleanupTimer.cancel();
private void cleanupCalls() {
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
long currentTime = EnvironmentEdgeManager.currentTime();
long nextCleanupTaskDelay = -1L;
synchronized (pendingCalls) {
for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
AsyncCall call = iter.next();
long timeout = call.getRpcTimeout();
if (timeout > 0) {
if (currentTime - call.getStartTime() >= timeout) {
iter.remove();
toCleanup.add(call);
} else {
if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
nextCleanupTaskDelay = timeout;
}
}
}
}
if (nextCleanupTaskDelay > 0) {
cleanupTimer =
AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay,
TimeUnit.MILLISECONDS);
} else {
cleanupTimer = null;
}
if (cleanAll) {
for (AsyncCall call : calls.values()) {
synchronized (call) {
// Calls can be done on another thread so check before failing them
if(!call.isDone()) {
if (closeException == null) {
failCall(call, new ConnectionClosingException("Call id=" + call.id +
" on server " + address + " aborted: connection is closing"));
} else {
failCall(call, closeException);
}
}
}
}
} else {
for (AsyncCall call : calls.values()) {
long waitTime = EnvironmentEdgeManager.currentTime() - call.getStartTime();
long timeout = call.getRpcTimeout();
if (timeout > 0 && waitTime >= timeout) {
synchronized (call) {
// Calls can be done on another thread so check before failing them
if (!call.isDone()) {
closeException = new CallTimeoutException("Call id=" + call.id +
", waitTime=" + waitTime + ", rpcTimeout=" + timeout);
failCall(call, closeException);
}
}
} else {
// We expect the call to be ordered by timeout. It may not be the case, but stopping
// at the first valid call allows to be sure that we still have something to do without
// spending too much time by reading the full list.
break;
}
}
if (!calls.isEmpty()) {
AsyncCall firstCall = calls.firstEntry().getValue();
final long newTimeout;
long maxWaitTime = EnvironmentEdgeManager.currentTime() - firstCall.getStartTime();
if (maxWaitTime < firstCall.getRpcTimeout()) {
newTimeout = firstCall.getRpcTimeout() - maxWaitTime;
} else {
newTimeout = 0;
}
closeException = null;
cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask,
newTimeout, TimeUnit.MILLISECONDS);
}
for (AsyncCall call : toCleanup) {
call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
+ (currentTime - call.getRpcTimeout()) + ", rpcTimeout=" + call.getRpcTimeout()));
}
}
@ -745,6 +705,10 @@ public class AsyncRpcChannel {
});
}
public int getConnectionHashCode() {
return ConnectionId.hashCode(ticket, serviceName, address);
}
@Override
public String toString() {
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
@ -755,20 +719,22 @@ public class AsyncRpcChannel {
*/
private static final class CallWriteListener implements ChannelFutureListener {
private final AsyncRpcChannel rpcChannel;
private final AsyncCall call;
private final int id;
public CallWriteListener(AsyncRpcChannel asyncRpcChannel, AsyncCall call) {
public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
this.rpcChannel = asyncRpcChannel;
this.call = call;
this.id = id;
}
@Override public void operationComplete(ChannelFuture future) throws Exception {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if(!this.call.isDone()) {
AsyncCall call = rpcChannel.removePendingCall(id);
if (call != null) {
if (future.cause() instanceof IOException) {
rpcChannel.failCall(call, (IOException) future.cause());
call.setFailed((IOException) future.cause());
} else {
rpcChannel.failCall(call, new IOException(future.cause()));
call.setFailed(new IOException(future.cause()));
}
}
}

View File

@ -17,12 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
@ -38,6 +32,16 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
@ -49,13 +53,12 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
/**
* Netty client for the requests and responses
@ -169,16 +172,16 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws InterruptedException if call is interrupted
* @throws java.io.IOException if a connection failure is encountered
*/
@Override protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
@Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress addr) throws IOException, InterruptedException {
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
Promise<Message> promise = connection.callMethodWithPromise(md, pcrc, param, returnType);
Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
try {
Message response = promise.get();
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
return new Pair<>(response, pcrc.cellScanner());
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
@ -186,6 +189,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
} else {
throw new IOException(e.getCause());
}
} catch (TimeoutException e) {
throw new CallTimeoutException(promise.toString());
}
}
@ -337,12 +342,20 @@ public class AsyncRpcClient extends AbstractRpcClient {
/**
* Remove connection from pool
*
* @param connectionHashCode of connection
*/
public void removeConnection(int connectionHashCode) {
public void removeConnection(AsyncRpcChannel connection) {
int connectionHashCode = connection.getConnectionHashCode();
synchronized (connections) {
// we use address as cache key, so we should check here to prevent removing the
// wrong connection
AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
if (connectionInPool == connection) {
this.connections.remove(connectionHashCode);
} else if (LOG.isDebugEnabled()) {
LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
connection.toString(), System.identityHashCode(connection),
System.identityHashCode(connectionInPool)));
}
}
}

View File

@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
@ -29,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
import com.google.protobuf.Message;
/**
* Handles Hbase responses
@ -52,16 +54,12 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf inBuffer = (ByteBuf) msg;
ByteBufInputStream in = new ByteBufInputStream(inBuffer);
if (channel.shouldCloseConnection) {
return;
}
int totalSize = inBuffer.readableBytes();
try {
// Read the header
RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
int id = responseHeader.getCallId();
AsyncCall call = channel.calls.get(id);
AsyncCall call = channel.removePendingCall(id);
if (call == null) {
// So we got a response for which we have no corresponding 'call' here on the client-side.
// We probably timed out waiting, cleaned up all references, and now the server decides
@ -85,7 +83,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
equals(FatalConnectionException.class.getName())) {
channel.close(re);
} else {
channel.failCall(call, re);
call.setFailed(re);
}
} else {
Message value = null;
@ -104,13 +102,11 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
}
call.setSuccess(value, cellBlockScanner);
}
channel.calls.remove(id);
} catch (IOException e) {
// Treat this as a fatal condition and close this connection
channel.close(e);
} finally {
inBuffer.release();
channel.cleanupCalls(false);
}
}