From 02a5e153dc7cabb19663f0647b07bdf82d229ce4 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 18 Sep 2019 20:37:40 +0300 Subject: [PATCH] [7.x][ML] Parse and index data frame analytics state (#46804) (#46820) This commit reuses the same state processor that is used for autodetect to parse state output from data frame analytics jobs. We then index the state document into the state index. Backport of #46804 --- .../dataframe/analyses/DataFrameAnalysis.java | 5 ++++ .../dataframe/analyses/OutlierDetection.java | 5 ++++ .../ml/dataframe/analyses/Regression.java | 5 ++++ .../xpack/ml/integration/RegressionIT.java | 13 ++++++++++ .../xpack/ml/MachineLearning.java | 2 +- .../process/AnalyticsProcessConfig.java | 6 ++++- .../process/AnalyticsProcessFactory.java | 10 ++++--- .../process/AnalyticsProcessManager.java | 9 ++++--- .../MemoryUsageEstimationProcessManager.java | 3 ++- .../NativeAnalyticsProcessFactory.java | 26 +++++++++++++++---- ...veMemoryUsageEstimationProcessFactory.java | 9 ++++--- .../NativeAutodetectProcessFactory.java | 4 +-- .../IndexingStateProcessor.java} | 11 ++++---- .../AnalyticsResultProcessorTests.java | 2 +- ...oryUsageEstimationProcessManagerTests.java | 3 +-- .../NativeAutodetectProcessTests.java | 10 +++---- .../IndexingStateProcessorTests.java} | 8 +++--- 17 files changed, 91 insertions(+), 40 deletions(-) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/{job/process/autodetect/output/AutodetectStateProcessor.java => process/IndexingStateProcessor.java} (91%) rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/{job/process/autodetect/output/AutodetectStateProcessorTests.java => process/IndexingStateProcessorTests.java} (95%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java index bc0e623cdeb..c1db48f0d97 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java @@ -32,4 +32,9 @@ public interface DataFrameAnalysis extends ToXContentObject, NamedWriteable { * @return {@code true} if this analysis supports data frame rows with missing values */ boolean supportsMissingValues(); + + /** + * @return {@code true} if this analysis persists state that can later be used to restore from a given point + */ + boolean persistsState(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetection.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetection.java index 35e3d234a7c..3ef5fa331bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetection.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetection.java @@ -169,6 +169,11 @@ public class OutlierDetection implements DataFrameAnalysis { return false; } + @Override + public boolean persistsState() { + return false; + } + public enum Method { LOF, LDOF, DISTANCE_KTH_NN, DISTANCE_KNN; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java index 04a5801ffa2..fd7820217df 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java @@ -210,6 +210,11 @@ public class Regression implements DataFrameAnalysis { return true; } + @Override + public boolean persistsState() { + return true; + } + @Override public int hashCode() { return Objects.hash(dependentVariable, lambda, gamma, eta, maximumNumberTrees, featureBagFraction, predictionFieldName, diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 057ccf9a103..4a9f682d2c2 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -11,10 +11,12 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.junit.After; import java.util.Arrays; @@ -109,6 +111,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { "Creating destination index [regression_single_numeric_feature_and_mixed_data_set_source_index_results]", "Finished reindexing to destination index [regression_single_numeric_feature_and_mixed_data_set_source_index_results]", "Finished analysis"); + assertModelStatePersisted(jobId); } public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception { @@ -175,6 +178,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { "Creating destination index [regression_only_training_data_and_training_percent_is_hundred_source_index_results]", "Finished reindexing to destination index [regression_only_training_data_and_training_percent_is_hundred_source_index_results]", "Finished analysis"); + assertModelStatePersisted(jobId); } public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception { @@ -251,5 +255,14 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { "Creating destination index [regression_only_training_data_and_training_percent_is_fifty_source_index_results]", "Finished reindexing to destination index [regression_only_training_data_and_training_percent_is_fifty_source_index_results]", "Finished analysis"); + assertModelStatePersisted(jobId); + } + + private void assertModelStatePersisted(String jobId) { + String docId = jobId + "_regression_state#1"; + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setQuery(QueryBuilders.idsQuery().addIds(docId)) + .get(); + assertThat(searchResponse.getHits().getHits().length, equalTo(1)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 05eb543fe5f..bd43879792a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -514,7 +514,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu client, clusterService); normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService); - analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService); + analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, client, nativeController, clusterService); memoryEstimationProcessFactory = new NativeMemoryUsageEstimationProcessFactory(environment, nativeController, clusterService); } catch (IOException e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java index 5093404812a..ae05c61148f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java @@ -16,6 +16,7 @@ import java.util.Set; public class AnalyticsProcessConfig implements ToXContentObject { + private static final String JOB_ID = "job_id"; private static final String ROWS = "rows"; private static final String COLS = "cols"; private static final String MEMORY_LIMIT = "memory_limit"; @@ -24,6 +25,7 @@ public class AnalyticsProcessConfig implements ToXContentObject { private static final String RESULTS_FIELD = "results_field"; private static final String CATEGORICAL_FIELDS = "categorical_fields"; + private final String jobId; private final long rows; private final int cols; private final ByteSizeValue memoryLimit; @@ -32,8 +34,9 @@ public class AnalyticsProcessConfig implements ToXContentObject { private final Set categoricalFields; private final DataFrameAnalysis analysis; - public AnalyticsProcessConfig(long rows, int cols, ByteSizeValue memoryLimit, int threads, String resultsField, + public AnalyticsProcessConfig(String jobId, long rows, int cols, ByteSizeValue memoryLimit, int threads, String resultsField, Set categoricalFields, DataFrameAnalysis analysis) { + this.jobId = Objects.requireNonNull(jobId); this.rows = rows; this.cols = cols; this.memoryLimit = Objects.requireNonNull(memoryLimit); @@ -54,6 +57,7 @@ public class AnalyticsProcessConfig implements ToXContentObject { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field(JOB_ID, jobId); builder.field(ROWS, rows); builder.field(COLS, cols); builder.field(MEMORY_LIMIT, memoryLimit.getBytes()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java index e72d1ad51a5..0be90b2e0ae 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.dataframe.process; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; + import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -13,12 +15,12 @@ public interface AnalyticsProcessFactory { /** * Create an implementation of {@link AnalyticsProcess} * - * @param jobId The job id + * @param config The data frame analytics config * @param analyticsProcessConfig The process configuration - * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process - * @param onProcessCrash Callback to execute if the process stops unexpectedly + * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process + * @param onProcessCrash Callback to execute if the process stops unexpectedly * @return The process */ - AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, + AnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig config, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService, Consumer onProcessCrash); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 245afa1cbab..ccb65fa2156 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -172,9 +172,10 @@ public class AnalyticsProcessManager { process.writeRecord(headerRecord); } - private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) { + private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, + AnalyticsProcessConfig analyticsProcessConfig) { ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); - AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig, + AnalyticsProcess process = processFactory.createAnalyticsProcess(config, analyticsProcessConfig, executorService, onProcessCrash(task)); if (process.isProcessAlive() == false) { throw ExceptionsHelper.serverError("Failed to start data frame analytics process"); @@ -289,7 +290,7 @@ public class AnalyticsProcessManager { LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId()); return false; } - process = createProcess(task, analyticsProcessConfig); + process = createProcess(task, config, analyticsProcessConfig); DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true)); resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker()); @@ -299,7 +300,7 @@ public class AnalyticsProcessManager { private AnalyticsProcessConfig createProcessConfig(DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor) { DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); Set categoricalFields = dataExtractor.getCategoricalFields(); - AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(dataSummary.rows, dataSummary.cols, + AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(config.getId(), dataSummary.rows, dataSummary.cols, config.getModelMemoryLimit(), 1, config.getDest().getResultsField(), categoricalFields, config.getAnalysis()); return processConfig; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java index 6512dc075d7..ed42f86cc4b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java @@ -63,6 +63,7 @@ public class MemoryUsageEstimationProcessManager { } AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig( + jobId, dataSummary.rows, dataSummary.cols, // For memory estimation the model memory limit here should be set high enough not to trigger an error when C++ code @@ -74,7 +75,7 @@ public class MemoryUsageEstimationProcessManager { config.getAnalysis()); AnalyticsProcess process = processFactory.createAnalyticsProcess( - jobId, + config, processConfig, executorServiceForProcess, // The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 6aad810959f..9662298308e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -7,14 +7,17 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; +import org.elasticsearch.xpack.ml.process.IndexingStateProcessor; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -34,12 +37,14 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory onProcessCrash) { + public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig config, AnalyticsProcessConfig analyticsProcessConfig, + ExecutorService executorService, Consumer onProcessCrash) { + String jobId = config.getId(); List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, - true, false, true, true, false, false); + true, false, true, true, false, config.getAnalysis().persistsState()); // The extra 2 are for the checksum and the control field int numberOfFields = analyticsProcessConfig.cols() + 2; @@ -67,7 +73,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory filesToDelete, ProcessPipes processPipes) { AnalyticsBuilder analyticsBuilder = diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 3c573701f36..f635e43a63b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; @@ -52,18 +53,18 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce @Override public NativeMemoryUsageEstimationProcess createAnalyticsProcess( - String jobId, + DataFrameAnalyticsConfig config, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes( - env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, true, false, false, true, false, false); + env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId(), true, false, false, true, false, false); - createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes); + createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes); NativeMemoryUsageEstimationProcess process = new NativeMemoryUsageEstimationProcess( - jobId, + config.getId(), processPipes.getLogStream().get(), // Memory estimation process does not use the input pipe, hence null. null, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index a683f856c8c..26f9353639c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -17,7 +17,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor; +import org.elasticsearch.xpack.ml.process.IndexingStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.process.NativeController; @@ -77,7 +77,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory // The extra 1 is the control field int numberOfFields = job.allInputFields().size() + (includeTokensField ? 1 : 0) + 1; - AutodetectStateProcessor stateProcessor = new AutodetectStateProcessor(client, job.getId()); + IndexingStateProcessor stateProcessor = new IndexingStateProcessor(client, job.getId()); ProcessResultsParser resultsParser = new ProcessResultsParser<>(AutodetectResult.PARSER); NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java similarity index 91% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index 1a418bfb2a1..9bfd22500e0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.autodetect.output; +package org.elasticsearch.xpack.ml.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -15,7 +15,6 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.ml.process.StateProcessor; import java.io.IOException; import java.io.InputStream; @@ -25,18 +24,18 @@ import java.util.List; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; /** - * Reads the autodetect state and persists via a bulk request + * Reads state documents of a stream, splits them and persists to an index via a bulk request */ -public class AutodetectStateProcessor implements StateProcessor { +public class IndexingStateProcessor implements StateProcessor { - private static final Logger LOGGER = LogManager.getLogger(AutodetectStateProcessor.class); + private static final Logger LOGGER = LogManager.getLogger(IndexingStateProcessor.class); private static final int READ_BUF_SIZE = 8192; private final Client client; private final String jobId; - public AutodetectStateProcessor(Client client, String jobId) { + public IndexingStateProcessor(Client client, String jobId) { this.client = client; this.jobId = jobId; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java index 7f26293cabf..60bfd0eb3e3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java @@ -88,7 +88,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase { private void givenDataFrameRows(int rows) { AnalyticsProcessConfig config = new AnalyticsProcessConfig( - rows, 1, ByteSizeValue.ZERO, 1, "ml", Collections.emptySet(), mock(DataFrameAnalysis.class)); + "job_id", rows, 1, ByteSizeValue.ZERO, 1, "ml", Collections.emptySet(), mock(DataFrameAnalysis.class)); when(process.getConfig()).thenReturn(config); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java index 9790e0618da..5be7a03b851 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java @@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -68,7 +67,7 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase { process = mock(AnalyticsProcess.class); when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator()); processFactory = mock(AnalyticsProcessFactory.class); - when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process); + when(processFactory.createAnalyticsProcess(any(), any(), any(), any())).thenReturn(process); dataExtractor = mock(DataFrameDataExtractor.class); when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(NUM_ROWS, NUM_COLS)); dataExtractorFactory = mock(DataFrameDataExtractorFactory.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 0fc7c83b54c..9cd6343eab7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor; +import org.elasticsearch.xpack.ml.process.IndexingStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; @@ -62,7 +62,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { mock(OutputStream.class), outputStream, mock(OutputStream.class), NUMBER_FIELDS, null, new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) { - process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); Thread.sleep(500); @@ -85,7 +85,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) { - process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.writeRecord(record); process.flushStream(); @@ -120,7 +120,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) { - process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); FlushJobParams params = FlushJobParams.builder().build(); process.flushJob(params); @@ -170,7 +170,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) { - process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); writeFunction.accept(process); process.writeUpdateModelPlotMessage(new ModelPlotConfig()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java similarity index 95% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java index 4f5477a75f8..f574782746c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.autodetect.output; +package org.elasticsearch.xpack.ml.process; import com.carrotsearch.randomizedtesting.annotations.Timeout; import org.elasticsearch.action.ActionFuture; @@ -36,7 +36,7 @@ import static org.mockito.Mockito.when; /** * Tests for reading state from the native process. */ -public class AutodetectStateProcessorTests extends ESTestCase { +public class IndexingStateProcessorTests extends ESTestCase { private static final String STATE_SAMPLE = "" + "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n" @@ -55,14 +55,14 @@ public class AutodetectStateProcessorTests extends ESTestCase { private static final int LARGE_DOC_SIZE = 1000000; private Client client; - private AutodetectStateProcessor stateProcessor; + private IndexingStateProcessor stateProcessor; @Before public void initialize() throws IOException { client = mock(Client.class); @SuppressWarnings("unchecked") ActionFuture bulkResponseFuture = mock(ActionFuture.class); - stateProcessor = spy(new AutodetectStateProcessor(client, JOB_ID)); + stateProcessor = spy(new IndexingStateProcessor(client, JOB_ID)); when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool);