[ML] Improve naming of flush related methods (elastic/x-pack-elasticsearch#1872)

Original commit: elastic/x-pack-elasticsearch@eb3eb80b6e
This commit is contained in:
Dimitris Athanasiou 2017-06-28 13:32:51 +01:00 committed by GitHub
parent 7fff1567fe
commit 1a076e2eb9
14 changed files with 81 additions and 73 deletions

View File

@ -28,7 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import java.io.IOException; import java.io.IOException;
@ -256,7 +256,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
@Override @Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) { protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder(); FlushJobParams.Builder paramsBuilder = FlushJobParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim()); paramsBuilder.calcInterim(request.getCalcInterim());
if (request.getAdvanceTime() != null) { if (request.getAdvanceTime() != null) {
paramsBuilder.advanceTime(request.getAdvanceTime()); paramsBuilder.advanceTime(request.getAdvanceTime());

View File

@ -24,7 +24,7 @@ import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
@ -196,7 +196,7 @@ public class AutodetectCommunicator implements Closeable {
}, handler); }, handler);
} }
public void flushJob(InterimResultsParams params, BiConsumer<Void, Exception> handler) { public void flushJob(FlushJobParams params, BiConsumer<Void, Exception> handler) {
submitOperation(() -> { submitOperation(() -> {
String flushId = autodetectProcess.flushJob(params); String flushId = autodetectProcess.flushJob(params);
waitFlushToCompletion(flushId); waitFlushToCompletion(flushId);

View File

@ -9,7 +9,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -78,11 +78,12 @@ public interface AutodetectProcess extends Closeable {
* in a flush acknowledgment by the autodetect process once the flush has * in a flush acknowledgment by the autodetect process once the flush has
* been processed. * been processed.
* *
* @param params Should interim results be generated * @param params Parameters describing the controls that will accompany the flushing
* (e.g. calculating interim results, time control, etc.)
* @return The flush Id * @return The flush Id
* @throws IOException If the flush failed * @throws IOException If the flush failed
*/ */
String flushJob(InterimResultsParams params) throws IOException; String flushJob(FlushJobParams params) throws IOException;
/** /**
* Flush the output data stream * Flush the output data stream
@ -107,7 +108,7 @@ public interface AutodetectProcess extends Closeable {
/** /**
* Returns true if the process still running. * Returns true if the process still running.
* Methods such as {@link #flushJob(InterimResultsParams)} are essentially * Methods such as {@link #flushJob(FlushJobParams)} are essentially
* asynchronous the command will be continue to execute in the process after * asynchronous the command will be continue to execute in the process after
* the call has returned. This method tests whether something catastrophic * the call has returned. This method tests whether something catastrophic
* occurred in the process during its execution. * occurred in the process during its execution.

View File

@ -39,7 +39,7 @@ import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
@ -198,10 +198,10 @@ public class AutodetectProcessManager extends AbstractComponent {
* sitting in buffers. * sitting in buffers.
* *
* @param jobTask The job task * @param jobTask The job task
* @param params Parameters about whether interim results calculation * @param params Parameters describing the controls that will accompany the flushing
* should occur and for which period of time * (e.g. calculating interim results, time control, etc.)
*/ */
public void flushJob(JobTask jobTask, InterimResultsParams params, Consumer<Exception> handler) { public void flushJob(JobTask jobTask, FlushJobParams params, Consumer<Exception> handler) {
logger.debug("Flushing job {}", jobTask.getJobId()); logger.debug("Flushing job {}", jobTask.getJobId());
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId()); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) { if (communicator == null) {

View File

@ -10,7 +10,7 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* A placeholder class simulating the actions of the native Autodetect process. * A placeholder class simulating the actions of the native Autodetect process.
* Most methods consume data without performing any action however, after a call to * Most methods consume data without performing any action however, after a call to
* {@link #flushJob(InterimResultsParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement} * {@link #flushJob(FlushJobParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement}
* message is expected on the {@link #readAutodetectResults()} ()} stream. This class writes the flush * message is expected on the {@link #readAutodetectResults()} ()} stream. This class writes the flush
* acknowledgement immediately. * acknowledgement immediately.
*/ */
@ -76,7 +76,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
* @return {@link #FLUSH_ID} * @return {@link #FLUSH_ID}
*/ */
@Override @Override
public String flushJob(InterimResultsParams params) throws IOException { public String flushJob(FlushJobParams params) throws IOException {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID); FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement);
results.add(result); results.add(result);

View File

@ -16,7 +16,7 @@ import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
@ -157,9 +157,9 @@ class NativeAutodetectProcess implements AutodetectProcess {
} }
@Override @Override
public String flushJob(InterimResultsParams params) throws IOException { public String flushJob(FlushJobParams params) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);
writer.writeCalcInterimMessage(params); writer.writeFlushControlMessage(params);
return writer.writeFlushMessage(); return writer.writeFlushMessage();
} }

View File

@ -12,12 +12,12 @@ import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.util.Objects; import java.util.Objects;
public class InterimResultsParams { public class FlushJobParams {
private final boolean calcInterim; private final boolean calcInterim;
private final TimeRange timeRange; private final TimeRange timeRange;
private final Long advanceTimeSeconds; private final Long advanceTimeSeconds;
private InterimResultsParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) { private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) {
this.calcInterim = calcInterim; this.calcInterim = calcInterim;
this.timeRange = Objects.requireNonNull(timeRange); this.timeRange = Objects.requireNonNull(timeRange);
this.advanceTimeSeconds = advanceTimeSeconds; this.advanceTimeSeconds = advanceTimeSeconds;
@ -54,7 +54,7 @@ public class InterimResultsParams {
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
InterimResultsParams that = (InterimResultsParams) o; FlushJobParams that = (FlushJobParams) o;
return calcInterim == that.calcInterim && return calcInterim == that.calcInterim &&
Objects.equals(timeRange, that.timeRange) && Objects.equals(timeRange, that.timeRange) &&
Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds); Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds);
@ -91,10 +91,10 @@ public class InterimResultsParams {
return this; return this;
} }
public InterimResultsParams build() { public FlushJobParams build() {
checkValidFlushArgumentsCombination(); checkValidFlushArgumentsCombination();
Long advanceTimeSeconds = checkAdvanceTimeParam(); Long advanceTimeSeconds = checkAdvanceTimeParam();
return new InterimResultsParams(calcInterim, timeRange, advanceTimeSeconds); return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds);
} }
private void checkValidFlushArgumentsCombination() { private void checkValidFlushArgumentsCombination() {

View File

@ -11,7 +11,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -95,12 +95,19 @@ public class ControlMsgToProcessWriter {
} }
/** /**
* Send an instruction to calculate interim results to the C++ autodetect process. * Writes the control messages that are requested when flushing a job.
* Those control messages need to be followed by a flush message in order
* for them to reach the C++ process immediately. List of supported controls:
* *
* @param params Parameters indicating whether interim results should be written * <ul>
* and for which buckets * <li>advance time</li>
* <li>calculate interim results</li>
* </ul>
*
* @param params Parameters describing the controls that will accompany the flushing
* (e.g. calculating interim results, time control, etc.)
*/ */
public void writeCalcInterimMessage(InterimResultsParams params) throws IOException { public void writeFlushControlMessage(FlushJobParams params) throws IOException {
if (params.shouldAdvanceTime()) { if (params.shouldAdvanceTime()) {
writeMessage(ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime()); writeMessage(ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime());
} }

View File

@ -19,7 +19,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -72,7 +72,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) {
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
communicator.flushJob(params, (aVoid, e) -> {}); communicator.flushJob(params, (aVoid, e) -> {});
Mockito.verify(process).flushJob(params); Mockito.verify(process).flushJob(params);
} }
@ -93,7 +93,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(process.isProcessAlive()).thenReturn(false); when(process.isProcessAlive()).thenReturn(false);
when(process.readError()).thenReturn("Mock process is dead"); when(process.readError()).thenReturn("Mock process is dead");
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
Exception[] holder = new ElasticsearchException[1]; Exception[] holder = new ElasticsearchException[1];
communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1); communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1);
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage()); assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
@ -105,7 +105,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))))
.thenReturn(false).thenReturn(true); .thenReturn(false).thenReturn(true);
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) {
communicator.flushJob(params, (aVoid, e) -> {}); communicator.flushJob(params, (aVoid, e) -> {});

View File

@ -33,7 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
@ -298,7 +298,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.processData(jobTask, inputStream, randomFrom(XContentType.values()), manager.processData(jobTask, inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {}); mock(DataLoadParams.class), (dataCounts1, e) -> {});
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
manager.flushJob(jobTask, params, e -> {}); manager.flushJob(jobTask, params, e -> {});
verify(communicator).flushJob(same(params), any()); verify(communicator).flushJob(same(params), any());
@ -308,7 +308,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
BiConsumer<Void, Exception> handler = (BiConsumer<Void, Exception>) invocationOnMock.getArguments()[1]; BiConsumer<Void, Exception> handler = (BiConsumer<Void, Exception>) invocationOnMock.getArguments()[1];
handler.accept(null, new IOException("blah")); handler.accept(null, new IOException("blah"));

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.util.Iterator; import java.util.Iterator;
@ -16,7 +16,7 @@ public class BlackHoleAutodetectProcessTests extends ESTestCase {
public void testFlushJob_writesAck() throws Exception { public void testFlushJob_writesAck() throws Exception {
try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) { try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) {
String flushId = process.flushJob(InterimResultsParams.builder().build()); String flushId = process.flushJob(FlushJobParams.builder().build());
Iterator<AutodetectResult> iterator = process.readAutodetectResults(); Iterator<AutodetectResult> iterator = process.readAutodetectResults();
iterator.hasNext(); iterator.hasNext();
AutodetectResult result = iterator.next(); AutodetectResult result = iterator.next();

View File

@ -11,7 +11,7 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.junit.Assert; import org.junit.Assert;
@ -110,7 +110,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
process.flushJob(params); process.flushJob(params);
ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray()); ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());

View File

@ -7,9 +7,9 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
public class InterimResultsParamsTests extends ESTestCase { public class FlushJobParamsTests extends ESTestCase {
public void testBuilder_GivenDefault() { public void testBuilder_GivenDefault() {
InterimResultsParams params = InterimResultsParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
assertFalse(params.shouldCalculateInterim()); assertFalse(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime()); assertFalse(params.shouldAdvanceTime());
assertEquals("", params.getStart()); assertEquals("", params.getStart());
@ -18,7 +18,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenCalcInterim() { public void testBuilder_GivenCalcInterim() {
InterimResultsParams params = InterimResultsParams.builder().calcInterim(true).build(); FlushJobParams params = FlushJobParams.builder().calcInterim(true).build();
assertTrue(params.shouldCalculateInterim()); assertTrue(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime()); assertFalse(params.shouldAdvanceTime());
assertEquals("", params.getStart()); assertEquals("", params.getStart());
@ -27,7 +27,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenCalcInterimAndStart() { public void testBuilder_GivenCalcInterimAndStart() {
InterimResultsParams params = InterimResultsParams.builder() FlushJobParams params = FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("42").build()) .forTimeRange(TimeRange.builder().startTime("42").build())
.build(); .build();
@ -39,7 +39,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenCalcInterimAndEnd_throws() { public void testBuilder_GivenCalcInterimAndEnd_throws() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> InterimResultsParams.builder() () -> FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.forTimeRange(TimeRange.builder().endTime("100").build()) .forTimeRange(TimeRange.builder().endTime("100").build())
.build()); .build());
@ -49,7 +49,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenCalcInterimAndStartAndEnd() { public void testBuilder_GivenCalcInterimAndStartAndEnd() {
InterimResultsParams params = InterimResultsParams.builder() FlushJobParams params = FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("3600").endTime("7200").build()) .forTimeRange(TimeRange.builder().startTime("3600").endTime("7200").build())
.build(); .build();
@ -61,7 +61,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenAdvanceTime() { public void testBuilder_GivenAdvanceTime() {
InterimResultsParams params = InterimResultsParams.builder().advanceTime("1821").build(); FlushJobParams params = FlushJobParams.builder().advanceTime("1821").build();
assertFalse(params.shouldCalculateInterim()); assertFalse(params.shouldCalculateInterim());
assertEquals("", params.getStart()); assertEquals("", params.getStart());
assertEquals("", params.getEnd()); assertEquals("", params.getEnd());
@ -71,7 +71,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenCalcInterimAndAdvanceTime() { public void testBuilder_GivenCalcInterimAndAdvanceTime() {
InterimResultsParams params = InterimResultsParams.builder() FlushJobParams params = FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.advanceTime("1940") .advanceTime("1940")
.build(); .build();
@ -84,7 +84,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testBuilder_GivenCalcInterimWithTimeRangeAndAdvanceTime() { public void testBuilder_GivenCalcInterimWithTimeRangeAndAdvanceTime() {
InterimResultsParams params = InterimResultsParams.builder() FlushJobParams params = FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("1").endTime("2").build()) .forTimeRange(TimeRange.builder().startTime("1").endTime("2").build())
.advanceTime("1940") .advanceTime("1940")
@ -98,27 +98,27 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testValidate_GivenOnlyStartSpecified() { public void testValidate_GivenOnlyStartSpecified() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> InterimResultsParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build()); () -> FlushJobParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build());
assertEquals("Invalid flush parameters: unexpected 'start'.", e.getMessage()); assertEquals("Invalid flush parameters: unexpected 'start'.", e.getMessage());
} }
public void testFlushUpload_GivenOnlyEndSpecified() { public void testFlushUpload_GivenOnlyEndSpecified() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> InterimResultsParams.builder().forTimeRange(TimeRange.builder().endTime("1").build()).build()); () -> FlushJobParams.builder().forTimeRange(TimeRange.builder().endTime("1").build()).build());
assertEquals("Invalid flush parameters: unexpected 'end'.", e.getMessage()); assertEquals("Invalid flush parameters: unexpected 'end'.", e.getMessage());
} }
public void testFlushUpload_GivenInterimResultsAndOnlyEndSpecified() { public void testFlushUpload_GivenInterimResultsAndOnlyEndSpecified() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> InterimResultsParams.builder().calcInterim(true).forTimeRange(TimeRange.builder().endTime("1").build()).build()); () -> FlushJobParams.builder().calcInterim(true).forTimeRange(TimeRange.builder().endTime("1").build()).build());
assertEquals("Invalid flush parameters: 'start' has not been specified.", e.getMessage()); assertEquals("Invalid flush parameters: 'start' has not been specified.", e.getMessage());
} }
public void testFlushUpload_GivenInterimResultsAndStartAndEndSpecifiedAsEpochs() { public void testFlushUpload_GivenInterimResultsAndStartAndEndSpecifiedAsEpochs() {
InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) FlushJobParams params = FlushJobParams.builder().calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("1428494400").endTime("1428498000").build()).build(); .forTimeRange(TimeRange.builder().startTime("1428494400").endTime("1428498000").build()).build();
assertTrue(params.shouldCalculateInterim()); assertTrue(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime()); assertFalse(params.shouldAdvanceTime());
@ -128,7 +128,7 @@ public class InterimResultsParamsTests extends ESTestCase {
public void testFlushUpload_GivenInterimResultsAndSameStartAndEnd() { public void testFlushUpload_GivenInterimResultsAndSameStartAndEnd() {
InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) FlushJobParams params = FlushJobParams.builder().calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("1428494400").endTime("1428494400").build()).build(); .forTimeRange(TimeRange.builder().startTime("1428494400").endTime("1428494400").build()).build();
assertTrue(params.shouldCalculateInterim()); assertTrue(params.shouldCalculateInterim());
@ -138,7 +138,7 @@ public class InterimResultsParamsTests extends ESTestCase {
} }
public void testFlushUpload_GivenInterimResultsAndOnlyStartSpecified() { public void testFlushUpload_GivenInterimResultsAndOnlyStartSpecified() {
InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) FlushJobParams params = FlushJobParams.builder().calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("1428494400").build()).build(); .forTimeRange(TimeRange.builder().startTime("1428494400").build()).build();
assertTrue(params.shouldCalculateInterim()); assertTrue(params.shouldCalculateInterim());
@ -148,7 +148,7 @@ public class InterimResultsParamsTests extends ESTestCase {
} }
public void testFlushUpload_GivenValidAdvanceTime() { public void testFlushUpload_GivenValidAdvanceTime() {
InterimResultsParams params = InterimResultsParams.builder().advanceTime("2015-04-08T13:00:00.000Z").build(); FlushJobParams params = FlushJobParams.builder().advanceTime("2015-04-08T13:00:00.000Z").build();
assertFalse(params.shouldCalculateInterim()); assertFalse(params.shouldCalculateInterim());
assertEquals("", params.getStart()); assertEquals("", params.getStart());
assertEquals("", params.getEnd()); assertEquals("", params.getEnd());
@ -157,7 +157,7 @@ public class InterimResultsParamsTests extends ESTestCase {
} }
public void testFlushUpload_GivenCalcInterimAndAdvanceTime() { public void testFlushUpload_GivenCalcInterimAndAdvanceTime() {
InterimResultsParams params = InterimResultsParams.builder().calcInterim(true).advanceTime("3600").build(); FlushJobParams params = FlushJobParams.builder().calcInterim(true).advanceTime("3600").build();
assertTrue(params.shouldCalculateInterim()); assertTrue(params.shouldCalculateInterim());
assertEquals("", params.getStart()); assertEquals("", params.getStart());
assertEquals("", params.getEnd()); assertEquals("", params.getEnd());
@ -166,7 +166,7 @@ public class InterimResultsParamsTests extends ESTestCase {
} }
public void testFlushUpload_GivenCalcInterimWithTimeRangeAndAdvanceTime() { public void testFlushUpload_GivenCalcInterimWithTimeRangeAndAdvanceTime() {
InterimResultsParams params = InterimResultsParams.builder().calcInterim(true) FlushJobParams params = FlushJobParams.builder().calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("150").endTime("300").build()) .forTimeRange(TimeRange.builder().startTime("150").endTime("300").build())
.advanceTime("200").build(); .advanceTime("200").build();
assertTrue(params.shouldCalculateInterim()); assertTrue(params.shouldCalculateInterim());

View File

@ -14,7 +14,7 @@ import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleCondition; import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.config.RuleConditionType; import org.elasticsearch.xpack.ml.job.config.RuleConditionType;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.junit.Before; import org.junit.Before;
import org.mockito.InOrder; import org.mockito.InOrder;
@ -39,12 +39,12 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
lengthEncodedWriter = Mockito.mock(LengthEncodedWriter.class); lengthEncodedWriter = Mockito.mock(LengthEncodedWriter.class);
} }
public void testWriteCalcInterimMessage_GivenAdvanceTime() throws IOException { public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
InterimResultsParams interimResultsParams = InterimResultsParams.builder() FlushJobParams flushJobParams = FlushJobParams.builder()
.advanceTime("1234567890").build(); .advanceTime("1234567890").build();
writer.writeCalcInterimMessage(interimResultsParams); writer.writeFlushControlMessage(flushJobParams);
InOrder inOrder = inOrder(lengthEncodedWriter); InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter).writeNumFields(4);
@ -53,12 +53,12 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter); verifyNoMoreInteractions(lengthEncodedWriter);
} }
public void testWriteCalcInterimMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException { public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
InterimResultsParams interimResultsParams = InterimResultsParams.builder() FlushJobParams flushJobParams = FlushJobParams.builder()
.calcInterim(true).build(); .calcInterim(true).build();
writer.writeCalcInterimMessage(interimResultsParams); writer.writeFlushControlMessage(flushJobParams);
InOrder inOrder = inOrder(lengthEncodedWriter); InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter).writeNumFields(4);
@ -67,23 +67,23 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter); verifyNoMoreInteractions(lengthEncodedWriter);
} }
public void testWriteCalcInterimMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException { public void testWriteFlushControlMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
InterimResultsParams interimResultsParams = InterimResultsParams.builder().build(); FlushJobParams flushJobParams = FlushJobParams.builder().build();
writer.writeCalcInterimMessage(interimResultsParams); writer.writeFlushControlMessage(flushJobParams);
verifyNoMoreInteractions(lengthEncodedWriter); verifyNoMoreInteractions(lengthEncodedWriter);
} }
public void testWriteCalcInterimMessage_GivenCalcInterimResultsWithTimeParams() throws IOException { public void testWriteFlushControlMessage_GivenCalcInterimResultsWithTimeParams() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
InterimResultsParams interimResultsParams = InterimResultsParams.builder() FlushJobParams flushJobParams = FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("120").endTime("180").build()) .forTimeRange(TimeRange.builder().startTime("120").endTime("180").build())
.build(); .build();
writer.writeCalcInterimMessage(interimResultsParams); writer.writeFlushControlMessage(flushJobParams);
InOrder inOrder = inOrder(lengthEncodedWriter); InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter).writeNumFields(4);
@ -92,15 +92,15 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter); verifyNoMoreInteractions(lengthEncodedWriter);
} }
public void testWriteCalcInterimMessage_GivenCalcInterimAndAdvanceTime() throws IOException { public void testWriteFlushControlMessage_GivenCalcInterimAndAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
InterimResultsParams interimResultsParams = InterimResultsParams.builder() FlushJobParams flushJobParams = FlushJobParams.builder()
.calcInterim(true) .calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("50").endTime("100").build()) .forTimeRange(TimeRange.builder().startTime("50").endTime("100").build())
.advanceTime("180") .advanceTime("180")
.build(); .build();
writer.writeCalcInterimMessage(interimResultsParams); writer.writeFlushControlMessage(flushJobParams);
InOrder inOrder = inOrder(lengthEncodedWriter); InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter).writeNumFields(4);