[TEST] Make AbstractSimpleTransportTestCase#testTimeoutSendExceptionWithDelayedResponse more robust and wait for in-flight request

This commit is contained in:
Simon Willnauer 2016-07-12 20:41:37 +02:00
parent 075cb970c0
commit ec55f9fff7

View File

@ -37,13 +37,16 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
@ -553,17 +556,21 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} }
public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
CountDownLatch doneLatch = new CountDownLatch(1); CountDownLatch waitForever = new CountDownLatch(1);
CountDownLatch allResponded = new CountDownLatch(1); CountDownLatch doneWaitingForever = new CountDownLatch(1);
AtomicInteger inFlight = new AtomicInteger(0);
serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() { new TransportRequestHandler<StringMessageRequest>() {
@Override @Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) { public void messageReceived(StringMessageRequest request, TransportChannel channel) throws InterruptedException {
TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep"); inFlight.incrementAndGet();
try { try {
doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS); String message = request.message;
} catch (InterruptedException e) { if ("forever".equals(message)) {
// ignore waitForever.await();
} else {
TimeValue sleep = TimeValue.parseTimeValue(message, null, "sleep");
Thread.sleep(sleep.millis());
} }
try { try {
channel.sendResponse(new StringMessageResponse("hello " + request.message)); channel.sendResponse(new StringMessageResponse("hello " + request.message));
@ -571,13 +578,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
logger.error("Unexpected failure", e); logger.error("Unexpected failure", e);
fail(e.getMessage()); fail(e.getMessage());
} finally { } finally {
allResponded.countDown(); if ("forever".equals(message)) {
doneWaitingForever.countDown();
}
}
} finally {
inFlight.decrementAndGet();
} }
} }
}); });
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(), new StringMessageRequest("forever"), TransportRequestOptions.builder().withTimeout(100).build(),
new TransportResponseHandler<StringMessageResponse>() { new TransportResponseHandler<StringMessageResponse>() {
@Override @Override
public StringMessageResponse newInstance() { public StringMessageResponse newInstance() {
@ -603,17 +615,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}); });
try { try {
StringMessageResponse message = res.txGet(); res.txGet();
fail("exception should be thrown"); fail("exception should be thrown");
} catch (Exception e) { } catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class)); assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
} }
latch.await(); latch.await();
List<Runnable> assertions = new ArrayList<>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
final int counter = i; final int counter = i;
// now, try and send another request, this times, with a short timeout // now, try and send another request, this times, with a short timeout
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", TransportFuture<StringMessageResponse> result = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(),
new TransportResponseHandler<StringMessageResponse>() { new TransportResponseHandler<StringMessageResponse>() {
@Override @Override
@ -638,13 +651,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} }
}); });
StringMessageResponse message = res.txGet(); assertions.add(() -> {
StringMessageResponse message = result.txGet();
assertThat(message.message, equalTo("hello " + counter + "ms")); assertThat(message.message, equalTo("hello " + counter + "ms"));
});
}
for (Runnable runnable : assertions) {
runnable.run();
} }
serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
doneLatch.countDown(); waitForever.countDown();
allResponded.await(); doneWaitingForever.await();
assertEquals(0, inFlight.get());
} }
@TestLogging(value = "test. transport.tracer:TRACE") @TestLogging(value = "test. transport.tracer:TRACE")