Move TransportMessageListener to TransportService (#40474)

Currently the TransportMessageListener is applied and used in the
Transport class. However, local requests and responses never make it to
this class. This PR moves the listener add/remove methods to the
TransportService. After this change the Transport can only have one
listener set with it. This one listener is the TransportService, which
will then propogate the events to the external listeners.

Additionally this commit back ports #40237

Remove Tracer from MockTransportService

Currently the TransportMessageListener is applied and used in the
Transport class. However, local requests and responses never make it to
this class. This PR moves the listener add/remove methods to the
TransportService. After this change the Transport can only have one
listener set with it. This one listener is the TransportService, which
will then propogate the events to the external listeners.
This commit is contained in:
Tim Brooks 2019-03-27 09:24:20 -06:00 committed by GitHub
parent 03aaeb35cc
commit 760cfffe4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 239 additions and 375 deletions

View File

@ -82,7 +82,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -102,7 +101,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
// This is the number of bytes necessary to read the message size // This is the number of bytes necessary to read the message size
private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
@ -119,7 +117,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService; protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings; protected final Set<ProfileSettings> profileSettings;
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); private static final TransportMessageListener NOOP_LISTENER = new TransportMessageListener() {};
private volatile TransportMessageListener messageListener = NOOP_LISTENER;
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap(); private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap(); private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
@ -181,12 +180,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected void doStart() { protected void doStart() {
} }
public void addMessageListener(TransportMessageListener listener) { @Override
messageListener.listeners.add(listener); public synchronized void setMessageListener(TransportMessageListener listener) {
if (messageListener == NOOP_LISTENER) {
messageListener = listener;
} else {
throw new IllegalStateException("Cannot set message listener twice");
} }
public boolean removeMessageListener(TransportMessageListener listener) {
return messageListener.listeners.remove(listener);
} }
@Override @Override
@ -1184,47 +1184,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} }
} }
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onRequestReceived(long requestId, String action) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response);
}
}
@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}
@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}
@Override
public void onResponseReceived(long requestId, ResponseContext holder) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
}
@Override @Override
public final ResponseHandlers getResponseHandlers() { public final ResponseHandlers getResponseHandlers() {
return responseHandlers; return responseHandlers;

View File

@ -53,9 +53,7 @@ public interface Transport extends LifecycleComponent {
*/ */
RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action); RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action);
void addMessageListener(TransportMessageListener listener); void setMessageListener(TransportMessageListener listener);
boolean removeMessageListener(TransportMessageListener listener);
/** /**
* The address the transport is bound on. * The address the transport is bound on.

View File

@ -64,6 +64,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.Function; import java.util.function.Function;
@ -77,6 +78,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";
private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
protected final Transport transport; protected final Transport transport;
protected final ConnectionManager connectionManager; protected final ConnectionManager connectionManager;
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
@ -223,7 +225,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override @Override
protected void doStart() { protected void doStart() {
transport.addMessageListener(this); transport.setMessageListener(this);
connectionManager.addListener(this); connectionManager.addListener(this);
transport.start(); transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) { if (transport.boundAddress() != null && logger.isInfoEnabled()) {
@ -482,6 +484,14 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
connectionManager.disconnectFromNode(node); connectionManager.disconnectFromNode(node);
} }
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}
public boolean removeMessageListener(TransportMessageListener listener) {
return messageListener.listeners.remove(listener);
}
public void addConnectionListener(TransportConnectionListener listener) { public void addConnectionListener(TransportConnectionListener listener) {
connectionManager.addListener(listener); connectionManager.addListener(listener);
} }
@ -854,38 +864,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
transport.registerRequestHandler(reg); transport.registerRequestHandler(reg);
} }
/** called by the {@link Transport} implementation once a request has been sent */
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) {
if (traceEnabled() && shouldTraceAction(action)) {
traceRequestSent(node, requestId, action, options);
}
}
protected boolean traceEnabled() {
return tracerLog.isTraceEnabled();
}
/** called by the {@link Transport} implementation once a response was sent to calling node */
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
if (traceEnabled() && shouldTraceAction(action)) {
traceResponseSent(requestId, action);
}
}
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@Override
public void onResponseSent(long requestId, String action, Exception e) {
if (traceEnabled() && shouldTraceAction(action)) {
traceResponseSent(requestId, action, e);
}
}
protected void traceResponseSent(long requestId, String action, Exception e) {
tracerLog.trace(() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e);
}
/** /**
* called by the {@link Transport} implementation when an incoming request arrives but before * called by the {@link Transport} implementation when an incoming request arrives but before
* any parsing of it has happened (with the exception of the requestId and action) * any parsing of it has happened (with the exception of the requestId and action)
@ -897,23 +875,52 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.trace("interrupted while waiting for incoming requests block to be removed"); logger.trace("interrupted while waiting for incoming requests block to be removed");
} }
if (traceEnabled() && shouldTraceAction(action)) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
traceReceivedRequest(requestId, action); tracerLog.trace("[{}][{}] received request", requestId, action);
} }
messageListener.onRequestReceived(requestId, action);
} }
public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) { /** called by the {@link Transport} implementation once a request has been sent */
return transport.getRequestHandler(action); @Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
}
messageListener.onRequestSent(node, requestId, action, request, options);
} }
@Override @Override
public void onResponseReceived(long requestId, Transport.ResponseContext holder) { public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
if (holder == null) { if (holder == null) {
checkForTimeout(requestId); checkForTimeout(requestId);
} else if (traceEnabled() && shouldTraceAction(holder.action())) { } else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) {
traceReceivedResponse(requestId, holder.connection().getNode(), holder.action()); tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode());
} }
messageListener.onResponseReceived(requestId, holder);
}
/** called by the {@link Transport} implementation once a response was sent to calling node */
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent response", requestId, action);
}
messageListener.onResponseSent(requestId, action, response);
}
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@Override
public void onResponseSent(long requestId, String action, Exception e) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace(() -> new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e);
}
messageListener.onResponseSent(requestId, action, e);
}
public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
return transport.getRequestHandler(action);
} }
private void checkForTimeout(long requestId) { private void checkForTimeout(long requestId) {
@ -935,14 +942,14 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
sourceNode = null; sourceNode = null;
} }
// call tracer out of lock // call tracer out of lock
if (traceEnabled() == false) { if (tracerLog.isTraceEnabled() == false) {
return; return;
} }
if (action == null) { if (action == null) {
assert sourceNode == null; assert sourceNode == null;
traceUnresolvedResponse(requestId); tracerLog.trace("[{}] received response but can't resolve it to a request", requestId);
} else if (shouldTraceAction(action)) { } else if (shouldTraceAction(action)) {
traceReceivedResponse(requestId, sourceNode, action); tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode);
} }
} }
@ -972,26 +979,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
} }
} }
protected void traceReceivedRequest(long requestId, String action) {
tracerLog.trace("[{}][{}] received request", requestId, action);
}
protected void traceResponseSent(long requestId, String action) {
tracerLog.trace("[{}][{}] sent response", requestId, action);
}
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode);
}
protected void traceUnresolvedResponse(long requestId) {
tracerLog.trace("[{}] received response but can't resolve it to a request", requestId);
}
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
}
final class TimeoutHandler implements Runnable { final class TimeoutHandler implements Runnable {
private final long requestId; private final long requestId;
@ -1257,4 +1244,45 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
private boolean isLocalNode(DiscoveryNode discoveryNode) { private boolean isLocalNode(DiscoveryNode discoveryNode) {
return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode); return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);
} }
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onRequestReceived(long requestId, String action) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response);
}
}
@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}
@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}
@Override
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
}
} }

View File

@ -229,13 +229,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
@Override @Override
public void addMessageListener(TransportMessageListener listener) { public void setMessageListener(TransportMessageListener listener) {
this.listener = listener; this.listener = listener;
} }
@Override
public boolean removeMessageListener(TransportMessageListener listener) {
throw new UnsupportedOperationException();
}
} }

View File

@ -387,12 +387,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
} }
@Override @Override
public void addMessageListener(TransportMessageListener listener) { public void setMessageListener(TransportMessageListener listener) {
}
@Override
public boolean removeMessageListener(TransportMessageListener listener) {
throw new UnsupportedOperationException();
} }
@Override @Override

View File

@ -43,7 +43,10 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.TransportSettings;
@ -291,8 +294,8 @@ public class ZenFaultDetectionTests extends ESTestCase {
PingProbe pingProbeA = new PingProbe(minExpectedPings); PingProbe pingProbeA = new PingProbe(minExpectedPings);
PingProbe pingProbeB = new PingProbe(minExpectedPings); PingProbe pingProbeB = new PingProbe(minExpectedPings);
serviceA.addTracer(pingProbeA); serviceA.addMessageListener(pingProbeA);
serviceB.addTracer(pingProbeB); serviceB.addMessageListener(pingProbeB);
MasterFaultDetection masterFDNodeA = new MasterFaultDetection(Settings.builder().put(settingsA).put(settings).build(), MasterFaultDetection masterFDNodeA = new MasterFaultDetection(Settings.builder().put(settingsA).put(settings).build(),
threadPool, serviceA, clusterStateSupplierA::get, null, clusterName); threadPool, serviceA, clusterStateSupplierA::get, null, clusterName);
@ -321,7 +324,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
"release! See the breaking changes documentation for the next major version."); "release! See the breaking changes documentation for the next major version.");
} }
private static class PingProbe extends MockTransportService.Tracer { private static class PingProbe implements TransportMessageListener {
private final Set<Tuple<DiscoveryNode, Long>> inflightPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<Tuple<DiscoveryNode, Long>> inflightPings = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Tuple<DiscoveryNode, Long>> completedPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<Tuple<DiscoveryNode, Long>> completedPings = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final CountDownLatch waitForPings; private final CountDownLatch waitForPings;
@ -331,16 +334,17 @@ public class ZenFaultDetectionTests extends ESTestCase {
} }
@Override @Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) {
if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) {
inflightPings.add(Tuple.tuple(node, requestId)); inflightPings.add(Tuple.tuple(node, requestId));
} }
} }
@Override @Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { public void onResponseReceived(long requestId, Transport.ResponseContext context) {
if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(context.action())) {
Tuple<DiscoveryNode, Long> ping = Tuple.tuple(sourceNode, requestId); Tuple<DiscoveryNode, Long> ping = Tuple.tuple(context.connection().getNode(), requestId);
if (inflightPings.remove(ping)) { if (inflightPings.remove(ping)) {
completedPings.add(ping); completedPings.add(ping);
waitForPings.countDown(); waitForPings.countDown();

View File

@ -54,6 +54,7 @@ import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
@ -164,9 +165,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
CountDownLatch beginRelocationLatch = new CountDownLatch(1); CountDownLatch beginRelocationLatch = new CountDownLatch(1);
CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1); CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1);
// use a tracer on the target node to track relocation start and end // use a tracer on the target node to track relocation start and end
transportService.addTracer(new MockTransportService.Tracer() { transportService.addMessageListener(new TransportMessageListener() {
@Override @Override
public void receivedRequest(long requestId, String action) { public void onRequestReceived(long requestId, String action) {
if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) { if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
logger.info("received: {}, relocation starts", action); logger.info("received: {}, relocation starts", action);
beginRelocationLatch.countDown(); beginRelocationLatch.countDown();

View File

@ -1117,7 +1117,6 @@ public final class InternalTestCluster extends TestCluster {
if (transportService instanceof MockTransportService) { if (transportService instanceof MockTransportService) {
final MockTransportService mockTransportService = (MockTransportService) transportService; final MockTransportService mockTransportService = (MockTransportService) transportService;
mockTransportService.clearAllRules(); mockTransportService.clearAllRules();
mockTransportService.clearTracers();
} }
} }
randomlyResetClients(); randomlyResetClients();

View File

@ -264,22 +264,13 @@ public class MockTransport implements Transport, LifecycleComponent {
} }
@Override @Override
public void addMessageListener(TransportMessageListener listener) { public void setMessageListener(TransportMessageListener listener) {
if (this.listener != null) { if (this.listener != null) {
throw new IllegalStateException("listener already set"); throw new IllegalStateException("listener already set");
} }
this.listener = listener; this.listener = listener;
} }
@Override
public boolean removeMessageListener(TransportMessageListener listener) {
if (listener == this.listener) {
this.listener = null;
return true;
}
return false;
}
protected NamedWriteableRegistry writeableRegistry() { protected NamedWriteableRegistry writeableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
} }

View File

@ -482,78 +482,6 @@ public final class MockTransportService extends TransportService {
return (StubbableConnectionManager) connectionManager; return (StubbableConnectionManager) connectionManager;
} }
List<Tracer> activeTracers = new CopyOnWriteArrayList<>();
public static class Tracer {
public void receivedRequest(long requestId, String action) {
}
public void responseSent(long requestId, String action) {
}
public void responseSent(long requestId, String action, Throwable t) {
}
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
}
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
}
}
public void addTracer(Tracer tracer) {
activeTracers.add(tracer);
}
public void clearTracers() {
activeTracers.clear();
}
@Override
protected boolean traceEnabled() {
return super.traceEnabled() || activeTracers.isEmpty() == false;
}
@Override
protected void traceReceivedRequest(long requestId, String action) {
super.traceReceivedRequest(requestId, action);
for (Tracer tracer : activeTracers) {
tracer.receivedRequest(requestId, action);
}
}
@Override
protected void traceResponseSent(long requestId, String action) {
super.traceResponseSent(requestId, action);
for (Tracer tracer : activeTracers) {
tracer.responseSent(requestId, action);
}
}
@Override
protected void traceResponseSent(long requestId, String action, Exception e) {
super.traceResponseSent(requestId, action, e);
for (Tracer tracer : activeTracers) {
tracer.responseSent(requestId, action, e);
}
}
@Override
protected void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
super.traceReceivedResponse(requestId, sourceNode, action);
for (Tracer tracer : activeTracers) {
tracer.receivedResponse(requestId, sourceNode, action);
}
}
@Override
protected void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
super.traceRequestSent(node, requestId, action, options);
for (Tracer tracer : activeTracers) {
tracer.requestSent(node, requestId, action, options);
}
}
public Transport getOriginalTransport() { public Transport getOriginalTransport() {
Transport transport = transport(); Transport transport = transport();
while (transport instanceof StubbableTransport) { while (transport instanceof StubbableTransport) {

View File

@ -95,13 +95,8 @@ public final class StubbableTransport implements Transport {
} }
@Override @Override
public void addMessageListener(TransportMessageListener listener) { public void setMessageListener(TransportMessageListener listener) {
delegate.addMessageListener(listener); delegate.setMessageListener(listener);
}
@Override
public boolean removeMessageListener(TransportMessageListener listener) {
return delegate.removeMessageListener(listener);
} }
@Override @Override

View File

@ -19,6 +19,8 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
@ -35,6 +37,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.network.NetworkUtils;
@ -53,7 +56,9 @@ import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
@ -397,7 +402,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertThat(responseString.get(), equalTo("test")); assertThat(responseString.get(), equalTo("test"));
} }
public void testAdapterSendReceiveCallbacks() throws Exception { public void testMessageListeners() throws Exception {
final TransportRequestHandler<TransportRequest.Empty> requestHandler = (request, channel, task) -> { final TransportRequestHandler<TransportRequest.Empty> requestHandler = (request, channel, task) -> {
try { try {
if (randomBoolean()) { if (randomBoolean()) {
@ -416,62 +421,62 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC, serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
requestHandler); requestHandler);
class CountingListener implements TransportMessageListener {
class CountingTracer extends MockTransportService.Tracer {
AtomicInteger requestsReceived = new AtomicInteger(); AtomicInteger requestsReceived = new AtomicInteger();
AtomicInteger requestsSent = new AtomicInteger(); AtomicInteger requestsSent = new AtomicInteger();
AtomicInteger responseReceived = new AtomicInteger(); AtomicInteger responseReceived = new AtomicInteger();
AtomicInteger responseSent = new AtomicInteger(); AtomicInteger responseSent = new AtomicInteger();
@Override @Override
public void receivedRequest(long requestId, String action) { public void onRequestReceived(long requestId, String action) {
if (action.equals(ACTION)) { if (action.equals(ACTION)) {
requestsReceived.incrementAndGet(); requestsReceived.incrementAndGet();
} }
} }
@Override @Override
public void responseSent(long requestId, String action) { public void onResponseSent(long requestId, String action, TransportResponse response) {
if (action.equals(ACTION)) { if (action.equals(ACTION)) {
responseSent.incrementAndGet(); responseSent.incrementAndGet();
} }
} }
@Override @Override
public void responseSent(long requestId, String action, Throwable t) { public void onResponseSent(long requestId, String action, Exception error) {
if (action.equals(ACTION)) { if (action.equals(ACTION)) {
responseSent.incrementAndGet(); responseSent.incrementAndGet();
} }
} }
@Override @Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { public void onResponseReceived(long requestId, Transport.ResponseContext context) {
if (action.equals(ACTION)) { if (context.action().equals(ACTION)) {
responseReceived.incrementAndGet(); responseReceived.incrementAndGet();
} }
} }
@Override @Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) {
if (action.equals(ACTION)) { if (action.equals(ACTION)) {
requestsSent.incrementAndGet(); requestsSent.incrementAndGet();
} }
} }
} }
final CountingTracer tracerA = new CountingTracer();
final CountingTracer tracerB = new CountingTracer(); final CountingListener tracerA = new CountingListener();
serviceA.addTracer(tracerA); final CountingListener tracerB = new CountingListener();
serviceB.addTracer(tracerB); serviceA.addMessageListener(tracerA);
serviceB.addMessageListener(tracerB);
try { try {
serviceA serviceA.submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
.submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class)); assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated")); assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
} }
// use assert busy as call backs are sometime called after the response have been sent // use assert busy as callbacks are called on a different thread
assertBusy(() -> { assertBusy(() -> {
assertThat(tracerA.requestsReceived.get(), equalTo(0)); assertThat(tracerA.requestsReceived.get(), equalTo(0));
assertThat(tracerA.requestsSent.get(), equalTo(1)); assertThat(tracerA.requestsSent.get(), equalTo(1));
@ -484,22 +489,41 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}); });
try { try {
serviceA serviceB.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class)); assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated")); assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
} }
// use assert busy as call backs are sometime called after the response have been sent // use assert busy as callbacks are called on a different thread
assertBusy(() -> { assertBusy(() -> {
assertThat(tracerA.requestsReceived.get(), equalTo(1)); assertThat(tracerA.requestsReceived.get(), equalTo(1));
assertThat(tracerA.requestsSent.get(), equalTo(2)); assertThat(tracerA.requestsSent.get(), equalTo(1));
assertThat(tracerA.responseReceived.get(), equalTo(2)); assertThat(tracerA.responseReceived.get(), equalTo(1));
assertThat(tracerA.responseSent.get(), equalTo(1)); assertThat(tracerA.responseSent.get(), equalTo(1));
assertThat(tracerB.requestsReceived.get(), equalTo(1)); assertThat(tracerB.requestsReceived.get(), equalTo(1));
assertThat(tracerB.requestsSent.get(), equalTo(0)); assertThat(tracerB.requestsSent.get(), equalTo(1));
assertThat(tracerB.responseReceived.get(), equalTo(0)); assertThat(tracerB.responseReceived.get(), equalTo(1));
assertThat(tracerB.responseSent.get(), equalTo(1));
});
// use assert busy as callbacks are called on a different thread
try {
serviceA.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
}
// use assert busy as callbacks are called on a different thread
assertBusy(() -> {
assertThat(tracerA.requestsReceived.get(), equalTo(2));
assertThat(tracerA.requestsSent.get(), equalTo(2));
assertThat(tracerA.responseReceived.get(), equalTo(2));
assertThat(tracerA.responseSent.get(), equalTo(2));
assertThat(tracerB.requestsReceived.get(), equalTo(1));
assertThat(tracerB.requestsSent.get(), equalTo(1));
assertThat(tracerB.responseReceived.get(), equalTo(1));
assertThat(tracerB.responseSent.get(), equalTo(1)); assertThat(tracerB.responseSent.get(), equalTo(1));
}); });
} }
@ -973,20 +997,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
} }
public void testTracerLog() throws InterruptedException { @TestLogging(value = "org.elasticsearch.transport.TransportService.tracer:trace")
public void testTracerLog() throws Exception {
TransportRequestHandler<TransportRequest> handler = (request, channel, task) -> channel.sendResponse(new StringMessageResponse("")); TransportRequestHandler<TransportRequest> handler = (request, channel, task) -> channel.sendResponse(new StringMessageResponse(""));
TransportRequestHandler<StringMessageRequest> handlerWithError = new TransportRequestHandler<StringMessageRequest>() { TransportRequestHandler<StringMessageRequest> handlerWithError = (request, channel, task) -> {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception {
if (request.timeout() > 0) { if (request.timeout() > 0) {
Thread.sleep(request.timeout); Thread.sleep(request.timeout);
} }
channel.sendResponse(new RuntimeException("")); channel.sendResponse(new RuntimeException(""));
}
}; };
final Semaphore requestCompleted = new Semaphore(0);
TransportResponseHandler<StringMessageResponse> noopResponseHandler = new TransportResponseHandler<StringMessageResponse>() { TransportResponseHandler<StringMessageResponse> noopResponseHandler = new TransportResponseHandler<StringMessageResponse>() {
@Override @Override
@ -996,12 +1017,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
@Override @Override
public void handleResponse(StringMessageResponse response) { public void handleResponse(StringMessageResponse response) {
requestCompleted.release();
} }
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
requestCompleted.release();
} }
@Override @Override
@ -1011,48 +1030,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}; };
serviceA.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); serviceA.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceA.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceA.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); serviceA.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
serviceB.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); serviceB.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceB.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceB.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); serviceB.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
final Tracer tracer = new Tracer(new HashSet<>(Arrays.asList("internal:test", "internal:testError")));
// the tracer is invoked concurrently after the actual action is executed. that means a Tracer#requestSent can still be in-flight
// from a handshake executed on connect in the setup method. this might confuse this test since it expects exact number of
// invocations. To prevent any unrelated events messing with this test we filter on the actions we execute in this test.
serviceA.addTracer(tracer);
serviceB.addTracer(tracer);
tracer.reset(4);
boolean timeout = randomBoolean();
TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build() :
TransportRequestOptions.EMPTY;
serviceA.sendRequest(nodeB, "internal:test", new StringMessageRequest("", 10), options, noopResponseHandler);
requestCompleted.acquire();
tracer.expectedEvents.get().await();
assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true));
assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
assertThat("saw error sent", tracer.sawErrorSent, equalTo(false));
tracer.reset(4);
serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler);
requestCompleted.acquire();
tracer.expectedEvents.get().await();
assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("saw response sent", tracer.sawResponseSent, equalTo(false));
assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true));
String includeSettings; String includeSettings;
String excludeSettings; String excludeSettings;
if (randomBoolean()) { if (randomBoolean()) {
// sometimes leave include empty (default) // sometimes leave include empty (default)
includeSettings = randomBoolean() ? "*" : ""; includeSettings = randomBoolean() ? "*" : "";
excludeSettings = "*Error"; excludeSettings = "internal:testNotSeen";
} else { } else {
includeSettings = "internal:test"; includeSettings = "internal:test,internal:testError";
excludeSettings = "DOESN'T_MATCH"; excludeSettings = "DOESN'T_MATCH";
} }
clusterSettingsA.applySettings(Settings.builder() clusterSettingsA.applySettings(Settings.builder()
@ -1060,97 +1051,78 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
.put(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings) .put(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings)
.build()); .build());
tracer.reset(4); MockLogAppender appender = new MockLogAppender();
serviceA.sendRequest(nodeB, "internal:test", new StringMessageRequest(""), noopResponseHandler); Loggers.addAppender(LogManager.getLogger("org.elasticsearch.transport.TransportService.tracer"), appender);
requestCompleted.acquire(); try {
tracer.expectedEvents.get().await(); appender.start();
assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true)); final String requestSent = ".*\\[internal:test].*sent to.*\\{TS_B}.*";
assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true)); final MockLogAppender.LoggingExpectation requestSentExpectation =
assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true)); new MockLogAppender.PatternSeenEventExcpectation(
assertThat("saw error sent", tracer.sawErrorSent, equalTo(false)); "sent request", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, requestSent);
final String requestReceived = ".*\\[internal:test].*received request.*";
final MockLogAppender.LoggingExpectation requestReceivedExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"received request", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, requestReceived);
final String responseSent = ".*\\[internal:test].*sent response.*";
final MockLogAppender.LoggingExpectation responseSentExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"sent response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, responseSent);
final String responseReceived = ".*\\[internal:test].*received response from.*\\{TS_B}.*";
final MockLogAppender.LoggingExpectation responseReceivedExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"received response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, responseReceived);
appender.addExpectation(requestSentExpectation);
appender.addExpectation(requestReceivedExpectation);
appender.addExpectation(responseSentExpectation);
appender.addExpectation(responseReceivedExpectation);
StringMessageRequest request = new StringMessageRequest("", 10);
serviceA.sendRequest(nodeB, "internal:test", request, TransportRequestOptions.EMPTY, noopResponseHandler);
assertBusy(appender::assertAllExpectationsMatched);
final String errorResponseSent = ".*\\[internal:testError].*sent error response.*";
final MockLogAppender.LoggingExpectation errorResponseSentExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"sent error response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, errorResponseSent);
final String errorResponseReceived = ".*\\[internal:testError].*received response from.*\\{TS_B}.*";
final MockLogAppender.LoggingExpectation errorResponseReceivedExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"received error response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, errorResponseReceived);
appender.addExpectation(errorResponseSentExpectation);
appender.addExpectation(errorResponseReceivedExpectation);
tracer.reset(2);
serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler); serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler);
requestCompleted.acquire();
tracer.expectedEvents.get().await();
assertThat("saw request sent", tracer.sawRequestSent, equalTo(false));
assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
assertThat("saw response sent", tracer.sawResponseSent, equalTo(false));
assertThat("saw response received", tracer.sawResponseReceived, equalTo(false));
assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true));
}
private static class Tracer extends MockTransportService.Tracer { assertBusy(appender::assertAllExpectationsMatched);
private final Set<String> actions;
public volatile boolean sawRequestSent;
public volatile boolean sawRequestReceived;
public volatile boolean sawResponseSent;
public volatile boolean sawErrorSent;
public volatile boolean sawResponseReceived;
public AtomicReference<CountDownLatch> expectedEvents = new AtomicReference<>(); final String notSeenSent = "*[internal:testNotSeen]*sent to*";
final MockLogAppender.LoggingExpectation notSeenSentExpectation =
new MockLogAppender.UnseenEventExpectation(
"not seen request sent", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, notSeenSent);
final String notSeenReceived = ".*\\[internal:testNotSeen].*received request.*";
final MockLogAppender.LoggingExpectation notSeenReceivedExpectation =
new MockLogAppender.PatternSeenEventExcpectation(
"not seen request received", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, notSeenReceived);
Tracer(Set<String> actions) { appender.addExpectation(notSeenSentExpectation);
this.actions = actions; appender.addExpectation(notSeenReceivedExpectation);
}
@Override PlainTransportFuture<StringMessageResponse> future = new PlainTransportFuture<>(noopResponseHandler);
public void receivedRequest(long requestId, String action) { serviceA.sendRequest(nodeB, "internal:testNotSeen", new StringMessageRequest(""), future);
super.receivedRequest(requestId, action); future.txGet();
if (actions.contains(action)) {
sawRequestReceived = true;
expectedEvents.get().countDown();
}
}
@Override assertBusy(appender::assertAllExpectationsMatched);
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { } finally {
super.requestSent(node, requestId, action, options); Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.transport.TransportService.tracer"), appender);
if (actions.contains(action)) { appender.stop();
sawRequestSent = true;
expectedEvents.get().countDown();
} }
} }
@Override
public void responseSent(long requestId, String action) {
super.responseSent(requestId, action);
if (actions.contains(action)) {
sawResponseSent = true;
expectedEvents.get().countDown();
}
}
@Override
public void responseSent(long requestId, String action, Throwable t) {
super.responseSent(requestId, action, t);
if (actions.contains(action)) {
sawErrorSent = true;
expectedEvents.get().countDown();
}
}
@Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
super.receivedResponse(requestId, sourceNode, action);
if (actions.contains(action)) {
sawResponseReceived = true;
expectedEvents.get().countDown();
}
}
public void reset(int expectedCount) {
sawRequestSent = false;
sawRequestReceived = false;
sawResponseSent = false;
sawErrorSent = false;
sawResponseReceived = false;
expectedEvents.set(new CountDownLatch(expectedCount));
}
}
public static class StringMessageRequest extends TransportRequest { public static class StringMessageRequest extends TransportRequest {
private String message; private String message;

View File

@ -868,7 +868,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
removeLeaseLatch.countDown(); removeLeaseLatch.countDown();
unfollowLatch.await(); unfollowLatch.await();
senderTransportService.transport().addMessageListener(new TransportMessageListener() { senderTransportService.addMessageListener(new TransportMessageListener() {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@Override @Override
@ -880,7 +880,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
new RetentionLeaseNotFoundException(retentionLeaseId); new RetentionLeaseNotFoundException(retentionLeaseId);
context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); context.handler().handleException(new RemoteTransportException(e.getMessage(), e));
responseLatch.countDown(); responseLatch.countDown();
senderTransportService.transport().removeMessageListener(this); senderTransportService.removeMessageListener(this);
} }
} }