Filter actions to trace in test

Notifications for request tracing are invoked concurrently and can still
be in flight once a tracer is installed in the test. This can lead to side-effects
since the test relied on exact invocations. This commit adds action filtering
to the test tracer to only count invocations for the relevant actions.

Closes #22418
This commit is contained in:
Simon Willnauer 2017-01-03 23:38:35 +01:00
parent c6ddff757e
commit c6573e6e56
1 changed files with 30 additions and 18 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
@ -32,7 +31,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -44,7 +42,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -58,8 +55,10 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -836,7 +835,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
} }
@TestLogging("org.elasticsearch.transport:DEBUG")
public void testTracerLog() throws InterruptedException { public void testTracerLog() throws InterruptedException {
TransportRequestHandler handler = (request, channel) -> channel.sendResponse(new StringMessageResponse("")); TransportRequestHandler handler = (request, channel) -> channel.sendResponse(new StringMessageResponse(""));
TransportRequestHandler handlerWithError = new TransportRequestHandler<StringMessageRequest>() { TransportRequestHandler handlerWithError = new TransportRequestHandler<StringMessageRequest>() {
@ -879,7 +877,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceB.registerRequestHandler("test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); serviceB.registerRequestHandler("test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceB.registerRequestHandler("testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); serviceB.registerRequestHandler("testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
final Tracer tracer = new Tracer(); final Tracer tracer = new Tracer(new HashSet<>(Arrays.asList("test", "testError")));
// the tracer is invoked concurrently after the actual action is executed. that means a Tracer#requestSent can still be in-flight
// from a handshake executed on connect in the setup method. this might confuse this test since it expects exact number of
// invocations. To prevent any unrelated events messing with this test we filter on the actions we execute in this test.
serviceA.addTracer(tracer); serviceA.addTracer(tracer);
serviceB.addTracer(tracer); serviceB.addTracer(tracer);
@ -943,6 +944,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} }
private static class Tracer extends MockTransportService.Tracer { private static class Tracer extends MockTransportService.Tracer {
private final Set<String> actions;
public volatile boolean sawRequestSent; public volatile boolean sawRequestSent;
public volatile boolean sawRequestReceived; public volatile boolean sawRequestReceived;
public volatile boolean sawResponseSent; public volatile boolean sawResponseSent;
@ -950,42 +952,52 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public volatile boolean sawResponseReceived; public volatile boolean sawResponseReceived;
public AtomicReference<CountDownLatch> expectedEvents = new AtomicReference<>(); public AtomicReference<CountDownLatch> expectedEvents = new AtomicReference<>();
private final Logger logger = Loggers.getLogger(getClass()); public Tracer(Set<String> actions) {
this.actions = actions;
}
@Override @Override
public void receivedRequest(long requestId, String action) { public void receivedRequest(long requestId, String action) {
super.receivedRequest(requestId, action); super.receivedRequest(requestId, action);
sawRequestReceived = true; if (actions.contains(action)) {
expectedEvents.get().countDown(); sawRequestReceived = true;
expectedEvents.get().countDown();
}
} }
@Override @Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
super.requestSent(node, requestId, action, options); super.requestSent(node, requestId, action, options);
sawRequestSent = true; if (actions.contains(action)) {
expectedEvents.get().countDown(); sawRequestSent = true;
expectedEvents.get().countDown();
}
} }
@Override @Override
public void responseSent(long requestId, String action) { public void responseSent(long requestId, String action) {
super.responseSent(requestId, action); super.responseSent(requestId, action);
logger.debug("#### responseSent for action: {}", action); if (actions.contains(action)) {
sawResponseSent = true; sawResponseSent = true;
expectedEvents.get().countDown(); expectedEvents.get().countDown();
}
} }
@Override @Override
public void responseSent(long requestId, String action, Throwable t) { public void responseSent(long requestId, String action, Throwable t) {
super.responseSent(requestId, action, t); super.responseSent(requestId, action, t);
sawErrorSent = true; if (actions.contains(action)) {
expectedEvents.get().countDown(); sawErrorSent = true;
expectedEvents.get().countDown();
}
} }
@Override @Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
super.receivedResponse(requestId, sourceNode, action); super.receivedResponse(requestId, sourceNode, action);
sawResponseReceived = true; if (actions.contains(action)) {
expectedEvents.get().countDown(); sawResponseReceived = true;
expectedEvents.get().countDown();
}
} }
public void reset(int expectedCount) { public void reset(int expectedCount) {