Traces in testAdapterSendReceiveCallbacks should only listen the relevant actions

The traces callback is only called after responses are set. This can lead to concurrent issues where the trace is notified of previously sent responses if it was added after the response was sent (enabling further execution of the test) but before the tracer call backs are called.
This commit is contained in:
Boaz Leskes 2017-02-12 09:13:49 +02:00
parent 0f21ed5b70
commit 6a8ef0ea74
1 changed files with 46 additions and 55 deletions

View File

@ -67,20 +67,19 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
@ -377,48 +376,52 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
fail(e.getMessage());
}
};
serviceA.registerRequestHandler("action", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
final String ACTION = "action";
serviceA.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
requestHandler);
serviceB.registerRequestHandler("action", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
requestHandler);
class CountingTracer extends MockTransportService.Tracer {
List<String> requestsReceived = new CopyOnWriteArrayList<>();
List<String> requestsSent = new CopyOnWriteArrayList<>();
List<String> responseReceived = new CopyOnWriteArrayList<>();
List<String> responseSent = new CopyOnWriteArrayList<>();
class CountingTracer extends MockTransportService.Tracer {
AtomicInteger requestsReceived = new AtomicInteger();
AtomicInteger requestsSent = new AtomicInteger();
AtomicInteger responseReceived = new AtomicInteger();
AtomicInteger responseSent = new AtomicInteger();
@Override
public void receivedRequest(long requestId, String action) {
requestsReceived.add(requestId + ":" + action);
if (action.equals(ACTION)) {
requestsReceived.incrementAndGet();
}
}
@Override
public void responseSent(long requestId, String action) {
responseSent.add(requestId + ":" + action);
if (action.equals(ACTION)) {
responseSent.incrementAndGet();
}
}
@Override
public void responseSent(long requestId, String action, Throwable t) {
responseSent.add(requestId + ":" + action + ":" + t);
if (action.equals(ACTION)) {
responseSent.incrementAndGet();
}
}
@Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
responseReceived.add(sourceNode + ":" + requestId + ":" + action);
if (action.equals(ACTION)) {
responseReceived.incrementAndGet();
}
}
@Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
requestsSent.add(node + ":" + requestId + ":" + action);
}
public void logRequests(String name) {
logger.info("{}: requestsSent {}", name, requestsSent);
logger.info("{}: requestsReceived {}", name, requestsReceived);
logger.info("{}: responseSent {}", name, responseSent);
logger.info("{}: responseReceived {}", name, responseReceived);
if (action.equals(ACTION)) {
requestsSent.incrementAndGet();
}
}
}
final CountingTracer tracerA = new CountingTracer();
@ -428,55 +431,43 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
try {
serviceA
.submitRequest(nodeB, "action", TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
.submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
}
// use assert busy as call backs are sometime called after the response have been sent
try {
assertBusy(() -> {
assertThat(tracerA.requestsReceived, empty());
assertThat(tracerA.requestsSent, hasSize(1));
assertThat(tracerA.responseReceived, hasSize(1));
assertThat(tracerA.responseSent, empty());
assertThat(tracerB.requestsReceived, hasSize(1));
assertThat(tracerB.requestsSent, empty());
assertThat(tracerB.responseReceived, empty());
assertThat(tracerB.responseSent, hasSize(1));
});
} catch (AssertionError e) {
tracerA.logRequests("tracerA");
tracerB.logRequests("tracerB");
throw e;
}
assertBusy(() -> {
assertThat(tracerA.requestsReceived.get(), equalTo(0));
assertThat(tracerA.requestsSent.get(), equalTo(1));
assertThat(tracerA.responseReceived.get(), equalTo(1));
assertThat(tracerA.responseSent.get(), equalTo(0));
assertThat(tracerB.requestsReceived.get(), equalTo(1));
assertThat(tracerB.requestsSent.get(), equalTo(0));
assertThat(tracerB.responseReceived.get(), equalTo(0));
assertThat(tracerB.responseSent.get(), equalTo(1));
});
try {
serviceA
.submitRequest(nodeA, "action", TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
}
// use assert busy as call backs are sometime called after the response have been sent
try {
assertBusy(() -> {
assertThat(tracerA.requestsReceived, hasSize(1));
assertThat(tracerA.requestsSent, hasSize(2));
assertThat(tracerA.responseReceived, hasSize(2));
assertThat(tracerA.responseSent, hasSize(1));
assertThat(tracerB.requestsReceived, hasSize(1));
assertThat(tracerB.requestsSent, hasSize(0));
assertThat(tracerB.responseReceived, hasSize(0));
assertThat(tracerB.responseSent, hasSize(1));
});
} catch (AssertionError e) {
tracerA.logRequests("tracerA");
tracerB.logRequests("tracerB");
throw e;
}
assertBusy(() -> {
assertThat(tracerA.requestsReceived.get(), equalTo(1));
assertThat(tracerA.requestsSent.get(), equalTo(2));
assertThat(tracerA.responseReceived.get(), equalTo(2));
assertThat(tracerA.responseSent.get(), equalTo(1));
assertThat(tracerB.requestsReceived.get(), equalTo(1));
assertThat(tracerB.requestsSent.get(), equalTo(0));
assertThat(tracerB.responseReceived.get(), equalTo(0));
assertThat(tracerB.responseSent.get(), equalTo(1));
});
}
public void testVoidMessageCompressed() {