diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 112805b2f74..3dc72cce157 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -18,7 +18,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResult import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; -import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; +import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; @@ -94,7 +94,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec @Override public String flushJob(FlushJobParams params) throws IOException { - ControlMsgToProcessWriter writer = newMessageWriter(); + AutodetectControlMsgWriter writer = newMessageWriter(); writer.writeFlushControlMessage(params); return writer.writeFlushMessage(); } @@ -114,7 +114,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec return resultsParser.parseResults(processOutStream()); } - private ControlMsgToProcessWriter newMessageWriter() { - return new ControlMsgToProcessWriter(recordWriter(), numberOfFields()); + private AutodetectControlMsgWriter newMessageWriter() { + return new AutodetectControlMsgWriter(recordWriter(), numberOfFields()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java similarity index 83% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java index fc98990d8d6..e0ed7458b1d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java @@ -18,26 +18,20 @@ import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; +import org.elasticsearch.xpack.ml.process.writer.AbstractControlMsgWriter; import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.StringWriter; -import java.util.Arrays; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; /** * A writer for sending control messages to the C++ autodetect process. * The data written to outputIndex is length encoded. */ -public class ControlMsgToProcessWriter { - - /** - * This should be the same size as the buffer in the C++ autodetect process. - */ - public static final int FLUSH_SPACES_LENGTH = 8192; +public class AutodetectControlMsgWriter extends AbstractControlMsgWriter { /** * This must match the code defined in the api::CAnomalyJob C++ class. @@ -85,18 +79,14 @@ public class ControlMsgToProcessWriter { */ private static AtomicLong ms_FlushNumber = new AtomicLong(1); - private final LengthEncodedWriter lengthEncodedWriter; - private final int numberOfFields; - /** * Construct the control message writer with a LengthEncodedWriter * * @param lengthEncodedWriter The writer * @param numberOfFields The number of fields the process expects in each record */ - public ControlMsgToProcessWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) { - this.lengthEncodedWriter = Objects.requireNonNull(lengthEncodedWriter); - this.numberOfFields = numberOfFields; + public AutodetectControlMsgWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) { + super(lengthEncodedWriter, numberOfFields); } /** @@ -106,8 +96,8 @@ public class ControlMsgToProcessWriter { * @param os The output stream * @param numberOfFields The number of fields the process expects in each record */ - public static ControlMsgToProcessWriter create(OutputStream os, int numberOfFields) { - return new ControlMsgToProcessWriter(new LengthEncodedWriter(os), numberOfFields); + public static AutodetectControlMsgWriter create(OutputStream os, int numberOfFields) { + return new AutodetectControlMsgWriter(new LengthEncodedWriter(os), numberOfFields); } /** @@ -175,14 +165,6 @@ public class ControlMsgToProcessWriter { lengthEncodedWriter.flush(); } - // todo(hendrikm): workaround, see - // https://github.com/elastic/machine-learning-cpp/issues/123 - private void fillCommandBuffer() throws IOException { - char[] spaces = new char[FLUSH_SPACES_LENGTH]; - Arrays.fill(spaces, ' '); - writeMessage(new String(spaces)); - } - public void writeResetBucketsMessage(DataLoadParams params) throws IOException { writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd()); } @@ -247,25 +229,4 @@ public class ControlMsgToProcessWriter { fillCommandBuffer(); lengthEncodedWriter.flush(); } - - /** - * Transform the supplied control message to length encoded values and - * write to the OutputStream. - * The number of blank fields to make up a full record is deduced from - * analysisConfig. - * - * @param message The control message to write. - */ - private void writeMessage(String message) throws IOException { - - lengthEncodedWriter.writeNumFields(numberOfFields); - - // Write blank values for all fields other than the control field - for (int i = 1; i < numberOfFields; ++i) { - lengthEncodedWriter.writeField(""); - } - - // The control field comes last - lengthEncodedWriter.writeField(message); - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java new file mode 100644 index 00000000000..8d84c86f4a7 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process.writer; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +/** + * A writer for sending control messages to the a native C++ process. + */ +public abstract class AbstractControlMsgWriter { + + /** + * This should be the same size as the buffer in the C++ native process. + */ + public static final int FLUSH_SPACES_LENGTH = 8192; + + protected final LengthEncodedWriter lengthEncodedWriter; + private final int numberOfFields; + + /** + * Construct the control message writer with a LengthEncodedWriter + * + * @param lengthEncodedWriter The writer + * @param numberOfFields The number of fields the process expects in each record + */ + public AbstractControlMsgWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) { + this.lengthEncodedWriter = Objects.requireNonNull(lengthEncodedWriter); + this.numberOfFields = numberOfFields; + } + + // todo(hendrikm): workaround, see + // https://github.com/elastic/machine-learning-cpp/issues/123 + protected void fillCommandBuffer() throws IOException { + char[] spaces = new char[FLUSH_SPACES_LENGTH]; + Arrays.fill(spaces, ' '); + writeMessage(new String(spaces)); + } + + /** + * Transform the supplied control message to length encoded values and + * write to the OutputStream. + * + * @param message The control message to write. + */ + protected void writeMessage(String message) throws IOException { + + lengthEncodedWriter.writeNumFields(numberOfFields); + + // Write blank values for all fields other than the control field + for (int i = 1; i < numberOfFields; ++i) { + lengthEncodedWriter.writeField(""); + } + + // The control field comes last + lengthEncodedWriter.writeField(message); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 18ee9434f0d..52910fc6139 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateP import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; -import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; +import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter; import org.junit.Assert; import org.junit.Before; @@ -103,7 +103,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { public void testFlush() throws IOException { InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); - ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024); + ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new AutodetectResultsParser(), mock(Runnable.class))) { @@ -113,21 +113,21 @@ public class NativeAutodetectProcessTests extends ESTestCase { process.flushJob(params); ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray()); - assertThat(bb.remaining(), is(greaterThan(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH))); + assertThat(bb.remaining(), is(greaterThan(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH))); } } public void testWriteResetBucketsControlMessage() throws IOException { DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty()); - testWriteMessage(p -> p.writeResetBucketsControlMessage(params), ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE); + testWriteMessage(p -> p.writeResetBucketsControlMessage(params), AutodetectControlMsgWriter.RESET_BUCKETS_MESSAGE_CODE); } public void testWriteUpdateConfigMessage() throws IOException { - testWriteMessage(p -> p.writeUpdateModelPlotMessage(new ModelPlotConfig()), ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE); + testWriteMessage(p -> p.writeUpdateModelPlotMessage(new ModelPlotConfig()), AutodetectControlMsgWriter.UPDATE_MESSAGE_CODE); } public void testPersistJob() throws IOException { - testWriteMessage(p -> p.persistState(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE); + testWriteMessage(p -> p.persistState(), AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE); } public void testWriteMessage(CheckedConsumer writeFunction, String expectedMessageCode) throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java similarity index 88% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java index 57554227e9a..27de6633712 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java @@ -35,7 +35,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoMoreInteractions; -public class ControlMsgToProcessWriterTests extends ESTestCase { +public class AutodetectControlMsgWriterTests extends ESTestCase { private LengthEncodedWriter lengthEncodedWriter; @Before @@ -44,7 +44,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder().advanceTime("1234567890").build(); writer.writeFlushControlMessage(flushJobParams); @@ -57,7 +57,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenSkipTime() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1234567890").build(); writer.writeFlushControlMessage(flushJobParams); @@ -70,7 +70,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenSkipAndAdvanceTime() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1000").advanceTime("2000").build(); writer.writeFlushControlMessage(flushJobParams); @@ -81,7 +81,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder() .calcInterim(true).build(); @@ -95,7 +95,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder().build(); writer.writeFlushControlMessage(flushJobParams); @@ -104,7 +104,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenCalcInterimResultsWithTimeParams() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("120").endTime("180").build()) @@ -120,7 +120,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushControlMessage_GivenCalcInterimAndAdvanceTime() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); FlushJobParams flushJobParams = FlushJobParams.builder() .calcInterim(true) .forTimeRange(TimeRange.builder().startTime("50").endTime("100").build()) @@ -140,7 +140,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteFlushMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); long firstId = Long.parseLong(writer.writeFlushMessage()); Mockito.reset(lengthEncodedWriter); @@ -163,7 +163,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteResetBucketsMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); writer.writeResetBucketsMessage( new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), Optional.empty())); @@ -176,7 +176,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteUpdateModelPlotMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); writer.writeUpdateModelPlotMessage(new ModelPlotConfig(true, "foo,bar")); @@ -188,7 +188,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteUpdateDetectorRulesMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); DetectionRule rule1 = new DetectionRule.Builder(createRule(5)).build(); DetectionRule rule2 = new DetectionRule.Builder(createRule(5)).build(); @@ -206,7 +206,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteUpdateFiltersMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2); MlFilter filter1 = MlFilter.builder("filter_1").setItems("a").build(); MlFilter filter2 = MlFilter.builder("filter_2").setItems("b", "c").build(); @@ -221,7 +221,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteUpdateScheduledEventsMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2); ScheduledEvent.Builder event1 = new ScheduledEvent.Builder(); event1.calendarId("moon"); @@ -255,7 +255,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteUpdateScheduledEventsMessage_GivenEmpty() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2); writer.writeUpdateScheduledEventsMessage(Collections.emptyList(), TimeValue.timeValueHours(1)); @@ -267,7 +267,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { } public void testWriteStartBackgroundPersistMessage() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2); writer.writeStartBackgroundPersistMessage(); InOrder inOrder = inOrder(lengthEncodedWriter); @@ -278,7 +278,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { inOrder.verify(lengthEncodedWriter).writeNumFields(2); inOrder.verify(lengthEncodedWriter).writeField(""); StringBuilder spaces = new StringBuilder(); - IntStream.rangeClosed(1, ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' ')); + IntStream.rangeClosed(1, AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' ')); inOrder.verify(lengthEncodedWriter).writeField(spaces.toString()); inOrder.verify(lengthEncodedWriter).flush();