Improve deserialization failure logging (#60577)
Today when a node fails to properly deserialize a transport message with a parent task we log the following relatively uninformative message: java.lang.IllegalStateException: Message not fully read (response) for requestId [9999], handler [org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler/org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler/org.elasticsearch.transport.TransportService$6@abcdefgh], error [false]; resetting In particular, the wrapping of the listener in the `TransportService` obscures all clues as to the source of the problem, e.g. the action name or the identity of the underlying listener. This commit exposes the inner listener to the logs. Also if the listener is wrapped with `ContextPreservingActionListener` then its identity is similarly hidden. This commit also exposes the wrapped listener in this case. Relates #38939
This commit is contained in:
parent
a76fc324d4
commit
d2ddf8cd6a
|
@ -51,6 +51,11 @@ public final class ContextPreservingActionListener<R> implements ActionListener<
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getName() + "/" + delegate.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the provided action listener in a {@link ContextPreservingActionListener} that will
|
||||
* also copy the response headers when the {@link ThreadContext.StoredContext} is closed
|
||||
|
|
|
@ -199,7 +199,7 @@ public class InboundHandler {
|
|||
response.remoteAddress(new TransportAddress(remoteAddress));
|
||||
} catch (Exception e) {
|
||||
handleException(handler, new TransportSerializationException(
|
||||
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
|
||||
"Failed to deserialize response from handler [" + handler + "]", e));
|
||||
return;
|
||||
}
|
||||
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
|
||||
|
@ -220,7 +220,8 @@ public class InboundHandler {
|
|||
try {
|
||||
error = stream.readException();
|
||||
} catch (Exception e) {
|
||||
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
||||
error = new TransportSerializationException(
|
||||
"Failed to deserialize exception response from stream for handler [" + handler + "]", e);
|
||||
}
|
||||
handleException(handler, error);
|
||||
}
|
||||
|
|
|
@ -619,37 +619,44 @@ public class TransportService extends AbstractLifecycleComponent implements Repo
|
|||
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
|
||||
final TransportRequest request,
|
||||
final TransportRequestOptions options,
|
||||
TransportResponseHandler<T> handler) {
|
||||
final TransportResponseHandler<T> handler) {
|
||||
try {
|
||||
final TransportResponseHandler<T> delegate;
|
||||
if (request.getParentTask().isSet()) {
|
||||
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
|
||||
final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode());
|
||||
final TransportResponseHandler<T> delegate = handler;
|
||||
handler = new TransportResponseHandler<T>() {
|
||||
delegate = new TransportResponseHandler<T>() {
|
||||
@Override
|
||||
public void handleResponse(T response) {
|
||||
unregisterChildNode.close();
|
||||
delegate.handleResponse(response);
|
||||
handler.handleResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
unregisterChildNode.close();
|
||||
delegate.handleException(exp);
|
||||
handler.handleException(exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return delegate.executor();
|
||||
return handler.executor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T read(StreamInput in) throws IOException {
|
||||
return delegate.read(in);
|
||||
return handler.read(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getName() + "/[" + action + "]:" + handler.toString();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
delegate = handler;
|
||||
}
|
||||
asyncSender.sendRequest(connection, action, request, options, handler);
|
||||
asyncSender.sendRequest(connection, action, request, options, delegate);
|
||||
} catch (final Exception ex) {
|
||||
// the caller might not handle this so we invoke the handler
|
||||
final TransportException te;
|
||||
|
|
|
@ -25,6 +25,9 @@ import org.elasticsearch.test.ESTestCase;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class ContextPreservingActionListenerTests extends ESTestCase {
|
||||
|
||||
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
|
||||
|
@ -150,4 +153,30 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
|
|||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
}
|
||||
|
||||
public void testToStringIncludesDelegate() {
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final ContextPreservingActionListener<Void> actionListener;
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "test delegate";
|
||||
}
|
||||
};
|
||||
|
||||
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
||||
}
|
||||
|
||||
assertThat(actionListener.toString(), allOf(containsString("test delegate"), containsString("ContextPreservingActionListener")));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskAwareRequest;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransport;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
|
||||
public class TransportServiceDeserializationFailureTests extends ESTestCase {
|
||||
|
||||
public void testDeserializationFailureLogIdentifiesListener() {
|
||||
final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
|
||||
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "local").build();
|
||||
|
||||
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
|
||||
|
||||
final String testActionName = "internal:test-action";
|
||||
|
||||
final MockTransport transport = new MockTransport() {
|
||||
@Override
|
||||
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
|
||||
if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) {
|
||||
handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT));
|
||||
}
|
||||
}
|
||||
};
|
||||
final TransportService transportService = transport.createTransportService(Settings.EMPTY,
|
||||
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, ignored -> localNode, null,
|
||||
Collections.emptySet());
|
||||
|
||||
transportService.registerRequestHandler(testActionName, ThreadPool.Names.SAME, TransportRequest.Empty::new,
|
||||
(request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE));
|
||||
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
||||
final PlainActionFuture<Void> connectionFuture = new PlainActionFuture<>();
|
||||
transportService.connectToNode(otherNode, connectionFuture);
|
||||
assertTrue(connectionFuture.isDone());
|
||||
|
||||
{
|
||||
// requests without a parent task are recorded directly in the response context
|
||||
|
||||
transportService.sendRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE,
|
||||
TransportRequestOptions.EMPTY, new TransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
fail("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
fail("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransportResponse.Empty read(StreamInput in) {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "test handler without parent";
|
||||
}
|
||||
});
|
||||
|
||||
final List<Transport.ResponseContext<? extends TransportResponse>> responseContexts
|
||||
= transport.getResponseHandlers().prune(ignored -> true);
|
||||
assertThat(responseContexts, hasSize(1));
|
||||
final TransportResponseHandler<? extends TransportResponse> handler = responseContexts.get(0).handler();
|
||||
assertThat(handler, hasToString(containsString("test handler without parent")));
|
||||
}
|
||||
|
||||
{
|
||||
// requests with a parent task get wrapped up by the transport service, including the action name
|
||||
|
||||
final Task parentTask = transportService.getTaskManager().register("test", "test-action", new TaskAwareRequest() {
|
||||
@Override
|
||||
public void setParentTask(TaskId taskId) {
|
||||
fail("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskId getParentTask() {
|
||||
return TaskId.EMPTY_TASK_ID;
|
||||
}
|
||||
});
|
||||
|
||||
transportService.sendChildRequest(otherNode, testActionName, TransportRequest.Empty.INSTANCE, parentTask,
|
||||
TransportRequestOptions.EMPTY, new TransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
fail("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
fail("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransportResponse.Empty read(StreamInput in) {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "test handler with parent";
|
||||
}
|
||||
});
|
||||
|
||||
final List<Transport.ResponseContext<? extends TransportResponse>> responseContexts
|
||||
= transport.getResponseHandlers().prune(ignored -> true);
|
||||
assertThat(responseContexts, hasSize(1));
|
||||
final TransportResponseHandler<? extends TransportResponse> handler = responseContexts.get(0).handler();
|
||||
assertThat(handler, hasToString(allOf(containsString("test handler with parent"), containsString(testActionName))));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue