From fb2bd73bc1b56e77dc12c3100ae20c31cef66bc9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 29 Nov 2016 15:03:02 +0100 Subject: [PATCH] Let close autodetect wait for other operations to complete. Original commit: elastic/x-pack-elasticsearch@de517f4fbac6bf1a7767596c24b598b6eb2bb227 --- .../autodetect/AutodetectCommunicator.java | 38 ++++++++++++++----- .../prelert/integration/ScheduledJobIT.java | 17 ++------- .../AutodetectCommunicatorTests.java | 25 +++++++----- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java index 8aa62566662..2741873c34d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java @@ -32,8 +32,9 @@ import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; public class AutodetectCommunicator implements Closeable { @@ -47,7 +48,7 @@ public class AutodetectCommunicator implements Closeable { private final DataToProcessWriter autoDetectWriter; private final AutoDetectResultProcessor autoDetectResultProcessor; - final AtomicBoolean inUse = new AtomicBoolean(false); + final AtomicReference inUse = new AtomicReference<>(); public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) { @@ -81,7 +82,7 @@ public class AutodetectCommunicator implements Closeable { DataCounts results = autoDetectWriter.write(countingStream); autoDetectWriter.flush(); return results; - }); + }, false); } @Override @@ -91,14 +92,14 @@ public class AutodetectCommunicator implements Closeable { autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); return null; - }); + }, true); } public void writeUpdateConfigMessage(String config) throws IOException { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> { autodetectProcess.writeUpdateConfigMessage(config); return null; - }); + }, false); } public void flushJob(InterimResultsParams params) throws IOException { @@ -110,6 +111,7 @@ public class AutodetectCommunicator implements Closeable { String flushId = autodetectProcess.flushJob(params); Duration timeout = Duration.ofSeconds(tryTimeoutSecs); + LOGGER.info("[{}] waiting for flush", jobId); boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout); LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete); if (!isFlushComplete) { @@ -122,7 +124,7 @@ public class AutodetectCommunicator implements Closeable { // clients from querying results in the middle of normalisation. autoDetectResultProcessor.waitUntilRenormaliserIsIdle(); return null; - }); + }, false); } /** @@ -149,16 +151,32 @@ public class AutodetectCommunicator implements Closeable { return Optional.ofNullable(statusReporter.runningTotalStats()); } - private T checkAndRun(Supplier errorMessage, Callback callback) throws IOException { - if (inUse.compareAndSet(false, true)) { + private T checkAndRun(Supplier errorMessage, Callback callback, boolean wait) throws IOException { + CountDownLatch latch = new CountDownLatch(1); + if (inUse.compareAndSet(null, latch)) { try { checkProcessIsAlive(); return callback.run(); } finally { - inUse.set(false); + latch.countDown(); + inUse.set(null); } } else { - throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + if (wait) { + latch = inUse.get(); + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + } + } + checkProcessIsAlive(); + return callback.run(); + } else { + throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + } } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index a5af8c76444..6ceb65e3aa3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -72,7 +72,6 @@ public class ScheduledJobIT extends ESRestTestCase { waitForSchedulerStoppedState(client(), jobId); } - @AwaitsFix(bugUrl = "mvg fix this") public void testStartJobScheduler_GivenRealtime() throws Exception { String jobId = "_id3"; createAirlineDataIndex(); @@ -86,8 +85,10 @@ public class ScheduledJobIT extends ESRestTestCase { assertBusy(() -> { try { Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, - Collections.singletonMap("metric", "status")); - assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\"")); + Collections.singletonMap("metric", "status,data_counts")); + String responseAsString = responseEntityToString(getJobResponse); + assertThat(responseAsString, containsString("\"status\":\"RUNNING\"")); + assertThat(responseAsString, containsString("\"input_record_count\":2")); } catch (Exception e1) { throw new RuntimeException(e1); } @@ -105,16 +106,6 @@ public class ScheduledJobIT extends ESRestTestCase { waitForSchedulerStoppedState(client(), jobId); client().performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close"); - assertBusy(() -> { - try { - Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, - Collections.singletonMap("metric", "status")); - assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"CLOSED\"")); - } catch (Exception e1) { - throw new RuntimeException(e1); - } - }); - response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java index 4b4c20147e7..e1749f51b94 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java @@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import static org.elasticsearch.mock.orig.Mockito.doAnswer; @@ -31,6 +32,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class AutodetectCommunicatorTests extends ESTestCase { @@ -139,10 +142,10 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class))); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.writeToJob(in, mock(DataLoadParams.class)); } @@ -152,24 +155,26 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); InterimResultsParams params = mock(InterimResultsParams.class); expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params)); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.flushJob(params); } - public void testCloseInUse() throws IOException { + public void testCloseInUse() throws Exception { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.inUse.set(true); - expectThrows(ElasticsearchStatusException.class, communicator::close); + CountDownLatch latch = mock(CountDownLatch.class); + communicator.inUse.set(latch); + communicator.close(); + verify(latch, times(1)).await(); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.close(); } @@ -179,10 +184,10 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.inUse.set(true); + communicator.inUse.set(new CountDownLatch(1)); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateConfigMessage("")); - communicator.inUse.set(false); + communicator.inUse.set(null); communicator.writeUpdateConfigMessage(""); }