[TEST] Use a semaphore to block unitl all in-flight requests are released
This commit is contained in:
parent
814c7224f9
commit
067ca1f996
|
@ -558,13 +558,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
|
||||
CountDownLatch waitForever = new CountDownLatch(1);
|
||||
CountDownLatch doneWaitingForever = new CountDownLatch(1);
|
||||
AtomicInteger inFlight = new AtomicInteger(0);
|
||||
Semaphore inFlight = new Semaphore(Integer.MAX_VALUE);
|
||||
serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws InterruptedException {
|
||||
String message = request.message;
|
||||
inFlight.incrementAndGet();
|
||||
inFlight.acquireUninterruptibly();
|
||||
try {
|
||||
if ("forever".equals(message)) {
|
||||
waitForever.await();
|
||||
|
@ -579,7 +579,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
fail(e.getMessage());
|
||||
}
|
||||
} finally {
|
||||
inFlight.decrementAndGet();
|
||||
inFlight.release();
|
||||
if ("forever".equals(message)) {
|
||||
doneWaitingForever.countDown();
|
||||
}
|
||||
|
@ -661,7 +661,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
|
||||
waitForever.countDown();
|
||||
doneWaitingForever.await();
|
||||
assertEquals(0, inFlight.get());
|
||||
assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@TestLogging(value = "test. transport.tracer:TRACE")
|
||||
|
|
Loading…
Reference in New Issue