diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 9b0a1dafd14..6110752b421 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -82,7 +82,6 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -102,7 +101,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; - // This is the number of bytes necessary to read the message size private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); @@ -119,7 +117,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final NetworkService networkService; protected final Set profileSettings; - private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); + private static final TransportMessageListener NOOP_LISTENER = new TransportMessageListener() {}; + private volatile TransportMessageListener messageListener = NOOP_LISTENER; private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); private final Map> serverChannels = newConcurrentMap(); @@ -181,12 +180,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected void doStart() { } - public void addMessageListener(TransportMessageListener listener) { - messageListener.listeners.add(listener); - } - - public boolean removeMessageListener(TransportMessageListener listener) { - return messageListener.listeners.remove(listener); + @Override + public synchronized void setMessageListener(TransportMessageListener listener) { + if (messageListener == NOOP_LISTENER) { + messageListener = listener; + } else { + throw new IllegalStateException("Cannot set message listener twice"); + } } @Override @@ -1184,47 +1184,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - private static final class DelegatingTransportMessageListener implements TransportMessageListener { - - private final List listeners = new CopyOnWriteArrayList<>(); - - @Override - public void onRequestReceived(long requestId, String action) { - for (TransportMessageListener listener : listeners) { - listener.onRequestReceived(requestId, action); - } - } - - @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { - for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, response); - } - } - - @Override - public void onResponseSent(long requestId, String action, Exception error) { - for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, error); - } - } - - @Override - public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions finalOptions) { - for (TransportMessageListener listener : listeners) { - listener.onRequestSent(node, requestId, action, request, finalOptions); - } - } - - @Override - public void onResponseReceived(long requestId, ResponseContext holder) { - for (TransportMessageListener listener : listeners) { - listener.onResponseReceived(requestId, holder); - } - } - } - @Override public final ResponseHandlers getResponseHandlers() { return responseHandlers; diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index 4a8a061602a..4357f005df5 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -53,9 +53,7 @@ public interface Transport extends LifecycleComponent { */ RequestHandlerRegistry getRequestHandler(String action); - void addMessageListener(TransportMessageListener listener); - - boolean removeMessageListener(TransportMessageListener listener); + void setMessageListener(TransportMessageListener listener); /** * The address the transport is bound on. diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 48ef529f9c3..1288f6fe16f 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -64,6 +64,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.function.Function; @@ -77,6 +78,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); + private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); protected final Transport transport; protected final ConnectionManager connectionManager; protected final ThreadPool threadPool; @@ -223,7 +225,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran @Override protected void doStart() { - transport.addMessageListener(this); + transport.setMessageListener(this); connectionManager.addListener(this); transport.start(); if (transport.boundAddress() != null && logger.isInfoEnabled()) { @@ -482,6 +484,14 @@ public class TransportService extends AbstractLifecycleComponent implements Tran connectionManager.disconnectFromNode(node); } + public void addMessageListener(TransportMessageListener listener) { + messageListener.listeners.add(listener); + } + + public boolean removeMessageListener(TransportMessageListener listener) { + return messageListener.listeners.remove(listener); + } + public void addConnectionListener(TransportConnectionListener listener) { connectionManager.addListener(listener); } @@ -854,38 +864,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran transport.registerRequestHandler(reg); } - /** called by the {@link Transport} implementation once a request has been sent */ - public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) { - if (traceEnabled() && shouldTraceAction(action)) { - traceRequestSent(node, requestId, action, options); - } - } - - protected boolean traceEnabled() { - return tracerLog.isTraceEnabled(); - } - - /** called by the {@link Transport} implementation once a response was sent to calling node */ - @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { - if (traceEnabled() && shouldTraceAction(action)) { - traceResponseSent(requestId, action); - } - } - - /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ - @Override - public void onResponseSent(long requestId, String action, Exception e) { - if (traceEnabled() && shouldTraceAction(action)) { - traceResponseSent(requestId, action, e); - } - } - - protected void traceResponseSent(long requestId, String action, Exception e) { - tracerLog.trace(() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e); - } - /** * called by the {@link Transport} implementation when an incoming request arrives but before * any parsing of it has happened (with the exception of the requestId and action) @@ -897,23 +875,52 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } catch (InterruptedException e) { logger.trace("interrupted while waiting for incoming requests block to be removed"); } - if (traceEnabled() && shouldTraceAction(action)) { - traceReceivedRequest(requestId, action); + if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { + tracerLog.trace("[{}][{}] received request", requestId, action); } + messageListener.onRequestReceived(requestId, action); } - public RequestHandlerRegistry getRequestHandler(String action) { - return transport.getRequestHandler(action); + /** called by the {@link Transport} implementation once a request has been sent */ + @Override + public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) { + if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { + tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); + } + messageListener.onRequestSent(node, requestId, action, request, options); } - @Override public void onResponseReceived(long requestId, Transport.ResponseContext holder) { if (holder == null) { checkForTimeout(requestId); - } else if (traceEnabled() && shouldTraceAction(holder.action())) { - traceReceivedResponse(requestId, holder.connection().getNode(), holder.action()); + } else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) { + tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode()); } + messageListener.onResponseReceived(requestId, holder); + } + + /** called by the {@link Transport} implementation once a response was sent to calling node */ + @Override + public void onResponseSent(long requestId, String action, TransportResponse response) { + if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { + tracerLog.trace("[{}][{}] sent response", requestId, action); + } + messageListener.onResponseSent(requestId, action, response); + } + + /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ + @Override + public void onResponseSent(long requestId, String action, Exception e) { + if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { + tracerLog.trace(() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e); + } + messageListener.onResponseSent(requestId, action, e); + } + + public RequestHandlerRegistry getRequestHandler(String action) { + return transport.getRequestHandler(action); } private void checkForTimeout(long requestId) { @@ -935,14 +942,14 @@ public class TransportService extends AbstractLifecycleComponent implements Tran sourceNode = null; } // call tracer out of lock - if (traceEnabled() == false) { + if (tracerLog.isTraceEnabled() == false) { return; } if (action == null) { assert sourceNode == null; - traceUnresolvedResponse(requestId); + tracerLog.trace("[{}] received response but can't resolve it to a request", requestId); } else if (shouldTraceAction(action)) { - traceReceivedResponse(requestId, sourceNode, action); + tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode); } } @@ -972,26 +979,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } } - protected void traceReceivedRequest(long requestId, String action) { - tracerLog.trace("[{}][{}] received request", requestId, action); - } - - protected void traceResponseSent(long requestId, String action) { - tracerLog.trace("[{}][{}] sent response", requestId, action); - } - - protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode); - } - - protected void traceUnresolvedResponse(long requestId) { - tracerLog.trace("[{}] received response but can't resolve it to a request", requestId); - } - - protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); - } - final class TimeoutHandler implements Runnable { private final long requestId; @@ -1257,4 +1244,45 @@ public class TransportService extends AbstractLifecycleComponent implements Tran private boolean isLocalNode(DiscoveryNode discoveryNode) { return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode); } + + private static final class DelegatingTransportMessageListener implements TransportMessageListener { + + private final List listeners = new CopyOnWriteArrayList<>(); + + @Override + public void onRequestReceived(long requestId, String action) { + for (TransportMessageListener listener : listeners) { + listener.onRequestReceived(requestId, action); + } + } + + @Override + public void onResponseSent(long requestId, String action, TransportResponse response) { + for (TransportMessageListener listener : listeners) { + listener.onResponseSent(requestId, action, response); + } + } + + @Override + public void onResponseSent(long requestId, String action, Exception error) { + for (TransportMessageListener listener : listeners) { + listener.onResponseSent(requestId, action, error); + } + } + + @Override + public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions finalOptions) { + for (TransportMessageListener listener : listeners) { + listener.onRequestSent(node, requestId, action, request, finalOptions); + } + } + + @Override + public void onResponseReceived(long requestId, Transport.ResponseContext holder) { + for (TransportMessageListener listener : listeners) { + listener.onResponseReceived(requestId, holder); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 7ae8156088d..9a74282d51f 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -229,13 +229,7 @@ abstract class FailAndRetryMockTransport imp @Override - public void addMessageListener(TransportMessageListener listener) { + public void setMessageListener(TransportMessageListener listener) { this.listener = listener; } - - @Override - public boolean removeMessageListener(TransportMessageListener listener) { - throw new UnsupportedOperationException(); - } - } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 0a4ef759cb6..25179427d86 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -387,12 +387,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { } @Override - public void addMessageListener(TransportMessageListener listener) { - } - - @Override - public boolean removeMessageListener(TransportMessageListener listener) { - throw new UnsupportedOperationException(); + public void setMessageListener(TransportMessageListener listener) { } @Override diff --git a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 4aea0892e82..6b4eb4e2577 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -43,7 +43,10 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; @@ -291,8 +294,8 @@ public class ZenFaultDetectionTests extends ESTestCase { PingProbe pingProbeA = new PingProbe(minExpectedPings); PingProbe pingProbeB = new PingProbe(minExpectedPings); - serviceA.addTracer(pingProbeA); - serviceB.addTracer(pingProbeB); + serviceA.addMessageListener(pingProbeA); + serviceB.addMessageListener(pingProbeB); MasterFaultDetection masterFDNodeA = new MasterFaultDetection(Settings.builder().put(settingsA).put(settings).build(), threadPool, serviceA, clusterStateSupplierA::get, null, clusterName); @@ -321,7 +324,7 @@ public class ZenFaultDetectionTests extends ESTestCase { "release! See the breaking changes documentation for the next major version."); } - private static class PingProbe extends MockTransportService.Tracer { + private static class PingProbe implements TransportMessageListener { private final Set> inflightPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set> completedPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final CountDownLatch waitForPings; @@ -331,16 +334,17 @@ public class ZenFaultDetectionTests extends ESTestCase { } @Override - public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) { if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { inflightPings.add(Tuple.tuple(node, requestId)); } } @Override - public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { - Tuple ping = Tuple.tuple(sourceNode, requestId); + public void onResponseReceived(long requestId, Transport.ResponseContext context) { + if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(context.action())) { + Tuple ping = Tuple.tuple(context.connection().getNode(), requestId); if (inflightPings.remove(ping)) { completedPings.add(ping); waitForPings.countDown(); diff --git a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index a16fd5d95b4..23c1a837d43 100644 --- a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -54,6 +54,7 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -164,9 +165,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { CountDownLatch beginRelocationLatch = new CountDownLatch(1); CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1); // use a tracer on the target node to track relocation start and end - transportService.addTracer(new MockTransportService.Tracer() { + transportService.addMessageListener(new TransportMessageListener() { @Override - public void receivedRequest(long requestId, String action) { + public void onRequestReceived(long requestId, String action) { if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) { logger.info("received: {}, relocation starts", action); beginRelocationLatch.countDown(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1da6f29c193..544dea0600b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1117,7 +1117,6 @@ public final class InternalTestCluster extends TestCluster { if (transportService instanceof MockTransportService) { final MockTransportService mockTransportService = (MockTransportService) transportService; mockTransportService.clearAllRules(); - mockTransportService.clearTracers(); } } randomlyResetClients(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index 52ba7efa3fa..a9c70deaaea 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -264,22 +264,13 @@ public class MockTransport implements Transport, LifecycleComponent { } @Override - public void addMessageListener(TransportMessageListener listener) { + public void setMessageListener(TransportMessageListener listener) { if (this.listener != null) { throw new IllegalStateException("listener already set"); } this.listener = listener; } - @Override - public boolean removeMessageListener(TransportMessageListener listener) { - if (listener == this.listener) { - this.listener = null; - return true; - } - return false; - } - protected NamedWriteableRegistry writeableRegistry() { return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index d34a78ebb9e..408748d4193 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -482,78 +482,6 @@ public final class MockTransportService extends TransportService { return (StubbableConnectionManager) connectionManager; } - List activeTracers = new CopyOnWriteArrayList<>(); - - public static class Tracer { - public void receivedRequest(long requestId, String action) { - } - - public void responseSent(long requestId, String action) { - } - - public void responseSent(long requestId, String action, Throwable t) { - } - - public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - } - - public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - } - } - - public void addTracer(Tracer tracer) { - activeTracers.add(tracer); - } - - public void clearTracers() { - activeTracers.clear(); - } - - @Override - protected boolean traceEnabled() { - return super.traceEnabled() || activeTracers.isEmpty() == false; - } - - @Override - protected void traceReceivedRequest(long requestId, String action) { - super.traceReceivedRequest(requestId, action); - for (Tracer tracer : activeTracers) { - tracer.receivedRequest(requestId, action); - } - } - - @Override - protected void traceResponseSent(long requestId, String action) { - super.traceResponseSent(requestId, action); - for (Tracer tracer : activeTracers) { - tracer.responseSent(requestId, action); - } - } - - @Override - protected void traceResponseSent(long requestId, String action, Exception e) { - super.traceResponseSent(requestId, action, e); - for (Tracer tracer : activeTracers) { - tracer.responseSent(requestId, action, e); - } - } - - @Override - protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - super.traceReceivedResponse(requestId, sourceNode, action); - for (Tracer tracer : activeTracers) { - tracer.receivedResponse(requestId, sourceNode, action); - } - } - - @Override - protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - super.traceRequestSent(node, requestId, action, options); - for (Tracer tracer : activeTracers) { - tracer.requestSent(node, requestId, action, options); - } - } - public Transport getOriginalTransport() { Transport transport = transport(); while (transport instanceof StubbableTransport) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 673ed493875..e2ff8a7ef5d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -95,13 +95,8 @@ public final class StubbableTransport implements Transport { } @Override - public void addMessageListener(TransportMessageListener listener) { - delegate.addMessageListener(listener); - } - - @Override - public boolean removeMessageListener(TransportMessageListener listener) { - return delegate.removeMessageListener(listener); + public void setMessageListener(TransportMessageListener listener) { + delegate.setMessageListener(listener); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 089b2ac7f7b..5c98d1b9dfb 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.CollectionUtil; @@ -35,6 +37,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -53,7 +56,9 @@ import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -397,7 +402,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertThat(responseString.get(), equalTo("test")); } - public void testAdapterSendReceiveCallbacks() throws Exception { + public void testMessageListeners() throws Exception { final TransportRequestHandler requestHandler = (request, channel, task) -> { try { if (randomBoolean()) { @@ -416,62 +421,62 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC, requestHandler); - - class CountingTracer extends MockTransportService.Tracer { + class CountingListener implements TransportMessageListener { AtomicInteger requestsReceived = new AtomicInteger(); AtomicInteger requestsSent = new AtomicInteger(); AtomicInteger responseReceived = new AtomicInteger(); AtomicInteger responseSent = new AtomicInteger(); @Override - public void receivedRequest(long requestId, String action) { + public void onRequestReceived(long requestId, String action) { if (action.equals(ACTION)) { requestsReceived.incrementAndGet(); } } @Override - public void responseSent(long requestId, String action) { + public void onResponseSent(long requestId, String action, TransportResponse response) { if (action.equals(ACTION)) { responseSent.incrementAndGet(); } } @Override - public void responseSent(long requestId, String action, Throwable t) { + public void onResponseSent(long requestId, String action, Exception error) { if (action.equals(ACTION)) { responseSent.incrementAndGet(); } } @Override - public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - if (action.equals(ACTION)) { + public void onResponseReceived(long requestId, Transport.ResponseContext context) { + if (context.action().equals(ACTION)) { responseReceived.incrementAndGet(); } } @Override - public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) { if (action.equals(ACTION)) { requestsSent.incrementAndGet(); } } } - final CountingTracer tracerA = new CountingTracer(); - final CountingTracer tracerB = new CountingTracer(); - serviceA.addTracer(tracerA); - serviceB.addTracer(tracerB); + + final CountingListener tracerA = new CountingListener(); + final CountingListener tracerB = new CountingListener(); + serviceA.addMessageListener(tracerA); + serviceB.addMessageListener(tracerB); try { - serviceA - .submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get(); + serviceA.submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(ElasticsearchException.class)); assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated")); } - // use assert busy as call backs are sometime called after the response have been sent + // use assert busy as callbacks are called on a different thread assertBusy(() -> { assertThat(tracerA.requestsReceived.get(), equalTo(0)); assertThat(tracerA.requestsSent.get(), equalTo(1)); @@ -484,22 +489,41 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); try { - serviceA - .submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get(); + serviceB.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(ElasticsearchException.class)); assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated")); } - // use assert busy as call backs are sometime called after the response have been sent + // use assert busy as callbacks are called on a different thread assertBusy(() -> { assertThat(tracerA.requestsReceived.get(), equalTo(1)); - assertThat(tracerA.requestsSent.get(), equalTo(2)); - assertThat(tracerA.responseReceived.get(), equalTo(2)); + assertThat(tracerA.requestsSent.get(), equalTo(1)); + assertThat(tracerA.responseReceived.get(), equalTo(1)); assertThat(tracerA.responseSent.get(), equalTo(1)); assertThat(tracerB.requestsReceived.get(), equalTo(1)); - assertThat(tracerB.requestsSent.get(), equalTo(0)); - assertThat(tracerB.responseReceived.get(), equalTo(0)); + assertThat(tracerB.requestsSent.get(), equalTo(1)); + assertThat(tracerB.responseReceived.get(), equalTo(1)); + assertThat(tracerB.responseSent.get(), equalTo(1)); + }); + + // use assert busy as callbacks are called on a different thread + try { + serviceA.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(ElasticsearchException.class)); + assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated")); + } + + // use assert busy as callbacks are called on a different thread + assertBusy(() -> { + assertThat(tracerA.requestsReceived.get(), equalTo(2)); + assertThat(tracerA.requestsSent.get(), equalTo(2)); + assertThat(tracerA.responseReceived.get(), equalTo(2)); + assertThat(tracerA.responseSent.get(), equalTo(2)); + assertThat(tracerB.requestsReceived.get(), equalTo(1)); + assertThat(tracerB.requestsSent.get(), equalTo(1)); + assertThat(tracerB.responseReceived.get(), equalTo(1)); assertThat(tracerB.responseSent.get(), equalTo(1)); }); } @@ -973,20 +997,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); } - public void testTracerLog() throws InterruptedException { + @TestLogging(value = "org.elasticsearch.transport.TransportService.tracer:trace") + public void testTracerLog() throws Exception { TransportRequestHandler handler = (request, channel, task) -> channel.sendResponse(new StringMessageResponse("")); - TransportRequestHandler handlerWithError = new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception { - if (request.timeout() > 0) { - Thread.sleep(request.timeout); - } - channel.sendResponse(new RuntimeException("")); - + TransportRequestHandler handlerWithError = (request, channel, task) -> { + if (request.timeout() > 0) { + Thread.sleep(request.timeout); } + channel.sendResponse(new RuntimeException("")); + }; - final Semaphore requestCompleted = new Semaphore(0); TransportResponseHandler noopResponseHandler = new TransportResponseHandler() { @Override @@ -996,12 +1017,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Override public void handleResponse(StringMessageResponse response) { - requestCompleted.release(); } @Override public void handleException(TransportException exp) { - requestCompleted.release(); } @Override @@ -1011,48 +1030,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }; serviceA.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); + serviceA.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler); serviceA.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); serviceB.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); + serviceB.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler); serviceB.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); - final Tracer tracer = new Tracer(new HashSet<>(Arrays.asList("internal:test", "internal:testError"))); - // the tracer is invoked concurrently after the actual action is executed. that means a Tracer#requestSent can still be in-flight - // from a handshake executed on connect in the setup method. this might confuse this test since it expects exact number of - // invocations. To prevent any unrelated events messing with this test we filter on the actions we execute in this test. - serviceA.addTracer(tracer); - serviceB.addTracer(tracer); - - tracer.reset(4); - boolean timeout = randomBoolean(); - TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build() : - TransportRequestOptions.EMPTY; - serviceA.sendRequest(nodeB, "internal:test", new StringMessageRequest("", 10), options, noopResponseHandler); - requestCompleted.acquire(); - tracer.expectedEvents.get().await(); - assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true)); - assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true)); - assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true)); - assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true)); - assertThat("saw error sent", tracer.sawErrorSent, equalTo(false)); - - tracer.reset(4); - serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler); - requestCompleted.acquire(); - tracer.expectedEvents.get().await(); - assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true)); - assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true)); - assertThat("saw response sent", tracer.sawResponseSent, equalTo(false)); - assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true)); - assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true)); - String includeSettings; String excludeSettings; if (randomBoolean()) { // sometimes leave include empty (default) includeSettings = randomBoolean() ? "*" : ""; - excludeSettings = "*Error"; + excludeSettings = "internal:testNotSeen"; } else { - includeSettings = "internal:test"; + includeSettings = "internal:test,internal:testError"; excludeSettings = "DOESN'T_MATCH"; } clusterSettingsA.applySettings(Settings.builder() @@ -1060,97 +1051,78 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings) .build()); - tracer.reset(4); - serviceA.sendRequest(nodeB, "internal:test", new StringMessageRequest(""), noopResponseHandler); - requestCompleted.acquire(); - tracer.expectedEvents.get().await(); - assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true)); - assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true)); - assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true)); - assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true)); - assertThat("saw error sent", tracer.sawErrorSent, equalTo(false)); + MockLogAppender appender = new MockLogAppender(); + Loggers.addAppender(LogManager.getLogger("org.elasticsearch.transport.TransportService.tracer"), appender); + try { + appender.start(); - tracer.reset(2); - serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler); - requestCompleted.acquire(); - tracer.expectedEvents.get().await(); - assertThat("saw request sent", tracer.sawRequestSent, equalTo(false)); - assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true)); - assertThat("saw response sent", tracer.sawResponseSent, equalTo(false)); - assertThat("saw response received", tracer.sawResponseReceived, equalTo(false)); - assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true)); - } + final String requestSent = ".*\\[internal:test].*sent to.*\\{TS_B}.*"; + final MockLogAppender.LoggingExpectation requestSentExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "sent request", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, requestSent); + final String requestReceived = ".*\\[internal:test].*received request.*"; + final MockLogAppender.LoggingExpectation requestReceivedExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "received request", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, requestReceived); + final String responseSent = ".*\\[internal:test].*sent response.*"; + final MockLogAppender.LoggingExpectation responseSentExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "sent response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, responseSent); + final String responseReceived = ".*\\[internal:test].*received response from.*\\{TS_B}.*"; + final MockLogAppender.LoggingExpectation responseReceivedExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "received response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, responseReceived); - private static class Tracer extends MockTransportService.Tracer { - private final Set actions; - public volatile boolean sawRequestSent; - public volatile boolean sawRequestReceived; - public volatile boolean sawResponseSent; - public volatile boolean sawErrorSent; - public volatile boolean sawResponseReceived; + appender.addExpectation(requestSentExpectation); + appender.addExpectation(requestReceivedExpectation); + appender.addExpectation(responseSentExpectation); + appender.addExpectation(responseReceivedExpectation); - public AtomicReference expectedEvents = new AtomicReference<>(); + StringMessageRequest request = new StringMessageRequest("", 10); + serviceA.sendRequest(nodeB, "internal:test", request, TransportRequestOptions.EMPTY, noopResponseHandler); - Tracer(Set actions) { - this.actions = actions; - } + assertBusy(appender::assertAllExpectationsMatched); - @Override - public void receivedRequest(long requestId, String action) { - super.receivedRequest(requestId, action); - if (actions.contains(action)) { - sawRequestReceived = true; - expectedEvents.get().countDown(); - } - } + final String errorResponseSent = ".*\\[internal:testError].*sent error response.*"; + final MockLogAppender.LoggingExpectation errorResponseSentExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "sent error response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, errorResponseSent); - @Override - public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - super.requestSent(node, requestId, action, options); - if (actions.contains(action)) { - sawRequestSent = true; - expectedEvents.get().countDown(); - } - } + final String errorResponseReceived = ".*\\[internal:testError].*received response from.*\\{TS_B}.*"; + final MockLogAppender.LoggingExpectation errorResponseReceivedExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "received error response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, errorResponseReceived); - @Override - public void responseSent(long requestId, String action) { - super.responseSent(requestId, action); - if (actions.contains(action)) { - sawResponseSent = true; - expectedEvents.get().countDown(); - } - } + appender.addExpectation(errorResponseSentExpectation); + appender.addExpectation(errorResponseReceivedExpectation); - @Override - public void responseSent(long requestId, String action, Throwable t) { - super.responseSent(requestId, action, t); - if (actions.contains(action)) { - sawErrorSent = true; - expectedEvents.get().countDown(); - } - } + serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler); - @Override - public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { - super.receivedResponse(requestId, sourceNode, action); - if (actions.contains(action)) { - sawResponseReceived = true; - expectedEvents.get().countDown(); - } - } + assertBusy(appender::assertAllExpectationsMatched); - public void reset(int expectedCount) { - sawRequestSent = false; - sawRequestReceived = false; - sawResponseSent = false; - sawErrorSent = false; - sawResponseReceived = false; - expectedEvents.set(new CountDownLatch(expectedCount)); + final String notSeenSent = "*[internal:testNotSeen]*sent to*"; + final MockLogAppender.LoggingExpectation notSeenSentExpectation = + new MockLogAppender.UnseenEventExpectation( + "not seen request sent", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, notSeenSent); + final String notSeenReceived = ".*\\[internal:testNotSeen].*received request.*"; + final MockLogAppender.LoggingExpectation notSeenReceivedExpectation = + new MockLogAppender.PatternSeenEventExcpectation( + "not seen request received", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, notSeenReceived); + + appender.addExpectation(notSeenSentExpectation); + appender.addExpectation(notSeenReceivedExpectation); + + PlainTransportFuture future = new PlainTransportFuture<>(noopResponseHandler); + serviceA.sendRequest(nodeB, "internal:testNotSeen", new StringMessageRequest(""), future); + future.txGet(); + + assertBusy(appender::assertAllExpectationsMatched); + } finally { + Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.transport.TransportService.tracer"), appender); + appender.stop(); } } - public static class StringMessageRequest extends TransportRequest { private String message; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index c3bb73f0452..dceb2db9b54 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -868,7 +868,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { removeLeaseLatch.countDown(); unfollowLatch.await(); - senderTransportService.transport().addMessageListener(new TransportMessageListener() { + senderTransportService.addMessageListener(new TransportMessageListener() { @SuppressWarnings("rawtypes") @Override @@ -880,7 +880,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { new RetentionLeaseNotFoundException(retentionLeaseId); context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); responseLatch.countDown(); - senderTransportService.transport().removeMessageListener(this); + senderTransportService.removeMessageListener(this); } }