[ML] Wait for model snapshot update when closing the results processor (elastic/x-pack-elasticsearch#916)

* [ML] Wait for model snapshot update when closing the results processor

* Review comments

Original commit: elastic/x-pack-elasticsearch@eeb6c3b8d8
This commit is contained in:
David Kyle 2017-04-03 10:01:21 +01:00 committed by GitHub
parent 73ddc2323b
commit 30745b8dd7
2 changed files with 36 additions and 0 deletions

View File

@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
/** /**
* A runnable class that reads the autodetect process output in the * A runnable class that reads the autodetect process output in the
@ -61,6 +62,7 @@ public class AutoDetectResultProcessor {
private final JobResultsPersister persister; private final JobResultsPersister persister;
final CountDownLatch completionLatch = new CountDownLatch(1); final CountDownLatch completionLatch = new CountDownLatch(1);
final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1);
private final FlushListener flushListener; private final FlushListener flushListener;
/** /**
@ -193,14 +195,28 @@ public class AutoDetectResultProcessor {
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update); UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update);
try {
// This blocks the main processing thread in the unlikely event
// there are 2 model snapshots queued up. But it also has the
// advantage of ensuring order
updateModelSnapshotIdSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update model snaphot semaphore", jobId);
return;
}
client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() { client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override @Override
public void onResponse(PutJobAction.Response response) { public void onResponse(PutJobAction.Response response) {
updateModelSnapshotIdSemaphore.release();
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
updateModelSnapshotIdSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e); LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e);
} }
}); });
@ -209,8 +225,13 @@ public class AutoDetectResultProcessor {
public void awaitCompletion() { public void awaitCompletion() {
try { try {
completionLatch.await(); completionLatch.await();
// Input stream has been completely processed at this point.
// Wait for any updateModelSnapshotIdOnJob calls to complete.
updateModelSnapshotIdSemaphore.acquire();
updateModelSnapshotIdSemaphore.release();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted waiting for results processor to complete", jobId);
} }
} }

View File

@ -291,4 +291,19 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer); verifyNoMoreInteractions(renormalizer);
} }
public void testAwaitCompletion() {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
processorUnderTest.process(process, randomBoolean());
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());
assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits());
}
} }