diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index a6a14191171..b846401723a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -28,7 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import java.io.IOException; @@ -256,7 +256,7 @@ public class FlushJobAction extends Action listener) { - InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder(); + FlushJobParams.Builder paramsBuilder = FlushJobParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); if (request.getAdvanceTime() != null) { paramsBuilder.advanceTime(request.getAdvanceTime()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 6d5f9e123da..2f4cd55aeb0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -24,7 +24,7 @@ import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; @@ -196,7 +196,7 @@ public class AutodetectCommunicator implements Closeable { }, handler); } - public void flushJob(InterimResultsParams params, BiConsumer handler) { + public void flushJob(FlushJobParams params, BiConsumer handler) { submitOperation(() -> { String flushId = autodetectProcess.flushJob(params); waitFlushToCompletion(flushId); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index e70463a5c6c..c8ab913c0b8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -9,7 +9,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -78,11 +78,12 @@ public interface AutodetectProcess extends Closeable { * in a flush acknowledgment by the autodetect process once the flush has * been processed. * - * @param params Should interim results be generated + * @param params Parameters describing the controls that will accompany the flushing + * (e.g. calculating interim results, time control, etc.) * @return The flush Id * @throws IOException If the flush failed */ - String flushJob(InterimResultsParams params) throws IOException; + String flushJob(FlushJobParams params) throws IOException; /** * Flush the output data stream @@ -107,7 +108,7 @@ public interface AutodetectProcess extends Closeable { /** * Returns true if the process still running. - * Methods such as {@link #flushJob(InterimResultsParams)} are essentially + * Methods such as {@link #flushJob(FlushJobParams)} are essentially * asynchronous the command will be continue to execute in the process after * the call has returned. This method tests whether something catastrophic * occurred in the process during its execution. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index fedeb43b419..e7d2eb7521b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -39,7 +39,7 @@ import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; @@ -198,10 +198,10 @@ public class AutodetectProcessManager extends AbstractComponent { * sitting in buffers. * * @param jobTask The job task - * @param params Parameters about whether interim results calculation - * should occur and for which period of time + * @param params Parameters describing the controls that will accompany the flushing + * (e.g. calculating interim results, time control, etc.) */ - public void flushJob(JobTask jobTask, InterimResultsParams params, Consumer handler) { + public void flushJob(JobTask jobTask, FlushJobParams params, Consumer handler) { logger.debug("Flushing job {}", jobTask.getJobId()); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId()); if (communicator == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index c86e205c067..a8acb7093ba 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -10,7 +10,7 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; /** * A placeholder class simulating the actions of the native Autodetect process. * Most methods consume data without performing any action however, after a call to - * {@link #flushJob(InterimResultsParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement} + * {@link #flushJob(FlushJobParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement} * message is expected on the {@link #readAutodetectResults()} ()} stream. This class writes the flush * acknowledgement immediately. */ @@ -76,7 +76,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { * @return {@link #FLUSH_ID} */ @Override - public String flushJob(InterimResultsParams params) throws IOException { + public String flushJob(FlushJobParams params) throws IOException { FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID); AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); results.add(result); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index b2de0836223..148434d0534 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -16,7 +16,7 @@ import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; @@ -157,9 +157,9 @@ class NativeAutodetectProcess implements AutodetectProcess { } @Override - public String flushJob(InterimResultsParams params) throws IOException { + public String flushJob(FlushJobParams params) throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields); - writer.writeCalcInterimMessage(params); + writer.writeFlushControlMessage(params); return writer.writeFlushMessage(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/InterimResultsParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java similarity index 93% rename from plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/InterimResultsParams.java rename to plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index 67ce83227a1..0ef757b801b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/InterimResultsParams.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -12,12 +12,12 @@ import org.elasticsearch.xpack.ml.utils.time.TimeUtils; import java.util.Objects; -public class InterimResultsParams { +public class FlushJobParams { private final boolean calcInterim; private final TimeRange timeRange; private final Long advanceTimeSeconds; - private InterimResultsParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) { + private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) { this.calcInterim = calcInterim; this.timeRange = Objects.requireNonNull(timeRange); this.advanceTimeSeconds = advanceTimeSeconds; @@ -54,7 +54,7 @@ public class InterimResultsParams { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - InterimResultsParams that = (InterimResultsParams) o; + FlushJobParams that = (FlushJobParams) o; return calcInterim == that.calcInterim && Objects.equals(timeRange, that.timeRange) && Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds); @@ -91,10 +91,10 @@ public class InterimResultsParams { return this; } - public InterimResultsParams build() { + public FlushJobParams build() { checkValidFlushArgumentsCombination(); Long advanceTimeSeconds = checkAdvanceTimeParam(); - return new InterimResultsParams(calcInterim, timeRange, advanceTimeSeconds); + return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds); } private void checkValidFlushArgumentsCombination() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 7fa2b9079fd..537bba0b1ee 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -11,7 +11,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import java.io.IOException; import java.io.OutputStream; @@ -95,12 +95,19 @@ public class ControlMsgToProcessWriter { } /** - * Send an instruction to calculate interim results to the C++ autodetect process. + * Writes the control messages that are requested when flushing a job. + * Those control messages need to be followed by a flush message in order + * for them to reach the C++ process immediately. List of supported controls: * - * @param params Parameters indicating whether interim results should be written - * and for which buckets + *
    + *
  • advance time
  • + *
  • calculate interim results
  • + *
+ * + * @param params Parameters describing the controls that will accompany the flushing + * (e.g. calculating interim results, time control, etc.) */ - public void writeCalcInterimMessage(InterimResultsParams params) throws IOException { + public void writeFlushControlMessage(FlushJobParams params) throws IOException { if (params.shouldAdvanceTime()) { writeMessage(ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index c1930bee8fb..5704ce6cbe8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -19,7 +19,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.junit.Before; import org.mockito.Mockito; @@ -72,7 +72,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); communicator.flushJob(params, (aVoid, e) -> {}); Mockito.verify(process).flushJob(params); } @@ -93,7 +93,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(process.isProcessAlive()).thenReturn(false); when(process.readError()).thenReturn("Mock process is dead"); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); Exception[] holder = new ElasticsearchException[1]; communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1); assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage()); @@ -105,7 +105,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) .thenReturn(false).thenReturn(true); - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { communicator.flushJob(params, (aVoid, e) -> {}); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index be89b1b1dcf..dc7ba3de7fb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -33,7 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; @@ -298,7 +298,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { manager.processData(jobTask, inputStream, randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); manager.flushJob(jobTask, params, e -> {}); verify(communicator).flushJob(same(params), any()); @@ -308,7 +308,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); doAnswer(invocationOnMock -> { BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; handler.accept(null, new IOException("blah")); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java index 2418fe185d3..6120e78cd25 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.util.Iterator; @@ -16,7 +16,7 @@ public class BlackHoleAutodetectProcessTests extends ESTestCase { public void testFlushJob_writesAck() throws Exception { try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) { - String flushId = process.flushJob(InterimResultsParams.builder().build()); + String flushId = process.flushJob(FlushJobParams.builder().build()); Iterator iterator = process.readAutodetectResults(); iterator.hasNext(); AutodetectResult result = iterator.next(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 8dcc333c1dc..d585f3d1f0e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -11,7 +11,7 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.junit.Assert; @@ -110,7 +110,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); process.flushJob(params); ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/InterimResultsParamsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java similarity index 79% rename from plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/InterimResultsParamsTests.java rename to plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java index 278d6d3ec6b..e99dc25f14d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/InterimResultsParamsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java @@ -7,9 +7,9 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params; import org.elasticsearch.test.ESTestCase; -public class InterimResultsParamsTests extends ESTestCase { +public class FlushJobParamsTests extends ESTestCase { public void testBuilder_GivenDefault() { - InterimResultsParams params = InterimResultsParams.builder().build(); + FlushJobParams params = FlushJobParams.builder().build(); assertFalse(params.shouldCalculateInterim()); assertFalse(params.shouldAdvanceTime()); assertEquals("", params.getStart()); @@ -18,7 +18,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenCalcInterim() { - InterimResultsParams params = InterimResultsParams.builder().calcInterim(true).build(); + FlushJobParams params = FlushJobParams.builder().calcInterim(true).build(); assertTrue(params.shouldCalculateInterim()); assertFalse(params.shouldAdvanceTime()); assertEquals("", params.getStart()); @@ -27,7 +27,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenCalcInterimAndStart() { - InterimResultsParams params = InterimResultsParams.builder() + FlushJobParams params = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("42").build()) .build(); @@ -39,7 +39,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenCalcInterimAndEnd_throws() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> InterimResultsParams.builder() + () -> FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().endTime("100").build()) .build()); @@ -49,7 +49,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenCalcInterimAndStartAndEnd() { - InterimResultsParams params = InterimResultsParams.builder() + FlushJobParams params = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("3600").endTime("7200").build()) .build(); @@ -61,7 +61,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenAdvanceTime() { - InterimResultsParams params = InterimResultsParams.builder().advanceTime("1821").build(); + FlushJobParams params = FlushJobParams.builder().advanceTime("1821").build(); assertFalse(params.shouldCalculateInterim()); assertEquals("", params.getStart()); assertEquals("", params.getEnd()); @@ -71,7 +71,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenCalcInterimAndAdvanceTime() { - InterimResultsParams params = InterimResultsParams.builder() + FlushJobParams params = FlushJobParams.builder() .calcInterim(true) .advanceTime("1940") .build(); @@ -84,7 +84,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testBuilder_GivenCalcInterimWithTimeRangeAndAdvanceTime() { - InterimResultsParams params = InterimResultsParams.builder() + FlushJobParams params = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("1").endTime("2").build()) .advanceTime("1940") @@ -98,27 +98,27 @@ public class InterimResultsParamsTests extends ESTestCase { public void testValidate_GivenOnlyStartSpecified() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> InterimResultsParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build()); + () -> FlushJobParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build()); assertEquals("Invalid flush parameters: unexpected 'start'.", e.getMessage()); } public void testFlushUpload_GivenOnlyEndSpecified() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> InterimResultsParams.builder().forTimeRange(TimeRange.builder().endTime("1").build()).build()); + () -> FlushJobParams.builder().forTimeRange(TimeRange.builder().endTime("1").build()).build()); assertEquals("Invalid flush parameters: unexpected 'end'.", e.getMessage()); } public void testFlushUpload_GivenInterimResultsAndOnlyEndSpecified() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> InterimResultsParams.builder().calcInterim(true).forTimeRange(TimeRange.builder().endTime("1").build()).build()); + () -> FlushJobParams.builder().calcInterim(true).forTimeRange(TimeRange.builder().endTime("1").build()).build()); assertEquals("Invalid flush parameters: 'start' has not been specified.", e.getMessage()); } public void testFlushUpload_GivenInterimResultsAndStartAndEndSpecifiedAsEpochs() { - InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) + FlushJobParams params = FlushJobParams.builder().calcInterim(true) .forTimeRange(TimeRange.builder().startTime("1428494400").endTime("1428498000").build()).build(); assertTrue(params.shouldCalculateInterim()); assertFalse(params.shouldAdvanceTime()); @@ -128,7 +128,7 @@ public class InterimResultsParamsTests extends ESTestCase { public void testFlushUpload_GivenInterimResultsAndSameStartAndEnd() { - InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) + FlushJobParams params = FlushJobParams.builder().calcInterim(true) .forTimeRange(TimeRange.builder().startTime("1428494400").endTime("1428494400").build()).build(); assertTrue(params.shouldCalculateInterim()); @@ -138,7 +138,7 @@ public class InterimResultsParamsTests extends ESTestCase { } public void testFlushUpload_GivenInterimResultsAndOnlyStartSpecified() { - InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) + FlushJobParams params = FlushJobParams.builder().calcInterim(true) .forTimeRange(TimeRange.builder().startTime("1428494400").build()).build(); assertTrue(params.shouldCalculateInterim()); @@ -148,7 +148,7 @@ public class InterimResultsParamsTests extends ESTestCase { } public void testFlushUpload_GivenValidAdvanceTime() { - InterimResultsParams params = InterimResultsParams.builder().advanceTime("2015-04-08T13:00:00.000Z").build(); + FlushJobParams params = FlushJobParams.builder().advanceTime("2015-04-08T13:00:00.000Z").build(); assertFalse(params.shouldCalculateInterim()); assertEquals("", params.getStart()); assertEquals("", params.getEnd()); @@ -157,7 +157,7 @@ public class InterimResultsParamsTests extends ESTestCase { } public void testFlushUpload_GivenCalcInterimAndAdvanceTime() { - InterimResultsParams params = InterimResultsParams.builder().calcInterim(true).advanceTime("3600").build(); + FlushJobParams params = FlushJobParams.builder().calcInterim(true).advanceTime("3600").build(); assertTrue(params.shouldCalculateInterim()); assertEquals("", params.getStart()); assertEquals("", params.getEnd()); @@ -166,7 +166,7 @@ public class InterimResultsParamsTests extends ESTestCase { } public void testFlushUpload_GivenCalcInterimWithTimeRangeAndAdvanceTime() { - InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) + FlushJobParams params = FlushJobParams.builder().calcInterim(true) .forTimeRange(TimeRange.builder().startTime("150").endTime("300").build()) .advanceTime("200").build(); assertTrue(params.shouldCalculateInterim()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index 15bc5749f1a..c3601f84dd5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -14,7 +14,7 @@ import org.elasticsearch.xpack.ml.job.config.Operator; import org.elasticsearch.xpack.ml.job.config.RuleCondition; import org.elasticsearch.xpack.ml.job.config.RuleConditionType; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.junit.Before; import org.mockito.InOrder; @@ -39,12 +39,12 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { lengthEncodedWriter = Mockito.mock(LengthEncodedWriter.class); } - public void testWriteCalcInterimMessage_GivenAdvanceTime() throws IOException { + public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - InterimResultsParams interimResultsParams = InterimResultsParams.builder() + FlushJobParams flushJobParams = FlushJobParams.builder() .advanceTime("1234567890").build(); - writer.writeCalcInterimMessage(interimResultsParams); + writer.writeFlushControlMessage(flushJobParams); InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); @@ -53,12 +53,12 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } - public void testWriteCalcInterimMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException { + public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - InterimResultsParams interimResultsParams = InterimResultsParams.builder() + FlushJobParams flushJobParams = FlushJobParams.builder() .calcInterim(true).build(); - writer.writeCalcInterimMessage(interimResultsParams); + writer.writeFlushControlMessage(flushJobParams); InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); @@ -67,23 +67,23 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } - public void testWriteCalcInterimMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException { + public void testWriteFlushControlMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - InterimResultsParams interimResultsParams = InterimResultsParams.builder().build(); + FlushJobParams flushJobParams = FlushJobParams.builder().build(); - writer.writeCalcInterimMessage(interimResultsParams); + writer.writeFlushControlMessage(flushJobParams); verifyNoMoreInteractions(lengthEncodedWriter); } - public void testWriteCalcInterimMessage_GivenCalcInterimResultsWithTimeParams() throws IOException { + public void testWriteFlushControlMessage_GivenCalcInterimResultsWithTimeParams() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - InterimResultsParams interimResultsParams = InterimResultsParams.builder() + FlushJobParams flushJobParams = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("120").endTime("180").build()) .build(); - writer.writeCalcInterimMessage(interimResultsParams); + writer.writeFlushControlMessage(flushJobParams); InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); @@ -92,15 +92,15 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } - public void testWriteCalcInterimMessage_GivenCalcInterimAndAdvanceTime() throws IOException { + public void testWriteFlushControlMessage_GivenCalcInterimAndAdvanceTime() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - InterimResultsParams interimResultsParams = InterimResultsParams.builder() + FlushJobParams flushJobParams = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("50").endTime("100").build()) .advanceTime("180") .build(); - writer.writeCalcInterimMessage(interimResultsParams); + writer.writeFlushControlMessage(flushJobParams); InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4);