Transport: A failure to handle a response might cause the transport to stop working, closes #170.

This commit is contained in:
kimchy 2010-05-12 21:59:58 +03:00
parent ece1395b57
commit 4d6f2d56f0
4 changed files with 145 additions and 21 deletions

View File

@ -35,6 +35,8 @@ import org.elasticsearch.util.timer.TimerTask;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.TransportAddress;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -62,6 +64,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>();
// An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
// do show up, we can print more descriptive information about them
final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) {
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > 100;
}
});
private boolean throwConnectException = false;
public TransportService(Transport transport, ThreadPool threadPool, TimerService timerService) {
@ -208,6 +218,13 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override public TransportResponseHandler remove(long requestId) {
RequestHolder holder = clientHandlers.remove(requestId);
if (holder == null) {
// lets see if its in the timeout holder
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
if (timeoutInfoHolder != null) {
logger.warn("Transport response handler timed out, action [{}], node [{}]", timeoutInfoHolder.action(), timeoutInfoHolder.node());
} else {
logger.warn("Transport response handler not found of id [{}]", requestId);
}
return null;
}
if (holder.timeout() != null) {
@ -253,11 +270,34 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
RequestHolder holder = clientHandlers.remove(requestId);
if (holder != null) {
// add it to the timeout information holder, in case we are going to get a response later
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action()));
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action()));
}
}
}
static class TimeoutInfoHolder {
private final DiscoveryNode node;
private final String action;
TimeoutInfoHolder(DiscoveryNode node, String action) {
this.node = node;
this.action = action;
}
public DiscoveryNode node() {
return node;
}
public String action() {
return action;
}
}
static class RequestHolder<T extends Streamable> {
private final TransportResponseHandler<T> handler;

View File

@ -176,13 +176,13 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
handleRequest(stream, requestId, sourceTransport);
} else {
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
if (handler == null) {
throw new ResponseHandlerNotFoundTransportException(requestId);
}
if (Transport.Helper.isError(status)) {
handlerResponseError(stream, handler);
} else {
handleResponse(stream, handler);
// ignore if its null, the adapter logs it
if (handler != null) {
if (Transport.Helper.isError(status)) {
handlerResponseError(stream, handler);
} else {
handleResponse(stream, handler);
}
}
}
} catch (Exception e) {

View File

@ -58,6 +58,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
StreamInput streamIn = new ChannelBufferStreamInput(buffer);
streamIn = HandlesStreamInput.Cached.cached(streamIn);
@ -66,25 +67,29 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
boolean isRequest = isRequest(status);
if (isRequest) {
handleRequest(event, streamIn, requestId);
handleRequest(buffer, event, streamIn, requestId);
} else {
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
if (handler == null) {
throw new ResponseHandlerNotFoundTransportException(requestId);
}
if (isError(status)) {
handlerResponseError(streamIn, handler);
// ignore if its null, the adapter logs it
if (handler != null) {
if (isError(status)) {
handlerResponseError(buffer, streamIn, handler);
} else {
handleResponse(buffer, streamIn, handler);
}
} else {
handleResponse(streamIn, handler);
// if its null, skip those bytes (remove 8 for the request id, and 1 for the status)
buffer.skipBytes(buffer.readableBytes());
}
}
}
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
private void handleResponse(ChannelBuffer channelBuffer, StreamInput buffer, final TransportResponseHandler handler) {
final Streamable streamable = handler.newInstance();
try {
streamable.readFrom(buffer);
} catch (Exception e) {
channelBuffer.skipBytes(channelBuffer.readableBytes());
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
return;
}
@ -108,12 +113,13 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
private void handlerResponseError(ChannelBuffer channelBuffer, StreamInput buffer, final TransportResponseHandler handler) {
Throwable error;
try {
ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer);
error = (Throwable) ois.readObject();
} catch (Exception e) {
channelBuffer.skipBytes(channelBuffer.readableBytes());
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
}
handleException(handler, error);
@ -139,7 +145,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
private void handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException {
private void handleRequest(ChannelBuffer channelBuffer, MessageEvent event, StreamInput buffer, long requestId) throws IOException {
final String action = buffer.readUTF();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId);
@ -170,6 +176,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
handler.messageReceived(streamable, transportChannel);
}
} catch (Exception e) {
channelBuffer.skipBytes(channelBuffer.readableBytes());
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {

View File

@ -173,8 +173,8 @@ public abstract class AbstractSimpleTransportTests {
assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
}
@Test public void testTimeoutSendException() throws Exception {
serviceA.registerHandler("sayHelloTimeout", new BaseTransportRequestHandler<StringMessage>() {
@Test public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception {
serviceA.registerHandler("sayHelloTimeoutNoResponse", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@ -192,7 +192,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeout",
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse",
new StringMessage("moshe"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
@ -214,7 +214,84 @@ public abstract class AbstractSimpleTransportTests {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
serviceA.removeHandler("sayHelloTimeout");
serviceA.removeHandler("sayHelloTimeoutNoResponse");
System.out.println("after ...");
}
@Test public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
serviceA.registerHandler("sayHelloTimeoutDelayedResponse", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
TimeValue sleep = TimeValue.parseTimeValue(request.message, null);
try {
Thread.sleep(sleep.millis());
} catch (InterruptedException e) {
// ignore
}
try {
channel.sendResponse(new StringMessage("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
}
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
new StringMessage("300ms"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override public void handleException(RemoteTransportException exp) {
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
}
});
try {
StringMessage message = res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
// sleep for 400 millis to make sure we get back the response
Thread.sleep(400);
for (int i = 0; i < 10; i++) {
final int counter = i;
// now, try and send another request, this times, with a short timeout
res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
new StringMessage(counter + "ms"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello " + counter + "ms", equalTo(response.message));
}
@Override public void handleException(RemoteTransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage(), false, equalTo(true));
}
});
StringMessage message = res.txGet();
assertThat(message.message, equalTo("hello " + counter + "ms"));
}
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
System.out.println("after ...");
}