From d2ddf8cd6a1a1de81b487af2062def75eb64eeda Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 3 Aug 2020 11:39:11 +0100 Subject: [PATCH] Improve deserialization failure logging (#60577) Today when a node fails to properly deserialize a transport message with a parent task we log the following relatively uninformative message: java.lang.IllegalStateException: Message not fully read (response) for requestId [9999], handler [org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler/org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler/org.elasticsearch.transport.TransportService$6@abcdefgh], error [false]; resetting In particular, the wrapping of the listener in the `TransportService` obscures all clues as to the source of the problem, e.g. the action name or the identity of the underlying listener. This commit exposes the inner listener to the logs. Also if the listener is wrapped with `ContextPreservingActionListener` then its identity is similarly hidden. This commit also exposes the wrapped listener in this case. Relates #38939 --- .../ContextPreservingActionListener.java | 5 + .../transport/InboundHandler.java | 5 +- .../transport/TransportService.java | 23 ++- .../ContextPreservingActionListenerTests.java | 29 +++ ...ortServiceDeserializationFailureTests.java | 168 ++++++++++++++++++ 5 files changed, 220 insertions(+), 10 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java index 72f1e7c1d66..b3ee1a8197d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ContextPreservingActionListener.java @@ -51,6 +51,11 @@ public final class ContextPreservingActionListener implements ActionListener< } } + @Override + public String toString() { + return getClass().getName() + "/" + delegate.toString(); + } + /** * Wraps the provided action listener in a {@link ContextPreservingActionListener} that will * also copy the response headers when the {@link ThreadContext.StoredContext} is closed diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index a071a26e391..045f016409f 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -199,7 +199,7 @@ public class InboundHandler { response.remoteAddress(new TransportAddress(remoteAddress)); } catch (Exception e) { handleException(handler, new TransportSerializationException( - "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e)); + "Failed to deserialize response from handler [" + handler + "]", e)); return; } threadPool.executor(handler.executor()).execute(new AbstractRunnable() { @@ -220,7 +220,8 @@ public class InboundHandler { try { error = stream.readException(); } catch (Exception e) { - error = new TransportSerializationException("Failed to deserialize exception response from stream", e); + error = new TransportSerializationException( + "Failed to deserialize exception response from stream for handler [" + handler + "]", e); } handleException(handler, error); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index d73337c13ef..97d93ebaaeb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -619,37 +619,44 @@ public class TransportService extends AbstractLifecycleComponent implements Repo public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, - TransportResponseHandler handler) { + final TransportResponseHandler handler) { try { + final TransportResponseHandler delegate; if (request.getParentTask().isSet()) { // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); - final TransportResponseHandler delegate = handler; - handler = new TransportResponseHandler() { + delegate = new TransportResponseHandler() { @Override public void handleResponse(T response) { unregisterChildNode.close(); - delegate.handleResponse(response); + handler.handleResponse(response); } @Override public void handleException(TransportException exp) { unregisterChildNode.close(); - delegate.handleException(exp); + handler.handleException(exp); } @Override public String executor() { - return delegate.executor(); + return handler.executor(); } @Override public T read(StreamInput in) throws IOException { - return delegate.read(in); + return handler.read(in); + } + + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); } }; + } else { + delegate = handler; } - asyncSender.sendRequest(connection, action, request, options, handler); + asyncSender.sendRequest(connection, action, request, options, delegate); } catch (final Exception ex) { // the caller might not handle this so we invoke the handler final TransportException te; diff --git a/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java index f0277c65c16..e821f6c6463 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java @@ -25,6 +25,9 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; + public class ContextPreservingActionListenerTests extends ESTestCase { public void testOriginalContextIsPreservedAfterOnResponse() throws IOException { @@ -150,4 +153,30 @@ public class ContextPreservingActionListenerTests extends ESTestCase { assertNull(threadContext.getHeader("foo")); assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); } + + public void testToStringIncludesDelegate() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final ContextPreservingActionListener actionListener; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + final ActionListener delegate = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + } + + @Override + public void onFailure(Exception e) { + } + + @Override + public String toString() { + return "test delegate"; + } + }; + + actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); + } + + assertThat(actionListener.toString(), allOf(containsString("test delegate"), containsString("ContextPreservingActionListener"))); + } + } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java new file mode 100644 index 00000000000..be862ef45f5 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; + +public class TransportServiceDeserializationFailureTests extends ESTestCase { + + public void testDeserializationFailureLogIdentifiesListener() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "local").build(); + + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + + final String testActionName = "internal:test-action"; + + final MockTransport transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT)); + } + } + }; + final TransportService transportService = transport.createTransportService(Settings.EMPTY, + deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, ignored -> localNode, null, + Collections.emptySet()); + + transportService.registerRequestHandler(testActionName, ThreadPool.Names.SAME, TransportRequest.Empty::new, + (request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE)); + + transportService.start(); + transportService.acceptIncomingRequests(); + + final PlainActionFuture connectionFuture = new PlainActionFuture<>(); + transportService.connectToNode(otherNode, connectionFuture); + assertTrue(connectionFuture.isDone()); + + { + // requests without a parent task are recorded directly in the response context + + transportService.sendRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, new TransportResponseHandler() { + @Override + public void handleResponse(TransportResponse.Empty response) { + fail("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + fail("should not be called"); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse.Empty read(StreamInput in) { + throw new AssertionError("should not be called"); + } + + @Override + public String toString() { + return "test handler without parent"; + } + }); + + final List> responseContexts + = transport.getResponseHandlers().prune(ignored -> true); + assertThat(responseContexts, hasSize(1)); + final TransportResponseHandler handler = responseContexts.get(0).handler(); + assertThat(handler, hasToString(containsString("test handler without parent"))); + } + + { + // requests with a parent task get wrapped up by the transport service, including the action name + + final Task parentTask = transportService.getTaskManager().register("test", "test-action", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) { + fail("should not be called"); + } + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + }); + + transportService.sendChildRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE, parentTask, + TransportRequestOptions.EMPTY, new TransportResponseHandler() { + @Override + public void handleResponse(TransportResponse.Empty response) { + fail("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + fail("should not be called"); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse.Empty read(StreamInput in) { + throw new AssertionError("should not be called"); + } + + @Override + public String toString() { + return "test handler with parent"; + } + }); + + final List> responseContexts + = transport.getResponseHandlers().prune(ignored -> true); + assertThat(responseContexts, hasSize(1)); + final TransportResponseHandler handler = responseContexts.get(0).handler(); + assertThat(handler, hasToString(allOf(containsString("test handler with parent"), containsString(testActionName)))); + } + } + +}