diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataAction.java index 2b4573d8063..699110a8ab1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PostDataAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.prelert.action; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -25,6 +24,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.StatusToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.PrelertPlugin; @@ -124,6 +126,14 @@ public class PostDataAction extends Action listener) { + protected void doExecute(Task task, Request request, ActionListener listener) { + PostDataTask postDataTask = (PostDataTask) task; TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime()); threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() -> { try { - DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params); + DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params, + postDataTask::isCancelled); listener.onResponse(new Response(dataCounts)); - } catch (IOException | ElasticsearchException e) { + } catch (Exception e) { listener.onFailure(e); } }); } + + @Override + protected final void doExecute(Request request, ActionListener listener) { + throw new UnsupportedOperationException("the task parameter is required"); + } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java index 7300725b401..f0d8d5ade81 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataProcessor.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadPar import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; import java.io.InputStream; +import java.util.function.Supplier; public interface DataProcessor { @@ -27,12 +28,13 @@ public interface DataProcessor { *
  • If a high proportion of the records chronologically out of order
  • * * - * @param jobId the jobId - * @param input Data input stream - * @param params Data processing parameters + * @param jobId the jobId + * @param input Data input stream + * @param params Data processing parameters + * @param cancelled Whether the data processing has been cancelled * @return Count of records, fields, bytes, etc written */ - DataCounts processData(String jobId, InputStream input, DataLoadParams params); + DataCounts processData(String jobId, InputStream input, DataLoadParams params, Supplier cancelled); /** * Flush the running job, ensuring that the native process has had the diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java index 4f6a1d93450..a59ae0f0661 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java @@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; public class AutodetectProcessManager extends AbstractComponent implements DataProcessor { @@ -93,7 +94,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP } @Override - public DataCounts processData(String jobId, InputStream input, DataLoadParams params) { + public DataCounts processData(String jobId, InputStream input, DataLoadParams params, Supplier cancelled) { Allocation allocation = jobManager.getJobAllocation(jobId); if (allocation.getStatus().isAnyOf(JobStatus.PAUSING, JobStatus.PAUSED)) { return new DataCounts(jobId); @@ -105,7 +106,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP return c; }); try { - return communicator.writeToJob(input, params); + return communicator.writeToJob(input, params, cancelled); // TODO check for errors from autodetect } catch (IOException e) { String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java index e713135cf8d..dfa894f9323 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java @@ -73,13 +73,13 @@ public class AutodetectCommunicator implements Closeable { job.getSchedulerConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER); } - public DataCounts writeToJob(InputStream inputStream, DataLoadParams params) throws IOException { + public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier cancelled) throws IOException { return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> { if (params.isResettingBuckets()) { autodetectProcess.writeResetBucketsControlMessage(params); } CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter); - DataCounts results = autoDetectWriter.write(countingStream); + DataCounts results = autoDetectWriter.write(countingStream, cancelled); autoDetectWriter.flush(); return results; }, false); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java index a8634ac8ca4..d0eae921d4e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -21,9 +21,11 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import org.apache.logging.log4j.Logger; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.DataDescription.DataFormat; @@ -94,7 +96,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter /** - * Create the transforms. This must be called before {@linkplain #write(java.io.InputStream)} + * Create the transforms. This must be called before + * {@linkplain DataToProcessWriter#write(java.io.InputStream, java.util.function.Supplier)} * even if no transforms are configured as it creates the * date transform and sets up the field mappings.
    *

    @@ -204,6 +207,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter * First all the transforms whose outputs the Date transform relies * on are executed then the date transform then the remaining transforms. * + * @param cancelled Determines whether the process writting has been cancelled * @param input The record the transforms should read their input from. The contents should * align with the header parameter passed to {@linkplain #buildTransformsAndWriteHeader(String[])} * @param output The record that will be written to the length encoded writer. @@ -211,8 +215,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter * the size of the map returned by {@linkplain #outputFieldIndexes()} * @param numberOfFieldsRead The total number read not just those included in the analysis */ - protected boolean applyTransformsAndWrite(String[] input, String[] output, long numberOfFieldsRead) + protected boolean applyTransformsAndWrite(Supplier cancelled, String[] input, String[] output, long numberOfFieldsRead) throws IOException { + if (cancelled.get()) { + throw new TaskCancelledException("cancelled"); + } + readWriteArea[TransformFactory.INPUT_ARRAY_INDEX] = input; readWriteArea[TransformFactory.OUTPUT_ARRAY_INDEX] = output; Arrays.fill(readWriteArea[TransformFactory.SCRATCH_ARRAY_INDEX], ""); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java index 24705070e08..1a2f970c9b8 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Supplier; /** * A writer for transforming and piping CSV data from an @@ -62,7 +63,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter { * header a exception is thrown */ @Override - public DataCounts write(InputStream inputStream) throws IOException { + public DataCounts write(InputStream inputStream, Supplier cancelled) throws IOException { CsvPreference csvPref = new CsvPreference.Builder( dataDescription.getQuoteCharacter(), dataDescription.getFieldDelimiter(), @@ -117,7 +118,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter { } fillRecordFromLine(line, inputRecord); - applyTransformsAndWrite(inputRecord, record, inputFieldCount); + applyTransformsAndWrite(cancelled, inputRecord, record, inputFieldCount); } // This function can throw diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java index f636e8980f1..eade5b79b62 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect.writer; import java.io.IOException; import java.io.InputStream; +import java.util.function.Supplier; import org.elasticsearch.xpack.prelert.job.DataCounts; @@ -24,7 +25,7 @@ public interface DataToProcessWriter { * * @return Counts of the records processed, bytes read etc */ - DataCounts write(InputStream inputStream) throws IOException; + DataCounts write(InputStream inputStream, Supplier cancelled) throws IOException; /** * Flush the outputstream diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java index 2295115883a..e31c09804ec 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java @@ -12,6 +12,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.logging.log4j.Logger; @@ -59,11 +60,11 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { * timeField is missing from the JOSN inputIndex an exception is thrown */ @Override - public DataCounts write(InputStream inputStream) throws IOException { + public DataCounts write(InputStream inputStream, Supplier cancelled) throws IOException { statusReporter.startNewIncrementalCount(); try (JsonParser parser = new JsonFactory().createParser(inputStream)) { - writeJson(parser); + writeJson(parser, cancelled); // this line can throw and will be propagated statusReporter.finishReporting(); @@ -72,7 +73,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { return statusReporter.incrementalStats(); } - private void writeJson(JsonParser parser) throws IOException { + private void writeJson(JsonParser parser, Supplier cancelled) throws IOException { Collection analysisFields = inputFields(); buildTransformsAndWriteHeader(analysisFields.toArray(new String[0])); @@ -101,7 +102,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { record[inOut.outputIndex] = (field == null) ? "" : field; } - applyTransformsAndWrite(input, record, inputFieldCount); + applyTransformsAndWrite(cancelled, input, record, inputFieldCount); inputFieldCount = recordReader.read(input, gotFields); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java index 357382f94bb..780fa5762b9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java @@ -13,6 +13,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.function.Supplier; import org.apache.logging.log4j.Logger; @@ -43,7 +44,7 @@ public class SingleLineDataToProcessWriter extends AbstractDataToProcessWriter { } @Override - public DataCounts write(InputStream inputStream) throws IOException { + public DataCounts write(InputStream inputStream, Supplier cancelled) throws IOException { statusReporter.startNewIncrementalCount(); try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { @@ -56,7 +57,7 @@ public class SingleLineDataToProcessWriter extends AbstractDataToProcessWriter { for (String line = bufferedReader.readLine(); line != null; line = bufferedReader.readLine()) { Arrays.fill(record, ""); - applyTransformsAndWrite(new String[]{line}, record, 1); + applyTransformsAndWrite(cancelled, new String[]{line}, record, 1); } statusReporter.finishReporting(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java index 9a1808d3271..e8d64967a10 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java @@ -133,7 +133,7 @@ class ScheduledJob { if (extractedData.isPresent()) { DataCounts counts; try { - counts = dataProcessor.processData(jobId, extractedData.get(), DATA_LOAD_PARAMS); + counts = dataProcessor.processData(jobId, extractedData.get(), DATA_LOAD_PARAMS, () -> false); } catch (Exception e) { error = new AnalysisProblemException(e); break; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java index c3c792fac85..ca7ac809389 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobStatus; +import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; @@ -37,6 +38,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import static org.elasticsearch.mock.orig.Mockito.doThrow; import static org.elasticsearch.mock.orig.Mockito.times; @@ -51,7 +53,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; /** - * Calling the {@link AutodetectProcessManager#processData(String, InputStream, DataLoadParams)} + * Calling the {@link DataProcessor#processData(String, InputStream, DataLoadParams, java.util.function.Supplier)} * method causes an AutodetectCommunicator to be created on demand. Most of these tests have to * do that before they can assert other things */ @@ -77,7 +79,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals(0, manager.numberOfRunningJobs()); DataLoadParams params = new DataLoadParams(TimeRange.builder().build()); - manager.processData("foo", createInputStream(""), params); + manager.processData("foo", createInputStream(""), params, () -> false); assertEquals(1, manager.numberOfRunningJobs()); } @@ -87,10 +89,11 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = mock(DataLoadParams.class); InputStream inputStream = createInputStream(""); - doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params); + Supplier cancellable = () -> false; + doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params, cancellable); ESTestCase.expectThrows(ElasticsearchException.class, - () -> manager.processData("foo", inputStream, params)); + () -> manager.processData("foo", inputStream, params, cancellable)); } public void testCloseJob() { @@ -99,7 +102,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); assertEquals(0, manager.numberOfRunningJobs()); - manager.processData("foo", createInputStream(""), mock(DataLoadParams.class)); + manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false); // job is created assertEquals(1, manager.numberOfRunningJobs()); @@ -111,10 +114,11 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManager(communicator); + Supplier cancellable = () -> false; DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true); InputStream inputStream = createInputStream(""); - manager.processData("foo", inputStream, params); - verify(communicator).writeToJob(inputStream, params); + manager.processData("foo", inputStream, params, cancellable); + verify(communicator).writeToJob(inputStream, params, cancellable); } public void testFlush() throws IOException { @@ -123,7 +127,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); InputStream inputStream = createInputStream(""); - manager.processData("foo", inputStream, mock(DataLoadParams.class)); + manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false); InterimResultsParams params = InterimResultsParams.builder().build(); manager.flushJob("foo", params); @@ -154,7 +158,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); assertFalse(manager.jobHasActiveAutodetectProcess("foo")); - manager.processData("foo", createInputStream(""), mock(DataLoadParams.class)); + manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); assertFalse(manager.jobHasActiveAutodetectProcess("bar")); @@ -170,7 +174,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { givenAllocationWithStatus(JobStatus.PAUSING); InputStream inputStream = createInputStream(""); - DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class)); + DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false); assertThat(dataCounts, equalTo(new DataCounts("foo"))); } @@ -182,7 +186,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { givenAllocationWithStatus(JobStatus.PAUSED); AutodetectProcessManager manager = createManager(communicator); InputStream inputStream = createInputStream(""); - DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class)); + DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false); assertThat(dataCounts, equalTo(new DataCounts("foo"))); } @@ -227,7 +231,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) { AutodetectProcessManager manager = createManager(communicator); - manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class)); + manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class), () -> false); return manager; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java index e1749f51b94..340e2cf4ad4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicatorTests.java @@ -42,7 +42,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build()); AutodetectProcess process = mockAutodetectProcessWithOutputStream(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { - communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params); + communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params, () -> false); Mockito.verify(process).writeResetBucketsControlMessage(params); } } @@ -143,10 +143,11 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class))); + expectThrows(ElasticsearchStatusException.class, + () -> communicator.writeToJob(in, mock(DataLoadParams.class), () -> false)); communicator.inUse.set(null); - communicator.writeToJob(in, mock(DataLoadParams.class)); + communicator.writeToJob(in, mock(DataLoadParams.class), () -> false); } public void testFlushInUse() throws IOException { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java index 0bc08ce9a08..29dc32943a2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java @@ -333,7 +333,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { String[] input = { "1", "metricA", "0" }; String[] output = new String[3]; - assertFalse(writer.applyTransformsAndWrite(input, output, 3)); + assertFalse(writer.applyTransformsAndWrite(() -> false, input, output, 3)); verify(autodetectProcess, never()).writeRecord(output); verify(statusReporter, never()).reportRecordWritten(anyLong(), anyLong()); @@ -344,7 +344,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { // this is ok input = new String[] { "2", "metricB", "0" }; String[] expectedOutput = { "2", null, null }; - assertTrue(writer.applyTransformsAndWrite(input, output, 3)); + assertTrue(writer.applyTransformsAndWrite(() -> false, input, output, 3)); verify(autodetectProcess, times(1)).writeRecord(expectedOutput); verify(statusReporter, times(1)).reportRecordWritten(3, 2000); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java index 8f80d40c7e8..b6c2a01826b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.prelert.job.process.autodetect.writer; +import static org.elasticsearch.xpack.prelert.job.process.autodetect.writer.JsonDataToProcessWriterTests.endLessStream; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -21,8 +23,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess; @@ -80,6 +86,32 @@ public class CsvDataToProcessWriterTests extends ESTestCase { analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector)).build(); } + public void testWrite_cancel() throws Exception { + InputStream inputStream = endLessStream("time,metric,value\n", "1,,foo\n"); + CsvDataToProcessWriter writer = createWriter(); + + AtomicBoolean cancel = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + writer.write(inputStream, cancel::get); + } catch (Exception e) { + exception.set(e); + } + }); + t.start(); + try { + assertBusy(() -> verify(statusReporter, atLeastOnce()).reportRecordWritten(anyLong(), anyLong())); + } finally { + cancel.set(true); + t.join(); + } + + assertNotNull(exception.get()); + assertEquals(TaskCancelledException.class, exception.get().getClass()); + assertEquals("cancelled", exception.get().getMessage()); + } + public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws IOException { StringBuilder input = new StringBuilder(); @@ -89,7 +121,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -121,7 +153,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -144,7 +176,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -169,7 +201,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { when(statusReporter.getLatestRecordTime()).thenReturn(new Date(5000L)); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -200,7 +232,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -234,7 +266,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -265,7 +297,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(""); CsvDataToProcessWriter writer = createWriter(); - DataCounts counts = writer.write(inputStream); + DataCounts counts = writer.write(inputStream, () -> false); assertEquals(0L, counts.getInputBytes()); assertEquals(0L, counts.getInputRecordCount()); } @@ -292,7 +324,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("1970-01-01,00:00:02Z,foo,6.0\n"); InputStream inputStream = createInputStream(input.toString()); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -331,7 +363,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -357,7 +389,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream)); + SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream, () -> false)); // Expected line numbers are 2 and 10001, but SuperCSV may print the // numbers using a different locale's digit characters assertTrue(e.getMessage(), e.getMessage().matches( diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java index a78262dff21..112b02f482b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java @@ -69,7 +69,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); AbstractDataToProcessWriter writer = createWriter(true); - writer.write(inputStream); + writer.write(inputStream, () -> false); List expectedRecords = new ArrayList<>(); // The final field is the control field @@ -92,7 +92,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); AbstractDataToProcessWriter writer = createWriter(false); - writer.write(inputStream); + writer.write(inputStream, () -> false); List expectedRecords = new ArrayList<>(); // The final field is the control field diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java index 6059b19206d..e54679296bb 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect.writer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -19,9 +20,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess; import org.junit.Before; @@ -77,6 +81,32 @@ public class JsonDataToProcessWriterTests extends ESTestCase { analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector)).build(); } + public void testWrite_cancel() throws Exception { + InputStream inputStream = endLessStream("", "{\"time\":1}"); + JsonDataToProcessWriter writer = createWriter(); + + AtomicBoolean cancel = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + writer.write(inputStream, cancel::get); + } catch (Exception e) { + exception.set(e); + } + }); + t.start(); + try { + assertBusy(() -> verify(statusReporter, atLeastOnce()).reportRecordWritten(anyLong(), anyLong())); + } finally { + cancel.set(true); + t.join(); + } + + assertNotNull(exception.get()); + assertEquals(TaskCancelledException.class, exception.get().getClass()); + assertEquals("cancelled", exception.get().getMessage()); + } + public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws Exception { StringBuilder input = new StringBuilder(); input.append("{\"time\":\"1\", \"metric\":\"foo\", \"value\":\"1.0\"}"); @@ -84,7 +114,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -106,7 +136,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -135,7 +165,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); List expectedRecords = new ArrayList<>(); // The final field is the control field @@ -164,7 +194,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -193,7 +223,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -219,7 +249,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream)); + ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream, () -> false)); } public void testWrite_GivenJsonWithArrayField() @@ -234,7 +264,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -263,7 +293,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -304,7 +334,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"date\":\"1970-01-01\", \"time-of-day\":\"00:00:02Z\", \"value\":\"6.0\"}"); InputStream inputStream = createInputStream(input.toString()); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -341,7 +371,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -370,4 +400,31 @@ public class JsonDataToProcessWriterTests extends ESTestCase { assertArrayEquals(expectedRecords.get(i), writtenRecords.get(i)); } } + + static InputStream endLessStream(String firstLine, String repeatLine) { + return new InputStream() { + + int pos = 0; + boolean firstLineRead = false; + final byte[] first = firstLine.getBytes(StandardCharsets.UTF_8); + final byte[] repeat = repeatLine.getBytes(StandardCharsets.UTF_8); + + @Override + public int read() throws IOException { + if (firstLineRead == false) { + if (pos == first.length) { + pos = 0; + firstLineRead = true; + } else { + return first[pos++]; + } + } + + if (pos == repeat.length) { + pos = 0; + } + return repeat[pos++]; + } + }; + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java index 848fde2221d..ea14e6494e3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java @@ -5,7 +5,10 @@ */ package org.elasticsearch.xpack.prelert.job.process.autodetect.writer; +import static org.elasticsearch.xpack.prelert.job.process.autodetect.writer.JsonDataToProcessWriterTests.endLessStream; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -18,8 +21,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess; import org.junit.Before; @@ -71,6 +77,38 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { transformConfigs = new ArrayList<>(); } + public void testWrite_cancel() throws Exception { + TransformConfig transformConfig = new TransformConfig("extract"); + transformConfig.setInputs(Arrays.asList("raw")); + transformConfig.setOutputs(Arrays.asList("time", "message")); + transformConfig.setArguments(Arrays.asList("(.{20}) (.*)")); + transformConfigs.add(transformConfig); + + InputStream inputStream = endLessStream("", "2015-04-29 10:00:00Z this is a message\n"); + SingleLineDataToProcessWriter writer = createWriter(); + + AtomicBoolean cancel = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + writer.write(inputStream, cancel::get); + } catch (Exception e) { + exception.set(e); + } + }); + t.start(); + try { + assertBusy(() -> verify(statusReporter, atLeastOnce()).reportRecordWritten(anyLong(), anyLong())); + } finally { + cancel.set(true); + t.join(); + } + + assertNotNull(exception.get()); + assertEquals(TaskCancelledException.class, exception.get().getClass()); + assertEquals("cancelled", exception.get().getMessage()); + } + public void testWrite_GivenDataIsValid() throws Exception { TransformConfig transformConfig = new TransformConfig("extract"); transformConfig.setInputs(Arrays.asList("raw")); @@ -85,7 +123,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); SingleLineDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).getLatestRecordTime(); verify(statusReporter, times(1)).startNewIncrementalCount(); verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1); @@ -122,7 +160,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); SingleLineDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).getLatestRecordTime(); verify(statusReporter, times(1)).startNewIncrementalCount(); verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1); @@ -148,7 +186,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); SingleLineDataToProcessWriter writer = createWriter(); - writer.write(inputStream); + writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1); verify(statusReporter, times(1)).reportDateParseError(1); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java index 348812089c0..3b37b2ac036 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -104,7 +103,7 @@ public class ScheduledJobServiceTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - when(dataProcessor.processData(anyString(), eq(in), any())).thenReturn(dataCounts); + when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); scheduledJobService.start(builder.build(), allocation); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); @@ -126,7 +125,7 @@ public class ScheduledJobServiceTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - when(dataProcessor.processData(anyString(), eq(in), any())).thenReturn(dataCounts); + when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); scheduledJobService.start(builder.build(), allocation); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java index 67c3081cd66..c60904d6995 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java @@ -49,7 +49,7 @@ public class ScheduledJobTests extends ESTestCase { InputStream inputStream = mock(InputStream.class); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - when(dataProcessor.processData(eq("_job_id"), same(inputStream), any())).thenReturn(dataCounts); + when(dataProcessor.processData(eq("_job_id"), same(inputStream), any(), any())).thenReturn(dataCounts); } public void testLookBackRunWithEndTime() throws Exception { @@ -141,7 +141,7 @@ public class ScheduledJobTests extends ESTestCase { public void testAnalysisProblem() throws Exception { dataProcessor = mock(DataProcessor.class); - when(dataProcessor.processData(eq("_job_id"), any(), any())).thenThrow(new RuntimeException()); + when(dataProcessor.processData(eq("_job_id"), any(), any(), any())).thenThrow(new RuntimeException()); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);