[ML] Refactor control message writer to allow reuse for other processes (#36070)

This commit is contained in:
Dimitris Athanasiou 2018-11-30 09:25:35 +00:00 committed by GitHub
parent d128b38116
commit 54cf1f9d74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 95 additions and 72 deletions

View File

@ -18,7 +18,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResult
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
@ -94,7 +94,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
@Override
public String flushJob(FlushJobParams params) throws IOException {
ControlMsgToProcessWriter writer = newMessageWriter();
AutodetectControlMsgWriter writer = newMessageWriter();
writer.writeFlushControlMessage(params);
return writer.writeFlushMessage();
}
@ -114,7 +114,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
return resultsParser.parseResults(processOutStream());
}
private ControlMsgToProcessWriter newMessageWriter() {
return new ControlMsgToProcessWriter(recordWriter(), numberOfFields());
private AutodetectControlMsgWriter newMessageWriter() {
return new AutodetectControlMsgWriter(recordWriter(), numberOfFields());
}
}

View File

@ -18,26 +18,20 @@ import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.process.writer.AbstractControlMsgWriter;
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
* A writer for sending control messages to the C++ autodetect process.
* The data written to outputIndex is length encoded.
*/
public class ControlMsgToProcessWriter {
/**
* This should be the same size as the buffer in the C++ autodetect process.
*/
public static final int FLUSH_SPACES_LENGTH = 8192;
public class AutodetectControlMsgWriter extends AbstractControlMsgWriter {
/**
* This must match the code defined in the api::CAnomalyJob C++ class.
@ -85,18 +79,14 @@ public class ControlMsgToProcessWriter {
*/
private static AtomicLong ms_FlushNumber = new AtomicLong(1);
private final LengthEncodedWriter lengthEncodedWriter;
private final int numberOfFields;
/**
* Construct the control message writer with a LengthEncodedWriter
*
* @param lengthEncodedWriter The writer
* @param numberOfFields The number of fields the process expects in each record
*/
public ControlMsgToProcessWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) {
this.lengthEncodedWriter = Objects.requireNonNull(lengthEncodedWriter);
this.numberOfFields = numberOfFields;
public AutodetectControlMsgWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) {
super(lengthEncodedWriter, numberOfFields);
}
/**
@ -106,8 +96,8 @@ public class ControlMsgToProcessWriter {
* @param os The output stream
* @param numberOfFields The number of fields the process expects in each record
*/
public static ControlMsgToProcessWriter create(OutputStream os, int numberOfFields) {
return new ControlMsgToProcessWriter(new LengthEncodedWriter(os), numberOfFields);
public static AutodetectControlMsgWriter create(OutputStream os, int numberOfFields) {
return new AutodetectControlMsgWriter(new LengthEncodedWriter(os), numberOfFields);
}
/**
@ -175,14 +165,6 @@ public class ControlMsgToProcessWriter {
lengthEncodedWriter.flush();
}
// todo(hendrikm): workaround, see
// https://github.com/elastic/machine-learning-cpp/issues/123
private void fillCommandBuffer() throws IOException {
char[] spaces = new char[FLUSH_SPACES_LENGTH];
Arrays.fill(spaces, ' ');
writeMessage(new String(spaces));
}
public void writeResetBucketsMessage(DataLoadParams params) throws IOException {
writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd());
}
@ -247,25 +229,4 @@ public class ControlMsgToProcessWriter {
fillCommandBuffer();
lengthEncodedWriter.flush();
}
/**
* Transform the supplied control message to length encoded values and
* write to the OutputStream.
* The number of blank fields to make up a full record is deduced from
* <code>analysisConfig</code>.
*
* @param message The control message to write.
*/
private void writeMessage(String message) throws IOException {
lengthEncodedWriter.writeNumFields(numberOfFields);
// Write blank values for all fields other than the control field
for (int i = 1; i < numberOfFields; ++i) {
lengthEncodedWriter.writeField("");
}
// The control field comes last
lengthEncodedWriter.writeField(message);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.process.writer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
/**
* A writer for sending control messages to the a native C++ process.
*/
public abstract class AbstractControlMsgWriter {
/**
* This should be the same size as the buffer in the C++ native process.
*/
public static final int FLUSH_SPACES_LENGTH = 8192;
protected final LengthEncodedWriter lengthEncodedWriter;
private final int numberOfFields;
/**
* Construct the control message writer with a LengthEncodedWriter
*
* @param lengthEncodedWriter The writer
* @param numberOfFields The number of fields the process expects in each record
*/
public AbstractControlMsgWriter(LengthEncodedWriter lengthEncodedWriter, int numberOfFields) {
this.lengthEncodedWriter = Objects.requireNonNull(lengthEncodedWriter);
this.numberOfFields = numberOfFields;
}
// todo(hendrikm): workaround, see
// https://github.com/elastic/machine-learning-cpp/issues/123
protected void fillCommandBuffer() throws IOException {
char[] spaces = new char[FLUSH_SPACES_LENGTH];
Arrays.fill(spaces, ' ');
writeMessage(new String(spaces));
}
/**
* Transform the supplied control message to length encoded values and
* write to the OutputStream.
*
* @param message The control message to write.
*/
protected void writeMessage(String message) throws IOException {
lengthEncodedWriter.writeNumFields(numberOfFields);
// Write blank values for all fields other than the control field
for (int i = 1; i < numberOfFields; ++i) {
lengthEncodedWriter.writeField("");
}
// The control field comes last
lengthEncodedWriter.writeField(message);
}
}

View File

@ -12,7 +12,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateP
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
import org.junit.Assert;
import org.junit.Before;
@ -103,7 +103,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
public void testFlush() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024);
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
@ -113,21 +113,21 @@ public class NativeAutodetectProcessTests extends ESTestCase {
process.flushJob(params);
ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
assertThat(bb.remaining(), is(greaterThan(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH)));
assertThat(bb.remaining(), is(greaterThan(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH)));
}
}
public void testWriteResetBucketsControlMessage() throws IOException {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty());
testWriteMessage(p -> p.writeResetBucketsControlMessage(params), ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE);
testWriteMessage(p -> p.writeResetBucketsControlMessage(params), AutodetectControlMsgWriter.RESET_BUCKETS_MESSAGE_CODE);
}
public void testWriteUpdateConfigMessage() throws IOException {
testWriteMessage(p -> p.writeUpdateModelPlotMessage(new ModelPlotConfig()), ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE);
testWriteMessage(p -> p.writeUpdateModelPlotMessage(new ModelPlotConfig()), AutodetectControlMsgWriter.UPDATE_MESSAGE_CODE);
}
public void testPersistJob() throws IOException {
testWriteMessage(p -> p.persistState(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
testWriteMessage(p -> p.persistState(), AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
}
public void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {

View File

@ -35,7 +35,7 @@ import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class ControlMsgToProcessWriterTests extends ESTestCase {
public class AutodetectControlMsgWriterTests extends ESTestCase {
private LengthEncodedWriter lengthEncodedWriter;
@Before
@ -44,7 +44,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder().advanceTime("1234567890").build();
writer.writeFlushControlMessage(flushJobParams);
@ -57,7 +57,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenSkipTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1234567890").build();
writer.writeFlushControlMessage(flushJobParams);
@ -70,7 +70,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenSkipAndAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1000").advanceTime("2000").build();
writer.writeFlushControlMessage(flushJobParams);
@ -81,7 +81,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder()
.calcInterim(true).build();
@ -95,7 +95,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder().build();
writer.writeFlushControlMessage(flushJobParams);
@ -104,7 +104,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenCalcInterimResultsWithTimeParams() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder()
.calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("120").endTime("180").build())
@ -120,7 +120,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushControlMessage_GivenCalcInterimAndAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
FlushJobParams flushJobParams = FlushJobParams.builder()
.calcInterim(true)
.forTimeRange(TimeRange.builder().startTime("50").endTime("100").build())
@ -140,7 +140,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteFlushMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
long firstId = Long.parseLong(writer.writeFlushMessage());
Mockito.reset(lengthEncodedWriter);
@ -163,7 +163,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteResetBucketsMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
writer.writeResetBucketsMessage(
new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), Optional.empty()));
@ -176,7 +176,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteUpdateModelPlotMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
writer.writeUpdateModelPlotMessage(new ModelPlotConfig(true, "foo,bar"));
@ -188,7 +188,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 4);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
DetectionRule rule1 = new DetectionRule.Builder(createRule(5)).build();
DetectionRule rule2 = new DetectionRule.Builder(createRule(5)).build();
@ -206,7 +206,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteUpdateFiltersMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2);
MlFilter filter1 = MlFilter.builder("filter_1").setItems("a").build();
MlFilter filter2 = MlFilter.builder("filter_2").setItems("b", "c").build();
@ -221,7 +221,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteUpdateScheduledEventsMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2);
ScheduledEvent.Builder event1 = new ScheduledEvent.Builder();
event1.calendarId("moon");
@ -255,7 +255,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteUpdateScheduledEventsMessage_GivenEmpty() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2);
writer.writeUpdateScheduledEventsMessage(Collections.emptyList(), TimeValue.timeValueHours(1));
@ -267,7 +267,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
}
public void testWriteStartBackgroundPersistMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2);
writer.writeStartBackgroundPersistMessage();
InOrder inOrder = inOrder(lengthEncodedWriter);
@ -278,7 +278,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
inOrder.verify(lengthEncodedWriter).writeField("");
StringBuilder spaces = new StringBuilder();
IntStream.rangeClosed(1, ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' '));
IntStream.rangeClosed(1, AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' '));
inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());
inOrder.verify(lengthEncodedWriter).flush();