From c43a9ba1ddd89baaa97aec133ef8e2081545f813 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 16 Dec 2016 13:45:47 +0000 Subject: [PATCH] Fix intermittent failure of renormalization (elastic/elasticsearch#558) The synchronization was flawed Original commit: elastic/x-pack-elasticsearch@a968c68c7df399d5b0a1e13a07ec720b1e4833de --- .../ShortCircuitingRenormalizer.java | 126 +++++++++++------- .../process/normalizer/NormalizerTests.java | 6 +- .../ShortCircuitingRenormalizerTests.java | 4 +- 3 files changed, 86 insertions(+), 50 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizer.java index c3780eb08d6..82df98b4c23 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizer.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizer.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; import java.util.Deque; +import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -26,39 +27,41 @@ public class ShortCircuitingRenormalizer implements Renormalizer { private final ScoresUpdater scoresUpdater; private final ExecutorService executorService; private final boolean isPerPartitionNormalization; - private final Deque quantilesDeque = new ConcurrentLinkedDeque<>(); + private final Deque quantilesDeque = new ConcurrentLinkedDeque<>(); + private final Deque latchDeque = new ConcurrentLinkedDeque<>(); /** * Each job may only have 1 normalization in progress at any time; the semaphore enforces this */ private final Semaphore semaphore = new Semaphore(1); - /** - * null means no normalization is in progress - */ - private CountDownLatch completionLatch; public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService, - boolean isPerPartitionNormalization) - { + boolean isPerPartitionNormalization) { this.jobId = jobId; this.scoresUpdater = scoresUpdater; this.executorService = executorService; this.isPerPartitionNormalization = isPerPartitionNormalization; } - public synchronized void renormalize(Quantiles quantiles) - { - quantilesDeque.addLast(quantiles); - completionLatch = new CountDownLatch(1); - executorService.submit(() -> doRenormalizations()); + public void renormalize(Quantiles quantiles) { + // This will throw NPE if quantiles is null, so do it first + QuantilesWithLatch quantilesWithLatch = new QuantilesWithLatch(quantiles, new CountDownLatch(1)); + // Needed to ensure work is not added while the tryFinishWork() method is running + synchronized (quantilesDeque) { + // Must add to latchDeque before quantilesDeque + latchDeque.addLast(quantilesWithLatch.getLatch()); + quantilesDeque.addLast(quantilesWithLatch); + executorService.submit(() -> doRenormalizations()); + } } - public void waitUntilIdle() - { + public void waitUntilIdle() { try { - CountDownLatch latchToAwait = getCompletionLatch(); - while (latchToAwait != null) { - latchToAwait.await(); - latchToAwait = getCompletionLatch(); + // We cannot tolerate more than one thread running this loop at any time, + // but need a different lock to the other synchronized parts of the code + synchronized (latchDeque) { + for (CountDownLatch latchToAwait = latchDeque.pollFirst(); latchToAwait != null; latchToAwait = latchDeque.pollFirst()) { + latchToAwait.await(); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -66,44 +69,42 @@ public class ShortCircuitingRenormalizer implements Renormalizer { } } - private synchronized CountDownLatch getCompletionLatch() { - return completionLatch; - } - private Quantiles getEarliestQuantiles() { - return quantilesDeque.pollFirst(); + QuantilesWithLatch earliestQuantilesWithLatch = quantilesDeque.peekFirst(); + return (earliestQuantilesWithLatch != null) ? earliestQuantilesWithLatch.getQuantiles() : null; } - private Quantiles getLatestQuantilesAndClear() { + private QuantilesWithLatch getLatestQuantilesWithLatchAndClear() { // We discard all but the latest quantiles - Quantiles latestQuantiles = null; - for (Quantiles quantiles = quantilesDeque.pollFirst(); quantiles != null; quantiles = quantilesDeque.pollFirst()) { - latestQuantiles = quantiles; + QuantilesWithLatch latestQuantilesWithLatch = null; + for (QuantilesWithLatch quantilesWithLatch = quantilesDeque.pollFirst(); quantilesWithLatch != null; + quantilesWithLatch = quantilesDeque.pollFirst()) { + // Count down the latches associated with any discarded quantiles + if (latestQuantilesWithLatch != null) { + latestQuantilesWithLatch.getLatch().countDown(); + } + latestQuantilesWithLatch = quantilesWithLatch; } - return latestQuantiles; + return latestQuantilesWithLatch; } - private synchronized boolean tryStartWork() { + private boolean tryStartWork() { return semaphore.tryAcquire(); } - private synchronized boolean tryFinishWork() { - if (!quantilesDeque.isEmpty()) { - return false; + private boolean tryFinishWork() { + // We cannot tolerate new work being added in between the isEmpty() check and releasing the semaphore + synchronized (quantilesDeque) { + if (!quantilesDeque.isEmpty()) { + return false; + } + semaphore.release(); + return true; } - semaphore.release(); - if (completionLatch != null) { - completionLatch.countDown(); - completionLatch = null; - } - return true; } - private synchronized void forceFinishWork() { + private void forceFinishWork() { semaphore.release(); - if (completionLatch != null) { - completionLatch.countDown(); - } } private void doRenormalizations() { @@ -112,15 +113,18 @@ public class ShortCircuitingRenormalizer implements Renormalizer { return; } + CountDownLatch latch = null; try { do { // Note that if there is only one set of quantiles in the queue then both these references will point to the same quantiles. Quantiles earliestQuantiles = getEarliestQuantiles(); - Quantiles latestQuantiles = getLatestQuantilesAndClear(); - // We could end up with latestQuantiles being null if the thread running this method was - // preempted before the tryStartWork() call, another thread already running this method + QuantilesWithLatch latestQuantilesWithLatch = getLatestQuantilesWithLatchAndClear(); + // We could end up with latestQuantilesWithLatch being null if the thread running this method + // was preempted before the tryStartWork() call, another thread already running this method // did the work and exited, and then this thread got true returned by tryStartWork(). - if (latestQuantiles != null) { + if (latestQuantilesWithLatch != null) { + Quantiles latestQuantiles = latestQuantilesWithLatch.getQuantiles(); + latch = latestQuantilesWithLatch.getLatch(); // We could end up with earliestQuantiles being null if quantiles were // added between getting the earliest and latest quantiles. if (earliestQuantiles == null) { @@ -132,18 +136,46 @@ public class ShortCircuitingRenormalizer implements Renormalizer { // over the time ranges implied by all quantiles that were provided. long windowExtensionMs = latestBucketTimeMs - earliestBucketTimeMs; if (windowExtensionMs < 0) { - LOGGER.warn("[{}] Quantiles not supplied in order - {} after {}", + LOGGER.warn("[{}] Quantiles not supplied in time order - {} after {}", jobId, latestBucketTimeMs, earliestBucketTimeMs); + windowExtensionMs = 0; } scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs, isPerPartitionNormalization); + latch.countDown(); + latch = null; } // Loop if more work has become available while we were working, because the // tasks originally submitted to do that work will have exited early. } while (tryFinishWork() == false); } catch (RuntimeException e) { + LOGGER.error("[" + jobId + "] Normalization failed", e); + if (latch != null) { + latch.countDown(); + } forceFinishWork(); throw e; } } + + /** + * Simple grouping of a {@linkplain Quantiles} object with its corresponding {@linkplain CountDownLatch} object. + */ + private static class QuantilesWithLatch { + private final Quantiles quantiles; + private final CountDownLatch latch; + + QuantilesWithLatch(Quantiles quantiles, CountDownLatch latch) { + this.quantiles = Objects.requireNonNull(quantiles); + this.latch = Objects.requireNonNull(latch); + } + + Quantiles getQuantiles() { + return quantiles; + } + + CountDownLatch getLatch() { + return latch; + } + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java index 2a77132e621..9ce94bb4005 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java @@ -17,6 +17,7 @@ import java.util.Deque; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.Matchers.any; @@ -45,7 +46,7 @@ public class NormalizerTests extends ESTestCase { return influencer; } - public void testNormalize() throws IOException { + public void testNormalize() throws IOException, InterruptedException { ExecutorService threadpool = Executors.newScheduledThreadPool(1); try { NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class); @@ -69,5 +70,6 @@ public class NormalizerTests extends ESTestCase { } finally { threadpool.shutdown(); } + assertTrue(threadpool.awaitTermination(1, TimeUnit.SECONDS)); } -} \ No newline at end of file +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizerTests.java index 6a77dff95aa..2f66fbb0354 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ShortCircuitingRenormalizerTests.java @@ -13,6 +13,7 @@ import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -26,7 +27,7 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase { // Never reduce this below 4, otherwise some of the logic in the test will break private static final int TEST_SIZE = 1000; - public void testNormalize() { + public void testNormalize() throws InterruptedException { ExecutorService threadpool = Executors.newScheduledThreadPool(10); try { ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); @@ -73,5 +74,6 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase { } finally { threadpool.shutdown(); } + assertTrue(threadpool.awaitTermination(1, TimeUnit.SECONDS)); } }