diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index b809cf5cb2b..3c73d6f83de 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -33,6 +33,7 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; /** * A runnable class that reads the autodetect process output in the @@ -61,6 +62,7 @@ public class AutoDetectResultProcessor { private final JobResultsPersister persister; final CountDownLatch completionLatch = new CountDownLatch(1); + final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1); private final FlushListener flushListener; /** @@ -193,14 +195,28 @@ public class AutoDetectResultProcessor { protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update); + + try { + // This blocks the main processing thread in the unlikely event + // there are 2 model snapshots queued up. But it also has the + // advantage of ensuring order + updateModelSnapshotIdSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("[{}] Interrupted acquiring update model snaphot semaphore", jobId); + return; + } + client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { @Override public void onResponse(PutJobAction.Response response) { + updateModelSnapshotIdSemaphore.release(); LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); } @Override public void onFailure(Exception e) { + updateModelSnapshotIdSemaphore.release(); LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e); } }); @@ -209,8 +225,13 @@ public class AutoDetectResultProcessor { public void awaitCompletion() { try { completionLatch.await(); + // Input stream has been completely processed at this point. + // Wait for any updateModelSnapshotIdOnJob calls to complete. + updateModelSnapshotIdSemaphore.acquire(); + updateModelSnapshotIdSemaphore.release(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + LOGGER.info("[{}] Interrupted waiting for results processor to complete", jobId); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index ffb82c2bc35..73b1ac8a672 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -291,4 +291,19 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verifyNoMoreInteractions(persister); verifyNoMoreInteractions(renormalizer); } + + public void testAwaitCompletion() { + AutodetectResult autodetectResult = mock(AutodetectResult.class); + @SuppressWarnings("unchecked") + Iterator iterator = mock(Iterator.class); + when(iterator.hasNext()).thenReturn(true).thenReturn(false); + when(iterator.next()).thenReturn(autodetectResult); + AutodetectProcess process = mock(AutodetectProcess.class); + when(process.readAutodetectResults()).thenReturn(iterator); + processorUnderTest.process(process, randomBoolean()); + + processorUnderTest.awaitCompletion(); + assertEquals(0, processorUnderTest.completionLatch.getCount()); + assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); + } }