[ML] Persist realtime datafeed job after lookback (elastic/x-pack-elasticsearch#4146)

* Wire in new Persist job action

* Persist after loopback if realtime job

Original commit: elastic/x-pack-elasticsearch@c24101e9cf
This commit is contained in:
David Kyle 2018-03-21 16:31:59 +00:00 committed by GitHub
parent 97703bf0da
commit aa566a55e3
21 changed files with 476 additions and 40 deletions

View File

@ -69,6 +69,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
@ -251,6 +252,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
UpdateCalendarJobAction.INSTANCE,
GetCalendarEventsAction.INSTANCE,
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
// licensing
StartPersistentTaskAction.INSTANCE,
UpdatePersistentTaskStatusAction.INSTANCE,

View File

@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Objects;
public class PersistJobAction extends Action<PersistJobAction.Request, PersistJobAction.Response, PersistJobAction.RequestBuilder> {
public static final PersistJobAction INSTANCE = new PersistJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/persist";
private PersistJobAction() {
super(NAME);
}
@Override
public PersistJobAction.RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends JobTaskRequest<PersistJobAction.Request> {
public Request() {
}
public Request(String jobId) {
super(jobId);
}
public boolean isBackGround() {
return true;
}
public boolean isForeground() {
return !isBackGround();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
// isBackground for fwc
in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
// isBackground for fwc
out.writeBoolean(true);
}
@Override
public int hashCode() {
return Objects.hash(jobId, isBackGround());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PersistJobAction.Request other = (PersistJobAction.Request) obj;
return Objects.equals(jobId, other.jobId) && this.isBackGround() == other.isBackGround();
}
}
public static class Response extends BaseTasksResponse implements Writeable {
boolean persisted;
public Response() {
super(null, null);
}
public Response(boolean persisted) {
super(null, null);
this.persisted = persisted;
}
public boolean isPersisted() {
return persisted;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
persisted = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(persisted);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response that = (Response) o;
return this.persisted == that.persisted;
}
@Override
public int hashCode() {
return Objects.hash(persisted);
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, PersistJobAction action) {
super(client, action, new PersistJobAction.Request());
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class PersistJobActionRequestTests extends AbstractStreamableTestCase<PersistJobAction.Request> {
@Override
protected PersistJobAction.Request createBlankInstance() {
return new PersistJobAction.Request();
}
@Override
protected PersistJobAction.Request createTestInstance() {
return new PersistJobAction.Request(randomAlphaOfLength(10));
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class PersistJobActionResponseTests extends AbstractStreamableTestCase<PersistJobAction.Response> {
@Override
protected PersistJobAction.Response createBlankInstance() {
return new PersistJobAction.Response();
}
@Override
protected PersistJobAction.Response createTestInstance() {
return new PersistJobAction.Response(randomBoolean());
}
}

View File

@ -84,6 +84,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
@ -134,6 +135,7 @@ import org.elasticsearch.xpack.ml.action.TransportGetRecordsAction;
import org.elasticsearch.xpack.ml.action.TransportIsolateDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportKillProcessAction;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction;
import org.elasticsearch.xpack.ml.action.TransportPersistJobAction;
import org.elasticsearch.xpack.ml.action.TransportPostCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.TransportPostDataAction;
import org.elasticsearch.xpack.ml.action.TransportPreviewDatafeedAction;
@ -544,7 +546,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new ActionHandler<>(DeleteCalendarEventAction.INSTANCE, TransportDeleteCalendarEventAction.class),
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, TransportUpdateCalendarJobAction.class),
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class)
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class),
new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class)
);
}
@Override

View File

@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import java.io.IOException;
public class TransportPersistJobAction extends TransportJobTaskAction<PersistJobAction.Request, PersistJobAction.Response> {
@Inject
public TransportPersistJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, PersistJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
PersistJobAction.Request::new, PersistJobAction.Response::new, ThreadPool.Names.SAME, processManager);
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
protected PersistJobAction.Response readTaskResponse(StreamInput in) throws IOException {
PersistJobAction.Response response = new PersistJobAction.Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(PersistJobAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<PersistJobAction.Response> listener) {
processManager.persistJob(task, e -> {
if (e == null) {
listener.onResponse(new PersistJobAction.Response(true));
} else {
listener.onFailure(e);
}
});
}
@Override
protected void doExecute(Task task, PersistJobAction.Request request, ActionListener<PersistJobAction.Response> listener) {
// TODO Remove this overridden method in 7.0.0
DiscoveryNodes nodes = clusterService.state().nodes();
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(request.getJobId(), tasks);
if (jobTask == null || jobTask.getExecutorNode() == null) {
logger.debug("[{}] Cannot persist the job because the job is not open", request.getJobId());
listener.onResponse(new PersistJobAction.Response(false));
return;
}
DiscoveryNode executorNode = nodes.get(jobTask.getExecutorNode());
if (executorNode == null) {
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot persist job [{}] as" +
"executor node [{}] cannot be found", request.getJobId(), jobTask.getExecutorNode()));
return;
}
Version nodeVersion = executorNode.getVersion();
if (nodeVersion.before(Version.V_6_3_0)) {
listener.onFailure(
new ElasticsearchException("Cannot persist job [" + request.getJobId() + "] on node with version " + nodeVersion));
return;
}
super.doExecute(task, request, listener);
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@ -106,6 +107,9 @@ class DatafeedJob {
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(true);
run(lookbackStartTimeMs, lookbackEnd, request);
if (shouldPersistAfterLookback(isLookbackOnly)) {
sendPersistRequest();
}
if (isRunning() && !isIsolated) {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED));
@ -312,6 +316,21 @@ class DatafeedJob {
}
}
private boolean shouldPersistAfterLookback(boolean isLookbackOnly) {
return isLookbackOnly == false && isIsolated == false && isRunning();
}
private void sendPersistRequest() {
try {
LOGGER.trace("[" + jobId + "] Sending persist request");
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
client.execute(PersistJobAction.INSTANCE, new PersistJobAction.Request(jobId));
}
} catch (Exception e) {
LOGGER.debug("[" + jobId + "] error while persisting job", e);
}
}
/**
* Visible for testing
*/

View File

@ -166,7 +166,7 @@ public class DatafeedManager extends AbstractComponent {
}
@Override
protected void doRun() throws Exception {
protected void doRun() {
Long next = null;
try {
next = holder.executeLoopBack(startTime, endTime);
@ -224,7 +224,7 @@ public class DatafeedManager extends AbstractComponent {
}
@Override
protected void doRun() throws Exception {
protected void doRun() {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.executeRealTime();

View File

@ -264,6 +264,13 @@ public class AutodetectCommunicator implements Closeable {
}, forecastConsumer);
}
public void persistJob(BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
autodetectProcess.persistJob();
return null;
}, handler);
}
@Nullable
FlushAcknowledgement waitFlushToCompletion(String flushId) {
LOGGER.debug("[{}] waiting for flush", job.getId());

View File

@ -115,6 +115,12 @@ public interface AutodetectProcess extends Closeable {
*/
void forecastJob(ForecastParams params) throws IOException;
/**
* Ask the job to start persisting model state in the background
* @throws IOException
*/
void persistJob() throws IOException;
/**
* Flush the output data stream
*/

View File

@ -176,6 +176,16 @@ public class AutodetectProcessManager extends AbstractComponent {
}
}
/**
* Initiate background persistence of the job
* @param jobTask The job task
* @param handler Listener
*/
public void persistJob(JobTask jobTask, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
communicator.persistJob((aVoid, e) -> handler.accept(e));
}
/**
* Passes data to the native process.
* This is a blocking call that won't return until all the data has been

View File

@ -96,7 +96,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
}
@Override
public void flushStream() throws IOException {
public void persistJob() {
}
@Override
public void flushStream() {
}
@Override

View File

@ -187,6 +187,12 @@ class NativeAutodetectProcess implements AutodetectProcess {
writer.writeForecastMessage(params);
}
@Override
public void persistJob() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
writer.writeStartBackgroundPersistMessage();
}
@Override
public void flushStream() throws IOException {
recordWriter.flush();

View File

@ -39,40 +39,45 @@ public class ControlMsgToProcessWriter {
public static final int FLUSH_SPACES_LENGTH = 8192;
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
private static final String FLUSH_MESSAGE_CODE = "f";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
private static final String FORECAST_MESSAGE_CODE = "p";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
private static final String INTERIM_MESSAGE_CODE = "i";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
public static final String RESET_BUCKETS_MESSAGE_CODE = "r";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
private static final String ADVANCE_TIME_MESSAGE_CODE = "t";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
private static final String SKIP_TIME_MESSAGE_CODE = "s";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
public static final String UPDATE_MESSAGE_CODE = "u";
/**
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
public static final String BACKGROUND_PERSIST_MESSAGE_CODE = "w";
/**
* An number to uniquely identify each flush so that subsequent code can
* wait for acknowledgement of the correct flush.
@ -233,6 +238,12 @@ public class ControlMsgToProcessWriter {
writeMessage(stringBuilder.toString());
}
public void writeStartBackgroundPersistMessage() throws IOException {
writeMessage(BACKGROUND_PERSIST_MESSAGE_CODE);
fillCommandBuffer();
lengthEncodedWriter.flush();
}
/**
* Transform the supplied control message to length encoded values and
* write to the OutputStream.

View File

@ -15,6 +15,7 @@ import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@ -49,6 +50,8 @@ import static org.mockito.Mockito.when;
public class DatafeedJobTests extends ESTestCase {
private static final String jobId = "_job_id";
private Auditor auditor;
private DataExtractorFactory dataExtractorFactory;
private DataExtractor dataExtractor;
@ -83,10 +86,10 @@ public class DatafeedJobTests extends ESTestCase {
byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8);
InputStream inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0),
DataCounts dataCounts = new DataCounts(jobId, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0),
new Date(0), new Date(0), new Date(0));
PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
PostDataAction.Request expectedRequest = new PostDataAction.Request(jobId);
expectedRequest.setDataDescription(dataDescription.build());
expectedRequest.setContent(new BytesArray(contentBytes), xContentType);
when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(postDataFuture);
@ -101,9 +104,10 @@ public class DatafeedJobTests extends ESTestCase {
assertNull(datafeedJob.runLookBack(0L, 1000L));
verify(dataExtractorFactory).newExtractor(0L, 1000L);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
}
public void testSetIsolated() throws Exception {
@ -114,6 +118,7 @@ public class DatafeedJobTests extends ESTestCase {
verify(dataExtractorFactory).newExtractor(0L, 1500L);
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
}
public void testLookBackRunWithNoEndTime() throws Exception {
@ -125,9 +130,10 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(2000 + frequencyMs + queryDelayMs + 100, next);
verify(dataExtractorFactory).newExtractor(0L, 1500L);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId)));
}
public void testLookBackRunWithStartTimeEarlierThanResumePoint() throws Exception {
@ -148,14 +154,15 @@ public class DatafeedJobTests extends ESTestCase {
verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs);
assertThat(flushJobRequests.getAllValues().size(), equalTo(1));
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId)));
}
public void testContinueFromNow() throws Exception {
// We need to return empty counts so that the lookback doesn't update the last end time
when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts("_job_id")));
when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts(jobId)));
currentTime = 9999L;
long latestFinalBucketEndTimeMs = 5000;
@ -194,10 +201,11 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(currentTime + frequencyMs + queryDelayMs + 100, next);
verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
flushRequest.setCalcInterim(true);
flushRequest.setAdvanceTime("59000");
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
}
public void testEmptyDataCountGivenlookback() throws Exception {
@ -206,6 +214,7 @@ public class DatafeedJobTests extends ESTestCase {
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runLookBack(0L, 1000L));
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
assertThat(flushJobRequests.getValue().getAdvanceTime(), is(nullValue()));
}
@ -227,6 +236,7 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
assertThat(flushJobRequests.getAllValues().isEmpty(), is(true));
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
}
public void testPostAnalysisProblem() throws Exception {
@ -253,6 +263,7 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
}
public void testPostAnalysisProblemIsConflict() throws Exception {
@ -279,6 +290,7 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
}
public void testFlushAnalysisProblem() throws Exception {
@ -308,7 +320,7 @@ public class DatafeedJobTests extends ESTestCase {
private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs) {
Supplier<Long> currentTimeSupplier = () -> currentTime;
return new DatafeedJob("_job_id", dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs);
}
}

View File

@ -119,24 +119,19 @@ public class NativeAutodetectProcessTests extends ESTestCase {
}
public void testWriteResetBucketsControlMessage() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty());
process.writeResetBucketsControlMessage(params);
process.flushStream();
String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
assertTrue(message.contains(ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE));
}
testWriteMessage(p -> p.writeResetBucketsControlMessage(params), ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE);
}
public void testWriteUpdateConfigMessage() throws IOException {
testWriteMessage(p -> p.writeUpdateModelPlotMessage(new ModelPlotConfig()), ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE);
}
public void testPersistJob() throws IOException {
testWriteMessage(p -> p.persistJob(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
}
public void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
@ -145,11 +140,17 @@ public class NativeAutodetectProcessTests extends ESTestCase {
new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
writeFunction.accept(process);
process.writeUpdateModelPlotMessage(new ModelPlotConfig());
process.flushStream();
String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
assertTrue(message.contains(ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE));
assertTrue(message.contains(expectedMessageCode));
}
}
@FunctionalInterface
private interface CheckedConsumer<T> {
void accept(T t) throws IOException;
}
}

View File

@ -269,6 +269,25 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteStartBackgroundPersistMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
writer.writeStartBackgroundPersistMessage();
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
inOrder.verify(lengthEncodedWriter).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("w");
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
inOrder.verify(lengthEncodedWriter).writeField("");
StringBuilder spaces = new StringBuilder();
IntStream.rangeClosed(1, ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' '));
inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());
inOrder.verify(lengthEncodedWriter).flush();
verifyNoMoreInteractions(lengthEncodedWriter);
}
private static List<RuleCondition> createRule(String value) {
Condition condition = new Condition(Operator.GT, value);
return Collections.singletonList(RuleCondition.createNumerical(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition));

View File

@ -15,13 +15,8 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;

View File

@ -615,6 +615,19 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
});
// Model state should be persisted at the end of lookback
// test a model snapshot is present
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/model_snapshots");
String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"count\":1"));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
response = e.getResponse();

View File

@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
@ -435,6 +436,11 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
return client().execute(PostCalendarEventsAction.INSTANCE, request).actionGet();
}
protected PersistJobAction.Response persistJob(String jobId) {
PersistJobAction.Request request = new PersistJobAction.Request(jobId);
return client().execute(PersistJobAction.INSTANCE, request).actionGet();
}
@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.junit.After;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PersistJobIT extends MlNativeAutodetectIntegTestCase {
@After
public void cleanUpJobs() {
cleanUp();
}
public void testPersistJob() throws Exception {
String jobId = "persist-job-test";
runJob(jobId);
PersistJobAction.Response r = persistJob(jobId);
assertTrue(r.isPersisted());
// Persisting the job will create a model snapshot
assertBusy(() -> {
List<ModelSnapshot> snapshots = getModelSnapshots(jobId);
assertFalse(snapshots.isEmpty());
});
}
private void runJob(String jobId) throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(5);
Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
Job.Builder job = new Job.Builder(jobId);
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(new DataDescription.Builder());
registerJob(job);
putJob(job);
openJob(job.getId());
List<String> data = generateData(System.currentTimeMillis(), bucketSpan, 10, bucketIndex -> randomIntBetween(10, 20));
postData(job.getId(), data.stream().collect(Collectors.joining()));
}
}