Simplify flush listener by using computeIfAbsent(...)
Make sure CountDownLatch gets removed when it is no longer needed Also add CountDownLatch is it is missing when we ack a flush id, we may ack before we wait for it Original commit: elastic/x-pack-elasticsearch@83a993b9ad
This commit is contained in:
parent
7f6907da8b
commit
fd743cbfc6
|
@ -22,25 +22,23 @@ class FlushListener {
|
|||
return false;
|
||||
}
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch previous = awaitingFlushed.putIfAbsent(flushId, latch);
|
||||
if (previous != null) {
|
||||
latch = previous;
|
||||
}
|
||||
CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
|
||||
try {
|
||||
return latch.await(timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
} finally {
|
||||
// the flush id will no longer be used from this point, so we can remove it.
|
||||
awaitingFlushed.remove(flushId);
|
||||
}
|
||||
}
|
||||
|
||||
void acknowledgeFlush(String flushId) {
|
||||
CountDownLatch latch = awaitingFlushed.get(flushId);
|
||||
if (latch == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// acknowledgeFlush(...) could be called before waitForFlush(...)
|
||||
// a flush api call writes a flush command to the analytical process and then via a different thread the
|
||||
// result reader then reads whether the flush has been acked.
|
||||
CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ public class FlushListenerTests extends ESTestCase {
|
|||
assertFalse(bool.get());
|
||||
listener.acknowledgeFlush("_id");
|
||||
assertBusy(() -> assertTrue(bool.get()));
|
||||
assertEquals(0, listener.awaitingFlushed.size());
|
||||
}
|
||||
|
||||
public void testClear() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue