Fix intermittent failure of renormalization (elastic/elasticsearch#558)

The synchronization was flawed

Original commit: elastic/x-pack-elasticsearch@a968c68c7d
This commit is contained in:
David Roberts 2016-12-16 13:45:47 +00:00 committed by GitHub
parent 5e6c63bc51
commit c43a9ba1dd
3 changed files with 86 additions and 50 deletions

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import java.util.Deque; import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -26,39 +27,41 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
private final ScoresUpdater scoresUpdater; private final ScoresUpdater scoresUpdater;
private final ExecutorService executorService; private final ExecutorService executorService;
private final boolean isPerPartitionNormalization; private final boolean isPerPartitionNormalization;
private final Deque<Quantiles> quantilesDeque = new ConcurrentLinkedDeque<>(); private final Deque<QuantilesWithLatch> quantilesDeque = new ConcurrentLinkedDeque<>();
private final Deque<CountDownLatch> latchDeque = new ConcurrentLinkedDeque<>();
/** /**
* Each job may only have 1 normalization in progress at any time; the semaphore enforces this * Each job may only have 1 normalization in progress at any time; the semaphore enforces this
*/ */
private final Semaphore semaphore = new Semaphore(1); private final Semaphore semaphore = new Semaphore(1);
/**
* <code>null</code> means no normalization is in progress
*/
private CountDownLatch completionLatch;
public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService, public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService,
boolean isPerPartitionNormalization) boolean isPerPartitionNormalization) {
{
this.jobId = jobId; this.jobId = jobId;
this.scoresUpdater = scoresUpdater; this.scoresUpdater = scoresUpdater;
this.executorService = executorService; this.executorService = executorService;
this.isPerPartitionNormalization = isPerPartitionNormalization; this.isPerPartitionNormalization = isPerPartitionNormalization;
} }
public synchronized void renormalize(Quantiles quantiles) public void renormalize(Quantiles quantiles) {
{ // This will throw NPE if quantiles is null, so do it first
quantilesDeque.addLast(quantiles); QuantilesWithLatch quantilesWithLatch = new QuantilesWithLatch(quantiles, new CountDownLatch(1));
completionLatch = new CountDownLatch(1); // Needed to ensure work is not added while the tryFinishWork() method is running
executorService.submit(() -> doRenormalizations()); synchronized (quantilesDeque) {
// Must add to latchDeque before quantilesDeque
latchDeque.addLast(quantilesWithLatch.getLatch());
quantilesDeque.addLast(quantilesWithLatch);
executorService.submit(() -> doRenormalizations());
}
} }
public void waitUntilIdle() public void waitUntilIdle() {
{
try { try {
CountDownLatch latchToAwait = getCompletionLatch(); // We cannot tolerate more than one thread running this loop at any time,
while (latchToAwait != null) { // but need a different lock to the other synchronized parts of the code
latchToAwait.await(); synchronized (latchDeque) {
latchToAwait = getCompletionLatch(); for (CountDownLatch latchToAwait = latchDeque.pollFirst(); latchToAwait != null; latchToAwait = latchDeque.pollFirst()) {
latchToAwait.await();
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -66,44 +69,42 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
} }
} }
private synchronized CountDownLatch getCompletionLatch() {
return completionLatch;
}
private Quantiles getEarliestQuantiles() { private Quantiles getEarliestQuantiles() {
return quantilesDeque.pollFirst(); QuantilesWithLatch earliestQuantilesWithLatch = quantilesDeque.peekFirst();
return (earliestQuantilesWithLatch != null) ? earliestQuantilesWithLatch.getQuantiles() : null;
} }
private Quantiles getLatestQuantilesAndClear() { private QuantilesWithLatch getLatestQuantilesWithLatchAndClear() {
// We discard all but the latest quantiles // We discard all but the latest quantiles
Quantiles latestQuantiles = null; QuantilesWithLatch latestQuantilesWithLatch = null;
for (Quantiles quantiles = quantilesDeque.pollFirst(); quantiles != null; quantiles = quantilesDeque.pollFirst()) { for (QuantilesWithLatch quantilesWithLatch = quantilesDeque.pollFirst(); quantilesWithLatch != null;
latestQuantiles = quantiles; quantilesWithLatch = quantilesDeque.pollFirst()) {
// Count down the latches associated with any discarded quantiles
if (latestQuantilesWithLatch != null) {
latestQuantilesWithLatch.getLatch().countDown();
}
latestQuantilesWithLatch = quantilesWithLatch;
} }
return latestQuantiles; return latestQuantilesWithLatch;
} }
private synchronized boolean tryStartWork() { private boolean tryStartWork() {
return semaphore.tryAcquire(); return semaphore.tryAcquire();
} }
private synchronized boolean tryFinishWork() { private boolean tryFinishWork() {
if (!quantilesDeque.isEmpty()) { // We cannot tolerate new work being added in between the isEmpty() check and releasing the semaphore
return false; synchronized (quantilesDeque) {
if (!quantilesDeque.isEmpty()) {
return false;
}
semaphore.release();
return true;
} }
semaphore.release();
if (completionLatch != null) {
completionLatch.countDown();
completionLatch = null;
}
return true;
} }
private synchronized void forceFinishWork() { private void forceFinishWork() {
semaphore.release(); semaphore.release();
if (completionLatch != null) {
completionLatch.countDown();
}
} }
private void doRenormalizations() { private void doRenormalizations() {
@ -112,15 +113,18 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
return; return;
} }
CountDownLatch latch = null;
try { try {
do { do {
// Note that if there is only one set of quantiles in the queue then both these references will point to the same quantiles. // Note that if there is only one set of quantiles in the queue then both these references will point to the same quantiles.
Quantiles earliestQuantiles = getEarliestQuantiles(); Quantiles earliestQuantiles = getEarliestQuantiles();
Quantiles latestQuantiles = getLatestQuantilesAndClear(); QuantilesWithLatch latestQuantilesWithLatch = getLatestQuantilesWithLatchAndClear();
// We could end up with latestQuantiles being null if the thread running this method was // We could end up with latestQuantilesWithLatch being null if the thread running this method
// preempted before the tryStartWork() call, another thread already running this method // was preempted before the tryStartWork() call, another thread already running this method
// did the work and exited, and then this thread got true returned by tryStartWork(). // did the work and exited, and then this thread got true returned by tryStartWork().
if (latestQuantiles != null) { if (latestQuantilesWithLatch != null) {
Quantiles latestQuantiles = latestQuantilesWithLatch.getQuantiles();
latch = latestQuantilesWithLatch.getLatch();
// We could end up with earliestQuantiles being null if quantiles were // We could end up with earliestQuantiles being null if quantiles were
// added between getting the earliest and latest quantiles. // added between getting the earliest and latest quantiles.
if (earliestQuantiles == null) { if (earliestQuantiles == null) {
@ -132,18 +136,46 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
// over the time ranges implied by all quantiles that were provided. // over the time ranges implied by all quantiles that were provided.
long windowExtensionMs = latestBucketTimeMs - earliestBucketTimeMs; long windowExtensionMs = latestBucketTimeMs - earliestBucketTimeMs;
if (windowExtensionMs < 0) { if (windowExtensionMs < 0) {
LOGGER.warn("[{}] Quantiles not supplied in order - {} after {}", LOGGER.warn("[{}] Quantiles not supplied in time order - {} after {}",
jobId, latestBucketTimeMs, earliestBucketTimeMs); jobId, latestBucketTimeMs, earliestBucketTimeMs);
windowExtensionMs = 0;
} }
scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs, scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs,
isPerPartitionNormalization); isPerPartitionNormalization);
latch.countDown();
latch = null;
} }
// Loop if more work has become available while we were working, because the // Loop if more work has become available while we were working, because the
// tasks originally submitted to do that work will have exited early. // tasks originally submitted to do that work will have exited early.
} while (tryFinishWork() == false); } while (tryFinishWork() == false);
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOGGER.error("[" + jobId + "] Normalization failed", e);
if (latch != null) {
latch.countDown();
}
forceFinishWork(); forceFinishWork();
throw e; throw e;
} }
} }
/**
* Simple grouping of a {@linkplain Quantiles} object with its corresponding {@linkplain CountDownLatch} object.
*/
private static class QuantilesWithLatch {
private final Quantiles quantiles;
private final CountDownLatch latch;
QuantilesWithLatch(Quantiles quantiles, CountDownLatch latch) {
this.quantiles = Objects.requireNonNull(quantiles);
this.latch = Objects.requireNonNull(latch);
}
Quantiles getQuantiles() {
return quantiles;
}
CountDownLatch getLatch() {
return latch;
}
}
} }

View File

@ -17,6 +17,7 @@ import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -45,7 +46,7 @@ public class NormalizerTests extends ESTestCase {
return influencer; return influencer;
} }
public void testNormalize() throws IOException { public void testNormalize() throws IOException, InterruptedException {
ExecutorService threadpool = Executors.newScheduledThreadPool(1); ExecutorService threadpool = Executors.newScheduledThreadPool(1);
try { try {
NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class); NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
@ -69,5 +70,6 @@ public class NormalizerTests extends ESTestCase {
} finally { } finally {
threadpool.shutdown(); threadpool.shutdown();
} }
assertTrue(threadpool.awaitTermination(1, TimeUnit.SECONDS));
} }
} }

View File

@ -13,6 +13,7 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -26,7 +27,7 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
// Never reduce this below 4, otherwise some of the logic in the test will break // Never reduce this below 4, otherwise some of the logic in the test will break
private static final int TEST_SIZE = 1000; private static final int TEST_SIZE = 1000;
public void testNormalize() { public void testNormalize() throws InterruptedException {
ExecutorService threadpool = Executors.newScheduledThreadPool(10); ExecutorService threadpool = Executors.newScheduledThreadPool(10);
try { try {
ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); ScoresUpdater scoresUpdater = mock(ScoresUpdater.class);
@ -73,5 +74,6 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
} finally { } finally {
threadpool.shutdown(); threadpool.shutdown();
} }
assertTrue(threadpool.awaitTermination(1, TimeUnit.SECONDS));
} }
} }