diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java index bafcc49bf9d..7947de35c40 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; class ScheduledJob { @@ -38,7 +39,6 @@ class ScheduledJob { private final DataExtractorFactory dataExtractorFactory; private final Supplier currentTimeSupplier; - private volatile DataExtractor dataExtractor; private volatile long lookbackStartTimeMs; private volatile Long lastEndTimeMs; private volatile boolean running = true; @@ -104,9 +104,6 @@ class ScheduledJob { } public void stop() { - if (dataExtractor != null) { - dataExtractor.cancel(); - } running = false; auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); } @@ -124,8 +121,12 @@ class ScheduledJob { RuntimeException error = null; long recordCount = 0; - dataExtractor = dataExtractorFactory.newExtractor(start, end); + DataExtractor dataExtractor = dataExtractorFactory.newExtractor(start, end); while (dataExtractor.hasNext()) { + if (!isRunning() && !dataExtractor.isCancelled()) { + dataExtractor.cancel(); + } + Optional extractedData; try { extractedData = dataExtractor.next(); @@ -136,12 +137,7 @@ class ScheduledJob { if (extractedData.isPresent()) { DataCounts counts; try (InputStream in = extractedData.get()) { - PostDataAction.Request request = new PostDataAction.Request(jobId); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - Streams.copy(in, outputStream); - request.setContent(new BytesArray(outputStream.toByteArray())); - PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).get(); - counts = response.getDataCounts(); + counts = postData(in); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -155,7 +151,6 @@ class ScheduledJob { } } } - dataExtractor = null; lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1); @@ -179,6 +174,15 @@ class ScheduledJob { } } + private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException { + PostDataAction.Request request = new PostDataAction.Request(jobId); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Streams.copy(inputStream, outputStream); + request.setContent(new BytesArray(outputStream.toByteArray())); + PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).get(); + return response.getDataCounts(); + } + private long nextRealtimeTimestamp() { long epochMs = currentTimeSupplier.get() + frequencyMs; return toIntervalStartEpochMs(epochMs) + NEXT_TASK_DELAY_MS; @@ -195,7 +199,6 @@ class ScheduledJob { AnalysisProblemException(Throwable cause) { super(cause); } - } class ExtractionProblemException extends RuntimeException { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java index 14632f99bd5..1e6ab130abd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java @@ -155,8 +155,6 @@ public class ScheduledJobRunner extends AbstractComponent { holder.problemTracker.finishReport(); doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder); }); - } else { - holder.stop(null); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/DataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/DataExtractor.java index 613186668b6..98187a92886 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/DataExtractor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/DataExtractor.java @@ -24,6 +24,11 @@ public interface DataExtractor { */ Optional next() throws IOException; + /** + * @return {@code true} if the extractor has been cancelled, or {@code false} otherwise + */ + boolean isCancelled(); + /** * Cancel the current search. */ diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java index adf369e434e..a1558a7adb4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java @@ -37,6 +37,12 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; +/** + * An implementation that extracts data from elasticsearch using search and scroll on a client. + * It supports safe and responsive cancellation by continuing the scroll until a new timestamp + * is seen. + * Note that this class is NOT thread-safe. + */ class ScrollDataExtractor implements DataExtractor { private static final Logger LOGGER = Loggers.getLogger(ScrollDataExtractor.class); @@ -44,10 +50,10 @@ class ScrollDataExtractor implements DataExtractor { private final Client client; private final ScrollDataExtractorContext context; - private volatile String scrollId; - private volatile boolean isCancelled; - private volatile boolean hasNext; - private volatile Long timestampOnCancel; + private String scrollId; + private boolean isCancelled; + private boolean hasNext; + private Long timestampOnCancel; public ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) { this.client = Objects.requireNonNull(client); @@ -60,6 +66,11 @@ class ScrollDataExtractor implements DataExtractor { return hasNext; } + @Override + public boolean isCancelled() { + return isCancelled; + } + @Override public void cancel() { LOGGER.trace("[{}] Data extractor received cancel request", context.jobId); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java index 91e15171beb..3ada091ca91 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java @@ -213,6 +213,7 @@ public class ScrollDataExtractorTests extends ESTestCase { ); extractor.setNextResponse(response2); + assertThat(extractor.isCancelled(), is(true)); assertThat(extractor.hasNext(), is(true)); stream = extractor.next(); assertThat(stream.isPresent(), is(true));