[7.x][ML] Parse and index data frame analytics state (#46804) (#46820)

This commit reuses the same state processor that is used for autodetect
to parse state output from data frame analytics jobs. We then index the
state document into the state index.

Backport of #46804
This commit is contained in:
Dimitris Athanasiou 2019-09-18 20:37:40 +03:00 committed by GitHub
parent b08b3741db
commit 02a5e153dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 91 additions and 40 deletions

View File

@ -32,4 +32,9 @@ public interface DataFrameAnalysis extends ToXContentObject, NamedWriteable {
* @return {@code true} if this analysis supports data frame rows with missing values
*/
boolean supportsMissingValues();
/**
* @return {@code true} if this analysis persists state that can later be used to restore from a given point
*/
boolean persistsState();
}

View File

@ -169,6 +169,11 @@ public class OutlierDetection implements DataFrameAnalysis {
return false;
}
@Override
public boolean persistsState() {
return false;
}
public enum Method {
LOF, LDOF, DISTANCE_KTH_NN, DISTANCE_KNN;

View File

@ -210,6 +210,11 @@ public class Regression implements DataFrameAnalysis {
return true;
}
@Override
public boolean persistsState() {
return true;
}
@Override
public int hashCode() {
return Objects.hash(dependentVariable, lambda, gamma, eta, maximumNumberTrees, featureBagFraction, predictionFieldName,

View File

@ -11,10 +11,12 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.junit.After;
import java.util.Arrays;
@ -109,6 +111,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
"Creating destination index [regression_single_numeric_feature_and_mixed_data_set_source_index_results]",
"Finished reindexing to destination index [regression_single_numeric_feature_and_mixed_data_set_source_index_results]",
"Finished analysis");
assertModelStatePersisted(jobId);
}
public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception {
@ -175,6 +178,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
"Creating destination index [regression_only_training_data_and_training_percent_is_hundred_source_index_results]",
"Finished reindexing to destination index [regression_only_training_data_and_training_percent_is_hundred_source_index_results]",
"Finished analysis");
assertModelStatePersisted(jobId);
}
public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception {
@ -251,5 +255,14 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
"Creating destination index [regression_only_training_data_and_training_percent_is_fifty_source_index_results]",
"Finished reindexing to destination index [regression_only_training_data_and_training_percent_is_fifty_source_index_results]",
"Finished analysis");
assertModelStatePersisted(jobId);
}
private void assertModelStatePersisted(String jobId) {
String docId = jobId + "_regression_state#1";
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setQuery(QueryBuilders.idsQuery().addIds(docId))
.get();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
}
}

View File

@ -514,7 +514,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService);
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, client, nativeController, clusterService);
memoryEstimationProcessFactory =
new NativeMemoryUsageEstimationProcessFactory(environment, nativeController, clusterService);
} catch (IOException e) {

View File

@ -16,6 +16,7 @@ import java.util.Set;
public class AnalyticsProcessConfig implements ToXContentObject {
private static final String JOB_ID = "job_id";
private static final String ROWS = "rows";
private static final String COLS = "cols";
private static final String MEMORY_LIMIT = "memory_limit";
@ -24,6 +25,7 @@ public class AnalyticsProcessConfig implements ToXContentObject {
private static final String RESULTS_FIELD = "results_field";
private static final String CATEGORICAL_FIELDS = "categorical_fields";
private final String jobId;
private final long rows;
private final int cols;
private final ByteSizeValue memoryLimit;
@ -32,8 +34,9 @@ public class AnalyticsProcessConfig implements ToXContentObject {
private final Set<String> categoricalFields;
private final DataFrameAnalysis analysis;
public AnalyticsProcessConfig(long rows, int cols, ByteSizeValue memoryLimit, int threads, String resultsField,
public AnalyticsProcessConfig(String jobId, long rows, int cols, ByteSizeValue memoryLimit, int threads, String resultsField,
Set<String> categoricalFields, DataFrameAnalysis analysis) {
this.jobId = Objects.requireNonNull(jobId);
this.rows = rows;
this.cols = cols;
this.memoryLimit = Objects.requireNonNull(memoryLimit);
@ -54,6 +57,7 @@ public class AnalyticsProcessConfig implements ToXContentObject {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID, jobId);
builder.field(ROWS, rows);
builder.field(COLS, cols);
builder.field(MEMORY_LIMIT, memoryLimit.getBytes());

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
@ -13,12 +15,12 @@ public interface AnalyticsProcessFactory<ProcessResult> {
/**
* Create an implementation of {@link AnalyticsProcess}
*
* @param jobId The job id
* @param config The data frame analytics config
* @param analyticsProcessConfig The process configuration
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @param onProcessCrash Callback to execute if the process stops unexpectedly
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @param onProcessCrash Callback to execute if the process stops unexpectedly
* @return The process
*/
AnalyticsProcess<ProcessResult> createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig,
AnalyticsProcess<ProcessResult> createAnalyticsProcess(DataFrameAnalyticsConfig config, AnalyticsProcessConfig analyticsProcessConfig,
ExecutorService executorService, Consumer<String> onProcessCrash);
}

View File

@ -172,9 +172,10 @@ public class AnalyticsProcessManager {
process.writeRecord(headerRecord);
}
private AnalyticsProcess<AnalyticsResult> createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) {
private AnalyticsProcess<AnalyticsResult> createProcess(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
AnalyticsProcessConfig analyticsProcessConfig) {
ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
AnalyticsProcess<AnalyticsResult> process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig,
AnalyticsProcess<AnalyticsResult> process = processFactory.createAnalyticsProcess(config, analyticsProcessConfig,
executorService, onProcessCrash(task));
if (process.isProcessAlive() == false) {
throw ExceptionsHelper.serverError("Failed to start data frame analytics process");
@ -289,7 +290,7 @@ public class AnalyticsProcessManager {
LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
return false;
}
process = createProcess(task, analyticsProcessConfig);
process = createProcess(task, config, analyticsProcessConfig);
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
dataExtractorFactory.newExtractor(true));
resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker());
@ -299,7 +300,7 @@ public class AnalyticsProcessManager {
private AnalyticsProcessConfig createProcessConfig(DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor) {
DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
Set<String> categoricalFields = dataExtractor.getCategoricalFields();
AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(dataSummary.rows, dataSummary.cols,
AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(config.getId(), dataSummary.rows, dataSummary.cols,
config.getModelMemoryLimit(), 1, config.getDest().getResultsField(), categoricalFields, config.getAnalysis());
return processConfig;
}

View File

@ -63,6 +63,7 @@ public class MemoryUsageEstimationProcessManager {
}
AnalyticsProcessConfig processConfig =
new AnalyticsProcessConfig(
jobId,
dataSummary.rows,
dataSummary.cols,
// For memory estimation the model memory limit here should be set high enough not to trigger an error when C++ code
@ -74,7 +75,7 @@ public class MemoryUsageEstimationProcessManager {
config.getAnalysis());
AnalyticsProcess<MemoryUsageEstimationResult> process =
processFactory.createAnalyticsProcess(
jobId,
config,
processConfig,
executorServiceForProcess,
// The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when

View File

@ -7,14 +7,17 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@ -34,12 +37,14 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private final Client client;
private final Environment env;
private final NativeController nativeController;
private volatile Duration processConnectTimeout;
public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
public NativeAnalyticsProcessFactory(Environment env, Client client, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.client = Objects.requireNonNull(client);
this.nativeController = Objects.requireNonNull(nativeController);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
@ -51,11 +56,12 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
}
@Override
public NativeAnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig,
ExecutorService executorService, Consumer<String> onProcessCrash) {
public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig config, AnalyticsProcessConfig analyticsProcessConfig,
ExecutorService executorService, Consumer<String> onProcessCrash) {
String jobId = config.getId();
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
true, false, true, true, false, false);
true, false, true, true, false, config.getAnalysis().persistsState());
// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;
@ -67,7 +73,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
filesToDelete, onProcessCrash, analyticsProcessConfig);
try {
analyticsProcess.start(executorService);
startProcess(config, executorService, processPipes, analyticsProcess);
return analyticsProcess;
} catch (EsRejectedExecutionException e) {
try {
@ -79,6 +85,16 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
}
}
private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes,
NativeAnalyticsProcess process) {
if (config.getAnalysis().persistsState()) {
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(client, config.getId());
process.start(executorService, stateProcessor, processPipes.getPersistStream().get());
} else {
process.start(executorService);
}
}
private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder =

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;
@ -52,18 +53,18 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
@Override
public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
String jobId,
DataFrameAnalyticsConfig config,
AnalyticsProcessConfig analyticsProcessConfig,
ExecutorService executorService,
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(
env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, true, false, false, true, false, false);
env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId(), true, false, false, true, false, false);
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);
NativeMemoryUsageEstimationProcess process = new NativeMemoryUsageEstimationProcess(
jobId,
config.getId(),
processPipes.getLogStream().get(),
// Memory estimation process does not use the input pipe, hence null.
null,

View File

@ -17,7 +17,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor;
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.process.NativeController;
@ -77,7 +77,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
// The extra 1 is the control field
int numberOfFields = job.allInputFields().size() + (includeTokensField ? 1 : 0) + 1;
AutodetectStateProcessor stateProcessor = new AutodetectStateProcessor(client, job.getId());
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(client, job.getId());
ProcessResultsParser<AutodetectResult> resultsParser = new ProcessResultsParser<>(AutodetectResult.PARSER);
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
package org.elasticsearch.xpack.ml.process;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -15,7 +15,6 @@ import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.process.StateProcessor;
import java.io.IOException;
import java.io.InputStream;
@ -25,18 +24,18 @@ import java.util.List;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
/**
* Reads the autodetect state and persists via a bulk request
* Reads state documents of a stream, splits them and persists to an index via a bulk request
*/
public class AutodetectStateProcessor implements StateProcessor {
public class IndexingStateProcessor implements StateProcessor {
private static final Logger LOGGER = LogManager.getLogger(AutodetectStateProcessor.class);
private static final Logger LOGGER = LogManager.getLogger(IndexingStateProcessor.class);
private static final int READ_BUF_SIZE = 8192;
private final Client client;
private final String jobId;
public AutodetectStateProcessor(Client client, String jobId) {
public IndexingStateProcessor(Client client, String jobId) {
this.client = client;
this.jobId = jobId;
}

View File

@ -88,7 +88,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
private void givenDataFrameRows(int rows) {
AnalyticsProcessConfig config = new AnalyticsProcessConfig(
rows, 1, ByteSizeValue.ZERO, 1, "ml", Collections.emptySet(), mock(DataFrameAnalysis.class));
"job_id", rows, 1, ByteSizeValue.ZERO, 1, "ml", Collections.emptySet(), mock(DataFrameAnalysis.class));
when(process.getConfig()).thenReturn(config);
}

View File

@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@ -68,7 +67,7 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
process = mock(AnalyticsProcess.class);
when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator());
processFactory = mock(AnalyticsProcessFactory.class);
when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process);
when(processFactory.createAnalyticsProcess(any(), any(), any(), any())).thenReturn(process);
dataExtractor = mock(DataFrameDataExtractor.class);
when(dataExtractor.collectDataSummary()).thenReturn(new DataFrameDataExtractor.DataSummary(NUM_ROWS, NUM_COLS));
dataExtractorFactory = mock(DataFrameDataExtractorFactory.class);

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor;
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@ -62,7 +62,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
mock(OutputStream.class), outputStream, mock(OutputStream.class),
NUMBER_FIELDS, null,
new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
ZonedDateTime startTime = process.getProcessStartTime();
Thread.sleep(500);
@ -85,7 +85,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
process.writeRecord(record);
process.flushStream();
@ -120,7 +120,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
FlushJobParams params = FlushJobParams.builder().build();
process.flushJob(params);
@ -170,7 +170,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
writeFunction.accept(process);
process.writeUpdateModelPlotMessage(new ModelPlotConfig());

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
package org.elasticsearch.xpack.ml.process;
import com.carrotsearch.randomizedtesting.annotations.Timeout;
import org.elasticsearch.action.ActionFuture;
@ -36,7 +36,7 @@ import static org.mockito.Mockito.when;
/**
* Tests for reading state from the native process.
*/
public class AutodetectStateProcessorTests extends ESTestCase {
public class IndexingStateProcessorTests extends ESTestCase {
private static final String STATE_SAMPLE = ""
+ "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
@ -55,14 +55,14 @@ public class AutodetectStateProcessorTests extends ESTestCase {
private static final int LARGE_DOC_SIZE = 1000000;
private Client client;
private AutodetectStateProcessor stateProcessor;
private IndexingStateProcessor stateProcessor;
@Before
public void initialize() throws IOException {
client = mock(Client.class);
@SuppressWarnings("unchecked")
ActionFuture<BulkResponse> bulkResponseFuture = mock(ActionFuture.class);
stateProcessor = spy(new AutodetectStateProcessor(client, JOB_ID));
stateProcessor = spy(new IndexingStateProcessor(client, JOB_ID));
when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);