diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 03e96e733a2..4853588bd3e 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -69,6 +69,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; @@ -251,6 +252,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl UpdateCalendarJobAction.INSTANCE, GetCalendarEventsAction.INSTANCE, PostCalendarEventsAction.INSTANCE, + PersistJobAction.INSTANCE, // licensing StartPersistentTaskAction.INSTANCE, UpdatePersistentTaskStatusAction.INSTANCE, diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java new file mode 100644 index 00000000000..71f65051464 --- /dev/null +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +public class PersistJobAction extends Action { + + public static final PersistJobAction INSTANCE = new PersistJobAction(); + public static final String NAME = "cluster:admin/xpack/ml/job/persist"; + + private PersistJobAction() { + super(NAME); + } + + @Override + public PersistJobAction.RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends JobTaskRequest { + + public Request() { + } + + public Request(String jobId) { + super(jobId); + } + + public boolean isBackGround() { + return true; + } + + public boolean isForeground() { + return !isBackGround(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + // isBackground for fwc + in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + // isBackground for fwc + out.writeBoolean(true); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, isBackGround()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + PersistJobAction.Request other = (PersistJobAction.Request) obj; + return Objects.equals(jobId, other.jobId) && this.isBackGround() == other.isBackGround(); + } + } + + public static class Response extends BaseTasksResponse implements Writeable { + + boolean persisted; + + public Response() { + super(null, null); + } + + public Response(boolean persisted) { + super(null, null); + this.persisted = persisted; + } + + public boolean isPersisted() { + return persisted; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + persisted = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(persisted); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response that = (Response) o; + return this.persisted == that.persisted; + } + + @Override + public int hashCode() { + return Objects.hash(persisted); + } + } + + static class RequestBuilder extends ActionRequestBuilder { + RequestBuilder(ElasticsearchClient client, PersistJobAction action) { + super(client, action, new PersistJobAction.Request()); + } + } +} diff --git a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionRequestTests.java b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionRequestTests.java new file mode 100644 index 00000000000..cf210403dd6 --- /dev/null +++ b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionRequestTests.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; + +public class PersistJobActionRequestTests extends AbstractStreamableTestCase { + @Override + protected PersistJobAction.Request createBlankInstance() { + return new PersistJobAction.Request(); + } + + @Override + protected PersistJobAction.Request createTestInstance() { + return new PersistJobAction.Request(randomAlphaOfLength(10)); + } +} diff --git a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java new file mode 100644 index 00000000000..746e40445af --- /dev/null +++ b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; + +public class PersistJobActionResponseTests extends AbstractStreamableTestCase { + @Override + protected PersistJobAction.Response createBlankInstance() { + return new PersistJobAction.Response(); + } + + @Override + protected PersistJobAction.Response createTestInstance() { + return new PersistJobAction.Response(randomBoolean()); + } +} diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 5d61816317a..4af6d4f530c 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -84,6 +84,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; @@ -134,6 +135,7 @@ import org.elasticsearch.xpack.ml.action.TransportGetRecordsAction; import org.elasticsearch.xpack.ml.action.TransportIsolateDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportKillProcessAction; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction; +import org.elasticsearch.xpack.ml.action.TransportPersistJobAction; import org.elasticsearch.xpack.ml.action.TransportPostCalendarEventsAction; import org.elasticsearch.xpack.ml.action.TransportPostDataAction; import org.elasticsearch.xpack.ml.action.TransportPreviewDatafeedAction; @@ -544,7 +546,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu new ActionHandler<>(DeleteCalendarEventAction.INSTANCE, TransportDeleteCalendarEventAction.class), new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, TransportUpdateCalendarJobAction.class), new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class), - new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class) + new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class), + new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class) ); } @Override diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPersistJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPersistJobAction.java new file mode 100644 index 00000000000..113a8da7be3 --- /dev/null +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPersistJobAction.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; + +import java.io.IOException; + +public class TransportPersistJobAction extends TransportJobTaskAction { + + @Inject + public TransportPersistJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ClusterService clusterService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + AutodetectProcessManager processManager) { + super(settings, PersistJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, + PersistJobAction.Request::new, PersistJobAction.Response::new, ThreadPool.Names.SAME, processManager); + // ThreadPool.Names.SAME, because operations is executed by autodetect worker thread + } + + @Override + protected PersistJobAction.Response readTaskResponse(StreamInput in) throws IOException { + PersistJobAction.Response response = new PersistJobAction.Response(); + response.readFrom(in); + return response; + } + + @Override + protected void taskOperation(PersistJobAction.Request request, TransportOpenJobAction.JobTask task, + ActionListener listener) { + + processManager.persistJob(task, e -> { + if (e == null) { + listener.onResponse(new PersistJobAction.Response(true)); + } else { + listener.onFailure(e); + } + }); + } + + @Override + protected void doExecute(Task task, PersistJobAction.Request request, ActionListener listener) { + // TODO Remove this overridden method in 7.0.0 + DiscoveryNodes nodes = clusterService.state().nodes(); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(request.getJobId(), tasks); + if (jobTask == null || jobTask.getExecutorNode() == null) { + logger.debug("[{}] Cannot persist the job because the job is not open", request.getJobId()); + listener.onResponse(new PersistJobAction.Response(false)); + return; + } + + DiscoveryNode executorNode = nodes.get(jobTask.getExecutorNode()); + if (executorNode == null) { + listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot persist job [{}] as" + + "executor node [{}] cannot be found", request.getJobId(), jobTask.getExecutorNode())); + return; + } + + Version nodeVersion = executorNode.getVersion(); + if (nodeVersion.before(Version.V_6_3_0)) { + listener.onFailure( + new ElasticsearchException("Cannot persist job [" + request.getJobId() + "] on node with version " + nodeVersion)); + return; + } + + super.doExecute(task, request, listener); + } +} diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index ef52adedd9f..883a22124e3 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -106,6 +107,9 @@ class DatafeedJob { FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setCalcInterim(true); run(lookbackStartTimeMs, lookbackEnd, request); + if (shouldPersistAfterLookback(isLookbackOnly)) { + sendPersistRequest(); + } if (isRunning() && !isIsolated) { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED)); @@ -312,6 +316,21 @@ class DatafeedJob { } } + private boolean shouldPersistAfterLookback(boolean isLookbackOnly) { + return isLookbackOnly == false && isIsolated == false && isRunning(); + } + + private void sendPersistRequest() { + try { + LOGGER.trace("[" + jobId + "] Sending persist request"); + try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { + client.execute(PersistJobAction.INSTANCE, new PersistJobAction.Request(jobId)); + } + } catch (Exception e) { + LOGGER.debug("[" + jobId + "] error while persisting job", e); + } + } + /** * Visible for testing */ diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 79082f69b69..5daf8ce2896 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -166,7 +166,7 @@ public class DatafeedManager extends AbstractComponent { } @Override - protected void doRun() throws Exception { + protected void doRun() { Long next = null; try { next = holder.executeLoopBack(startTime, endTime); @@ -224,7 +224,7 @@ public class DatafeedManager extends AbstractComponent { } @Override - protected void doRun() throws Exception { + protected void doRun() { long nextDelayInMsSinceEpoch; try { nextDelayInMsSinceEpoch = holder.executeRealTime(); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index c1812ec6242..6c1a3368133 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -264,6 +264,13 @@ public class AutodetectCommunicator implements Closeable { }, forecastConsumer); } + public void persistJob(BiConsumer handler) { + submitOperation(() -> { + autodetectProcess.persistJob(); + return null; + }, handler); + } + @Nullable FlushAcknowledgement waitFlushToCompletion(String flushId) { LOGGER.debug("[{}] waiting for flush", job.getId()); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 97238a43045..049880b1ac2 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -115,6 +115,12 @@ public interface AutodetectProcess extends Closeable { */ void forecastJob(ForecastParams params) throws IOException; + /** + * Ask the job to start persisting model state in the background + * @throws IOException + */ + void persistJob() throws IOException; + /** * Flush the output data stream */ diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 66b139f22b2..15b4778249b 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -176,6 +176,16 @@ public class AutodetectProcessManager extends AbstractComponent { } } + /** + * Initiate background persistence of the job + * @param jobTask The job task + * @param handler Listener + */ + public void persistJob(JobTask jobTask, Consumer handler) { + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); + communicator.persistJob((aVoid, e) -> handler.accept(e)); + } + /** * Passes data to the native process. * This is a blocking call that won't return until all the data has been diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index b4dd5aa0cb5..8ff54e80785 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -96,7 +96,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { } @Override - public void flushStream() throws IOException { + public void persistJob() { + } + + @Override + public void flushStream() { } @Override diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 193ecda2826..faae29fd1eb 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -187,6 +187,12 @@ class NativeAutodetectProcess implements AutodetectProcess { writer.writeForecastMessage(params); } + @Override + public void persistJob() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); + writer.writeStartBackgroundPersistMessage(); + } + @Override public void flushStream() throws IOException { recordWriter.flush(); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 2936394f07f..2a91797d28d 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -39,40 +39,45 @@ public class ControlMsgToProcessWriter { public static final int FLUSH_SPACES_LENGTH = 8192; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ private static final String FLUSH_MESSAGE_CODE = "f"; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ private static final String FORECAST_MESSAGE_CODE = "p"; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ private static final String INTERIM_MESSAGE_CODE = "i"; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ public static final String RESET_BUCKETS_MESSAGE_CODE = "r"; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ private static final String ADVANCE_TIME_MESSAGE_CODE = "t"; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ private static final String SKIP_TIME_MESSAGE_CODE = "s"; /** - * This must match the code defined in the api::CAnomalyDetector C++ class. + * This must match the code defined in the api::CAnomalyJob C++ class. */ public static final String UPDATE_MESSAGE_CODE = "u"; + /** + * This must match the code defined in the api::CAnomalyJob C++ class. + */ + public static final String BACKGROUND_PERSIST_MESSAGE_CODE = "w"; + /** * An number to uniquely identify each flush so that subsequent code can * wait for acknowledgement of the correct flush. @@ -233,6 +238,12 @@ public class ControlMsgToProcessWriter { writeMessage(stringBuilder.toString()); } + public void writeStartBackgroundPersistMessage() throws IOException { + writeMessage(BACKGROUND_PERSIST_MESSAGE_CODE); + fillCommandBuffer(); + lengthEncodedWriter.flush(); + } + /** * Transform the supplied control message to length encoded values and * write to the OutputStream. diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 2b4fb816e7d..75106b8ee0c 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -49,6 +50,8 @@ import static org.mockito.Mockito.when; public class DatafeedJobTests extends ESTestCase { + private static final String jobId = "_job_id"; + private Auditor auditor; private DataExtractorFactory dataExtractorFactory; private DataExtractor dataExtractor; @@ -83,10 +86,10 @@ public class DatafeedJobTests extends ESTestCase { byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8); InputStream inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); - DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), + DataCounts dataCounts = new DataCounts(jobId, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); - PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id"); + PostDataAction.Request expectedRequest = new PostDataAction.Request(jobId); expectedRequest.setDataDescription(dataDescription.build()); expectedRequest.setContent(new BytesArray(contentBytes), xContentType); when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(postDataFuture); @@ -101,9 +104,10 @@ public class DatafeedJobTests extends ESTestCase { assertNull(datafeedJob.runLookBack(0L, 1000L)); verify(dataExtractorFactory).newExtractor(0L, 1000L); - FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } public void testSetIsolated() throws Exception { @@ -114,6 +118,7 @@ public class DatafeedJobTests extends ESTestCase { verify(dataExtractorFactory).newExtractor(0L, 1500L); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } public void testLookBackRunWithNoEndTime() throws Exception { @@ -125,9 +130,10 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(2000 + frequencyMs + queryDelayMs + 100, next); verify(dataExtractorFactory).newExtractor(0L, 1500L); - FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); + verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId))); } public void testLookBackRunWithStartTimeEarlierThanResumePoint() throws Exception { @@ -148,14 +154,15 @@ public class DatafeedJobTests extends ESTestCase { verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs); assertThat(flushJobRequests.getAllValues().size(), equalTo(1)); - FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); + verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId))); } public void testContinueFromNow() throws Exception { // We need to return empty counts so that the lookback doesn't update the last end time - when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts("_job_id"))); + when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts(jobId))); currentTime = 9999L; long latestFinalBucketEndTimeMs = 5000; @@ -194,10 +201,11 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(currentTime + frequencyMs + queryDelayMs + 100, next); verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs); - FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); flushRequest.setAdvanceTime("59000"); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } public void testEmptyDataCountGivenlookback() throws Exception { @@ -206,6 +214,7 @@ public class DatafeedJobTests extends ESTestCase { DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runLookBack(0L, 1000L)); verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); assertThat(flushJobRequests.getValue().getAdvanceTime(), is(nullValue())); } @@ -227,6 +236,7 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); assertThat(flushJobRequests.getAllValues().isEmpty(), is(true)); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } public void testPostAnalysisProblem() throws Exception { @@ -253,6 +263,7 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } public void testPostAnalysisProblemIsConflict() throws Exception { @@ -279,6 +290,7 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); + verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } public void testFlushAnalysisProblem() throws Exception { @@ -308,7 +320,7 @@ public class DatafeedJobTests extends ESTestCase { private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; - return new DatafeedJob("_job_id", dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, + return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); } } diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 2ab27eedbac..93e79c8b970 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -119,24 +119,19 @@ public class NativeAutodetectProcessTests extends ESTestCase { } public void testWriteResetBucketsControlMessage() throws IOException { - InputStream logStream = mock(InputStream.class); - when(logStream.read(new byte[1024])).thenReturn(-1); - ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); - try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, - bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { - process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); - - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty()); - process.writeResetBucketsControlMessage(params); - process.flushStream(); - - String message = new String(bos.toByteArray(), StandardCharsets.UTF_8); - assertTrue(message.contains(ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE)); - } + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty()); + testWriteMessage(p -> p.writeResetBucketsControlMessage(params), ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE); } public void testWriteUpdateConfigMessage() throws IOException { + testWriteMessage(p -> p.writeUpdateModelPlotMessage(new ModelPlotConfig()), ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE); + } + + public void testPersistJob() throws IOException { + testWriteMessage(p -> p.persistJob(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE); + } + + public void testWriteMessage(CheckedConsumer writeFunction, String expectedMessageCode) throws IOException { InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); @@ -145,11 +140,17 @@ public class NativeAutodetectProcessTests extends ESTestCase { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); + writeFunction.accept(process); process.writeUpdateModelPlotMessage(new ModelPlotConfig()); process.flushStream(); String message = new String(bos.toByteArray(), StandardCharsets.UTF_8); - assertTrue(message.contains(ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE)); + assertTrue(message.contains(expectedMessageCode)); } } + + @FunctionalInterface + private interface CheckedConsumer { + void accept(T t) throws IOException; + } } diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index fd92df1f6ab..130476951ab 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -269,6 +269,25 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } + public void testWriteStartBackgroundPersistMessage() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + writer.writeStartBackgroundPersistMessage(); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(2); + inOrder.verify(lengthEncodedWriter).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("w"); + + inOrder.verify(lengthEncodedWriter).writeNumFields(2); + inOrder.verify(lengthEncodedWriter).writeField(""); + StringBuilder spaces = new StringBuilder(); + IntStream.rangeClosed(1, ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' ')); + inOrder.verify(lengthEncodedWriter).writeField(spaces.toString()); + inOrder.verify(lengthEncodedWriter).flush(); + + verifyNoMoreInteractions(lengthEncodedWriter); + } + private static List createRule(String value) { Condition condition = new Condition(Operator.GT, value); return Collections.singletonList(RuleCondition.createNumerical(RuleConditionType.NUMERICAL_ACTUAL, null, null, condition)); diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java index 76ad45c78f2..ea80673b1d2 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -15,13 +15,8 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.junit.After; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index cfd4594ffa5..6731e27aaac 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -615,6 +615,19 @@ public class DatafeedJobsRestIT extends ESRestTestCase { } }); + // Model state should be persisted at the end of lookback + // test a model snapshot is present + assertBusy(() -> { + try { + Response getJobResponse = client().performRequest("get", + MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/model_snapshots"); + String responseAsString = responseEntityToString(getJobResponse); + assertThat(responseAsString, containsString("\"count\":1")); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + }); + ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId)); response = e.getResponse(); diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 0dfcf02d04f..7d81096676c 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; @@ -435,6 +436,11 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { return client().execute(PostCalendarEventsAction.INSTANCE, request).actionGet(); } + protected PersistJobAction.Response persistJob(String jobId) { + PersistJobAction.Request request = new PersistJobAction.Request(jobId); + return client().execute(PersistJobAction.INSTANCE, request).actionGet(); + } + @Override protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) { diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java new file mode 100644 index 00000000000..6f885744b21 --- /dev/null +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.junit.After; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class PersistJobIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanUpJobs() { + cleanUp(); + } + + public void testPersistJob() throws Exception { + String jobId = "persist-job-test"; + runJob(jobId); + + PersistJobAction.Response r = persistJob(jobId); + assertTrue(r.isPersisted()); + + // Persisting the job will create a model snapshot + assertBusy(() -> { + List snapshots = getModelSnapshots(jobId); + assertFalse(snapshots.isEmpty()); + }); + } + + private void runJob(String jobId) throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(5); + Detector.Builder detector = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(new DataDescription.Builder()); + registerJob(job); + putJob(job); + + openJob(job.getId()); + List data = generateData(System.currentTimeMillis(), bucketSpan, 10, bucketIndex -> randomIntBetween(10, 20)); + postData(job.getId(), data.stream().collect(Collectors.joining())); + } +}