[ML] Fix native process threading protection and restructure ml threadpools

The native process can only handle one operation at a time, so in order the protect against multiple operation at a time (e.g. post data and flush or multiple post data operations) there should be protection in place to guarantee that at most only a single thread interacts with the native process. The current protection is broken when a job close is executed, more specifically the wait logic is broken here.

This commit changes the threading logic when interacting with the native process by using a custom `ExecutorService` that that uses a single worker thread from `ml_autodetect_process` thread pool to interact with the native process. Requests from the ml apis are initially being queued and this worker thread executes these requests one by one in the order they were specified.

Removed the general `ml` threadpool and replaced its usages with `ml_autodetect_process` or `management` threadpool.
Added a new threadpool just for (re)normalizer, so that these operations are isolated from other operations.

relates elastic/x-pack-elasticsearch#582

Original commit: elastic/x-pack-elasticsearch@ff0c8dce0b
This commit is contained in:
Martijn van Groningen 2017-03-16 11:36:17 +01:00
parent 7da4724b15
commit aba4760b02
12 changed files with 296 additions and 253 deletions

View File

@ -148,9 +148,9 @@ import static java.util.Collections.emptyList;
public class MachineLearning implements ActionPlugin {
public static final String NAME = "ml";
public static final String BASE_PATH = "/_xpack/ml/";
public static final String THREAD_POOL_NAME = NAME;
public static final String DATAFEED_RUNNER_THREAD_POOL_NAME = NAME + "_datafeed_runner";
public static final String AUTODETECT_PROCESS_THREAD_POOL_NAME = NAME + "_autodetect_process";
public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed";
public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect";
public static final String NORMALIZER_THREAD_POOL_NAME = NAME + "_normalizer";
public static final Setting<Boolean> AUTODETECT_PROCESS =
Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope);
@ -294,7 +294,7 @@ public class MachineLearning implements ActionPlugin {
executorService) -> new MultiplyingNormalizerProcess(settings, 1.0);
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.THREAD_POOL_NAME));
threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry);
@ -438,17 +438,20 @@ public class MachineLearning implements ActionPlugin {
return emptyList();
}
int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
FixedExecutorBuilder ml = new FixedExecutorBuilder(settings, THREAD_POOL_NAME,
maxNumberOfJobs * 2, 1000, "xpack.ml.thread_pool");
// 4 threads: for cpp logging, result processing, state processing and
// AutodetectProcessManager worker thread:
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool");
// 3 threads: for c++ logging, result processing, state processing
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME,
maxNumberOfJobs * 3, 200, "xpack.ml.autodetect_process_thread_pool");
// 3 threads: normalization (cpp logging, result handling) and
// renormalization (ShortCircuitingRenormalizer):
FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, NORMALIZER_THREAD_POOL_NAME,
maxNumberOfJobs * 3, 200, "xpack.ml.normalizer_thread_pool");
// TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and
// autodetect process are created at the same time then these two different TPs can merge.
FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_RUNNER_THREAD_POOL_NAME,
FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME,
maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool");
return Arrays.asList(ml, autoDetect, datafeed);
return Arrays.asList(autoDetect, renormalizer, datafeed);
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -134,7 +133,7 @@ public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Requ
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
logger.info("Deleting expired data");
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> deleteExpiredData(listener));
}
private void deleteExpiredData(ActionListener<Response> listener) {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
@ -244,7 +243,8 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, FlushJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
FlushJobAction.Request::new, FlushJobAction.Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
FlushJobAction.Request::new, FlushJobAction.Response::new, ThreadPool.Names.SAME, processManager);
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
@ -270,8 +270,13 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
timeRangeBuilder.endTime(request.getEnd());
}
paramsBuilder.forTimeRange(timeRangeBuilder.build());
processManager.flushJob(request.getJobId(), paramsBuilder.build());
listener.onResponse(new Response(true));
processManager.flushJob(request.getJobId(), paramsBuilder.build(), e -> {
if (e == null) {
listener.onResponse(new Response(true));
} else {
listener.onFailure(e);
}
});
}
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -241,7 +240,8 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, PostDataAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
Request::new, Response::new, ThreadPool.Names.SAME, processManager);
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
@ -256,9 +256,14 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {
DataCounts dataCounts = processManager.processData(request.getJobId(),
request.content.streamInput(), request.getXContentType(), params);
listener.onResponse(new Response(dataCounts));
processManager.processData(request.getJobId(),
request.content.streamInput(), request.getXContentType(), params, (dataCounts, e) -> {
if (dataCounts != null) {
listener.onResponse(new Response(dataCounts));
} else {
listener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
@ -225,7 +224,7 @@ public class StopDatafeedAction
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, InternalClient client) {
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME);
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT);
this.client = client;
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -187,7 +186,8 @@ public class UpdateProcessAction extends
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
Request::new, Response::new, ThreadPool.Names.SAME, processManager);
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
@ -199,15 +199,18 @@ public class UpdateProcessAction extends
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener, ClusterState state) {
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> {
try {
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),
request.getModelPlotConfig());
listener.onResponse(new Response());
} catch (Exception e) {
listener.onFailure(e);
}
});
try {
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),
request.getModelPlotConfig(), e -> {
if (e == null) {
listener.onResponse(new Response());
} else {
listener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}
}
}
}

View File

@ -129,7 +129,7 @@ public class DatafeedJobRunner extends AbstractComponent {
logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", holder.datafeed.getId(), holder.datafeed.getJobId(),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime),
endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime));
holder.future = threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(new AbstractRunnable() {
holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -181,7 +181,7 @@ public class DatafeedJobRunner extends AbstractComponent {
if (holder.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME, new AbstractRunnable() {
holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {

View File

@ -8,17 +8,16 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
@ -28,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.Closeable;
import java.io.IOException;
@ -37,10 +35,11 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class AutodetectCommunicator implements Closeable {
@ -52,19 +51,20 @@ public class AutodetectCommunicator implements Closeable {
private final AutodetectProcess autodetectProcess;
private final AutoDetectResultProcessor autoDetectResultProcessor;
private final Consumer<Exception> handler;
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
AutodetectCommunicator(Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler,
NamedXContentRegistry xContentRegistry) {
AutodetectCommunicator(Job job, AutodetectProcess process,
DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler,
NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.autodetectProcess = process;
this.dataCountsReporter = dataCountsReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor;
this.handler = handler;
this.xContentRegistry = xContentRegistry;
this.autodetectWorkerExecutor = autodetectWorkerExecutor;
}
public void writeJobInputHeader() throws IOException {
@ -77,9 +77,9 @@ public class AutodetectCommunicator implements Closeable {
dataCountsReporter, xContentRegistry);
}
public DataCounts writeToJob(InputStream inputStream, XContentType xContentType,
DataLoadParams params) throws IOException {
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> {
public void writeToJob(InputStream inputStream, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) throws IOException {
submitOperation(() -> {
if (params.isResettingBuckets()) {
autodetectProcess.writeResetBucketsControlMessage(params);
}
@ -89,7 +89,7 @@ public class AutodetectCommunicator implements Closeable {
DataCounts results = autoDetectWriter.write(countingStream, xContentType);
autoDetectWriter.flush();
return results;
}, false);
}, handler);
}
@Override
@ -104,38 +104,49 @@ public class AutodetectCommunicator implements Closeable {
* @param reason The reason for closing the job
*/
public void close(boolean restart, String reason) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> {
LOGGER.info("[{}] job closing, reason [{}]", job.getId(), reason);
Future<?> future = autodetectWorkerExecutor.submit(() -> {
checkProcessIsAlive();
dataCountsReporter.close();
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
handler.accept(restart ? new ElasticsearchException(reason) : null);
LOGGER.info("[{}] job closed", job.getId());
return null;
}, true);
});
try {
future.get();
autodetectWorkerExecutor.shutdown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw ExceptionsHelper.convertToElastic(e);
}
}
public void writeUpdateModelPlotMessage(ModelPlotConfig config) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateModelPlotMessage(config);
public void writeUpdateProcessMessage(ModelPlotConfig config, List<JobUpdate.DetectorUpdate> updates,
BiConsumer<Void, Exception> handler) throws IOException {
submitOperation(() -> {
if (config != null) {
autodetectProcess.writeUpdateModelPlotMessage(config);
}
if (updates != null) {
for (JobUpdate.DetectorUpdate update : updates) {
if (update.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules());
}
}
}
return null;
}, false);
}, handler);
}
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules);
return null;
}, false);
}
public void flushJob(InterimResultsParams params) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> {
public void flushJob(InterimResultsParams params, BiConsumer<Void, Exception> handler) throws IOException {
submitOperation(() -> {
String flushId = autodetectProcess.flushJob(params);
waitFlushToCompletion(flushId);
return null;
}, false);
}, handler);
}
private void waitFlushToCompletion(String flushId) throws IOException {
@ -166,7 +177,7 @@ public class AutodetectCommunicator implements Closeable {
ParameterizedMessage message =
new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
LOGGER.error(message);
throw ExceptionsHelper.serverError(message.getFormattedMessage());
throw new ElasticsearchException(message.getFormattedMessage());
}
}
@ -182,33 +193,19 @@ public class AutodetectCommunicator implements Closeable {
return dataCountsReporter.runningTotalStats();
}
private <T> T checkAndRun(Supplier<String> errorMessage, CheckedSupplier<T, IOException> callback, boolean wait) throws IOException {
CountDownLatch latch = new CountDownLatch(1);
if (inUse.compareAndSet(null, latch)) {
try {
checkProcessIsAlive();
return callback.get();
} finally {
latch.countDown();
inUse.set(null);
private <T> void submitOperation(CheckedSupplier<T, IOException> operation, BiConsumer<T, Exception> handler) throws IOException {
autodetectWorkerExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
handler.accept(null, e);
}
} else {
if (wait) {
latch = inUse.get();
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
@Override
protected void doRun() throws Exception {
checkProcessIsAlive();
return callback.get();
} else {
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
handler.accept(operation.get(), null);
}
}
});
}
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
@ -52,10 +53,17 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class AutodetectProcessManager extends AbstractComponent {
@ -133,16 +141,16 @@ public class AutodetectProcessManager extends AbstractComponent {
* @param input Data input stream
* @param xContentType the {@link XContentType} of the input
* @param params Data processing parameters
* @return Count of records, fields, bytes, etc written
* @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written)
*/
public DataCounts processData(String jobId, InputStream input, XContentType xContentType,
DataLoadParams params) {
public void processData(String jobId, InputStream input, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job");
}
try {
return communicator.writeToJob(input, xContentType, params);
communicator.writeToJob(input, xContentType, params, handler);
// TODO check for errors from autodetect
} catch (IOException e) {
String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId);
@ -165,7 +173,7 @@ public class AutodetectProcessManager extends AbstractComponent {
* @param params Parameters about whether interim results calculation
* should occur and for which period of time
*/
public void flushJob(String jobId, InterimResultsParams params) {
public void flushJob(String jobId, InterimResultsParams params, Consumer<Exception> handler) {
logger.debug("Flushing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
@ -174,7 +182,15 @@ public class AutodetectProcessManager extends AbstractComponent {
throw new IllegalArgumentException(message);
}
try {
communicator.flushJob(params);
communicator.flushJob(params, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
logger.error(msg);
handler.accept(ExceptionsHelper.serverError(msg, e));
}
});
// TODO check for errors from autodetect
} catch (IOException ioe) {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
@ -183,25 +199,21 @@ public class AutodetectProcessManager extends AbstractComponent {
}
}
public void writeUpdateProcessMessage(String jobId, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config)
throws IOException {
public void writeUpdateProcessMessage(String jobId, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
Consumer<Exception> handler) throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
handler.accept(null);
return;
}
if (config != null) {
communicator.writeUpdateModelPlotMessage(config);
}
if (updates != null) {
for (JobUpdate.DetectorUpdate update : updates) {
if (update.getRules() != null) {
communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules());
}
communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
}
});
// TODO check for errors from autodetects
}
@ -257,22 +269,25 @@ public class AutodetectProcessManager extends AbstractComponent {
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME);
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job,
autodetectParams.dataCounts(), jobDataCountsPersister)) {
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
executorService, () -> setJobState(jobTask, JobState.FAILED));
autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED));
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
ExecutorService autodetectWorkerExecutor;
try {
executorService.submit(() -> processor.process(process, usePerPartitionNormalization));
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization));
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
@ -284,7 +299,7 @@ public class AutodetectProcessManager extends AbstractComponent {
throw e;
}
return new AutodetectCommunicator(job, process, dataCountsReporter, processor,
handler, xContentRegistry);
handler, xContentRegistry, autodetectWorkerExecutor);
}
}
@ -376,4 +391,81 @@ public class AutodetectProcessManager extends AbstractComponent {
}
return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats()));
}
ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
executorService.submit(autoDetectWorkerExecutor::start);
return autoDetectWorkerExecutor;
}
/*
* The autodetect native process can only handle a single operation at a time. In order to guarantee that, all
* operations are initially added to a queue and a worker thread from ml autodetect threadpool will process each
* operation at a time.
*/
class AutodetectWorkerExecutorService extends AbstractExecutorService {
private final ThreadContext contextHolder;
private final CountDownLatch awaitTermination = new CountDownLatch(1);
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
private volatile boolean running = true;
AutodetectWorkerExecutorService(ThreadContext contextHolder) {
this.contextHolder = contextHolder;
}
@Override
public void shutdown() {
running = false;
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException("not supported");
}
@Override
public boolean isShutdown() {
return running == false;
}
@Override
public boolean isTerminated() {
return awaitTermination.getCount() == 0;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return awaitTermination.await(timeout, unit);
}
@Override
public void execute(Runnable command) {
boolean added = queue.offer(contextHolder.preserveContext(command));
if (added == false) {
throw new ElasticsearchStatusException("Unable to submit operation", RestStatus.TOO_MANY_REQUESTS);
}
}
void start() {
try {
while (running) {
Runnable runnable = queue.poll(500, TimeUnit.MILLISECONDS);
if (runnable != null) {
try {
runnable.run();
} catch (Exception e) {
logger.error("error handeling job operation", e);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
awaitTermination.countDown();
}
}
}
}

View File

@ -137,7 +137,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
@ -166,7 +166,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
@ -188,7 +188,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
@ -218,7 +218,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
@ -254,7 +254,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task);
datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder);
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any());
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
verify(auditor, times(1)).warning(eq("job_id"), anyString());
verify(client, never()).execute(same(PostDataAction.INSTANCE), any());
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
@ -279,7 +279,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
task = spyDatafeedTask(task);
datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
if (cancelled) {
task.stop("test");
verify(handler).accept(null);
@ -287,7 +287,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
}
}

View File

@ -6,16 +6,13 @@
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -25,18 +22,16 @@ import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -51,30 +46,11 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]),
randomFrom(XContentType.values()), params);
randomFrom(XContentType.values()), params, (dataCounts, e) -> {});
Mockito.verify(process).writeResetBucketsControlMessage(params);
}
}
public void tesWriteUpdateModelPlotMessage() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
ModelPlotConfig config = new ModelPlotConfig();
communicator.writeUpdateModelPlotMessage(config);
Mockito.verify(process).writeUpdateModelPlotMessage(config);
}
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
communicator.writeUpdateDetectorRulesMessage(1, rules);
Mockito.verify(process).writeUpdateDetectorRulesMessage(1, rules);
}
}
public void testFlushJob() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
@ -82,7 +58,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) {
InterimResultsParams params = InterimResultsParams.builder().build();
communicator.flushJob(params);
communicator.flushJob(params, (aVoid, e) -> {});
Mockito.verify(process).flushJob(params);
}
}
@ -93,8 +69,9 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(process.readError()).thenReturn("Mock process is dead");
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
InterimResultsParams params = InterimResultsParams.builder().build();
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params));
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", e.getMessage());
Exception[] holder = new ElasticsearchException[1];
communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1);
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
}
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException {
@ -106,7 +83,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
InterimResultsParams params = InterimResultsParams.builder().build();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) {
communicator.flushJob(params);
communicator.flushJob(params, (aVoid, e) -> {});
}
verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)));
@ -146,6 +123,12 @@ public class AutodetectCommunicatorTests extends ESTestCase {
private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess,
AutoDetectResultProcessor autoDetectResultProcessor) throws IOException {
ExecutorService executorService = mock(ExecutorService.class);
when(executorService.submit(any(Callable.class))).thenReturn(mock(Future.class));
doAnswer(invocationOnMock -> {
Callable runnable = (Callable) invocationOnMock.getArguments()[0];
runnable.call();
return mock(Future.class);
}).when(executorService).submit(any(Callable.class));
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
@ -153,76 +136,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
return new AutodetectCommunicator(createJobDetails(), autodetectProcess,
dataCountsReporter, autoDetectResultProcessor, e -> {
}, new NamedXContentRegistry(Collections.emptyList()));
}, new NamedXContentRegistry(Collections.emptyList()), executorService);
}
public void testWriteToJobInUse() throws IOException {
InputStream in = mock(InputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
XContentType xContentType = randomFrom(XContentType.values());
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class,
() -> communicator.writeToJob(in, xContentType, mock(DataLoadParams.class)));
communicator.inUse.set(null);
communicator.writeToJob(in, xContentType,
new DataLoadParams(TimeRange.builder().build(), Optional.empty()));
}
public void testFlushInUse() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(new CountDownLatch(1));
InterimResultsParams params = mock(InterimResultsParams.class);
expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params));
communicator.inUse.set(null);
communicator.flushJob(params);
}
public void testCloseInUse() throws Exception {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
CountDownLatch latch = mock(CountDownLatch.class);
communicator.inUse.set(latch);
communicator.close();
verify(latch, times(1)).await();
communicator.inUse.set(null);
communicator.close();
}
public void testWriteUpdateModelPlotConfigMessageInUse() throws Exception {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelPlotMessage(mock(ModelPlotConfig.class)));
communicator.inUse.set(null);
communicator.writeUpdateModelPlotMessage(mock(ModelPlotConfig.class));
}
public void testWriteUpdateDetectorRulesMessageInUse() throws Exception {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules));
communicator.inUse.set(null);
communicator.writeUpdateDetectorRulesMessage(0, rules);
}
}

View File

@ -5,12 +5,12 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
@ -51,7 +51,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
@ -65,12 +68,13 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* Calling the
* {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams)}
* {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams, BiConsumer)}
* method causes an AutodetectCommunicator to be created on demand. Most of
* these tests have to do that before they can assert other things
*/
@ -127,8 +131,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable);
ExecutorService executorService = mock(ExecutorService.class);
Future future = mock(Future.class);
when(executorService.submit(any(Callable.class))).thenReturn(future);
when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
when(autodetectProcess.isProcessAlive()).thenReturn(true);
@ -140,6 +148,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList())));
doReturn(executorService).when(manager).createAutodetectExecutorService(any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
@ -174,7 +183,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
params);
params, (dataCounts1, e) -> {});
assertEquals(1, manager.numberOfOpenJobs());
}
@ -185,13 +194,18 @@ public class AutodetectProcessManagerTests extends ESTestCase {
DataLoadParams params = mock(DataLoadParams.class);
InputStream inputStream = createInputStream("");
XContentType xContentType = randomFrom(XContentType.values());
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream,
xContentType, params);
doAnswer(invocationOnMock -> {
BiConsumer<DataCounts, Exception> handler = (BiConsumer<DataCounts, Exception>) invocationOnMock.getArguments()[3];
handler.accept(null, new IOException("blah"));
return null;
}).when(communicator).writeToJob(eq(inputStream), same(xContentType), eq(params), any());
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
ESTestCase.expectThrows(ElasticsearchException.class,
() -> manager.processData("foo", inputStream, xContentType, params));
Exception[] holder = new Exception[1];
manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
assertNotNull(holder[0]);
}
public void testCloseJob() {
@ -202,7 +216,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class));
mock(DataLoadParams.class), (dataCounts1, e) -> {});
// job is created
assertEquals(1, manager.numberOfOpenJobs());
@ -219,8 +233,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", inputStream, xContentType, params);
verify(communicator).writeToJob(inputStream, xContentType, params);
manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> {});
verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any());
}
public void testFlush() throws IOException {
@ -231,12 +245,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class));
mock(DataLoadParams.class), (dataCounts1, e) -> {});
InterimResultsParams params = InterimResultsParams.builder().build();
manager.flushJob("foo", params);
manager.flushJob("foo", params, e -> {});
verify(communicator).flushJob(params);
verify(communicator).flushJob(same(params), any());
}
public void testFlushThrows() throws IOException {
@ -244,10 +258,15 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
InterimResultsParams params = InterimResultsParams.builder().build();
doThrow(new IOException("blah")).when(communicator).flushJob(params);
doAnswer(invocationOnMock -> {
BiConsumer<Void, Exception> handler = (BiConsumer<Void, Exception>) invocationOnMock.getArguments()[1];
handler.accept(null, new IOException("blah"));
return null;
}).when(communicator).flushJob(same(params), any());
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> manager.flushJob("foo", params));
assertEquals("[foo] exception while flushing job", e.getMessage());
Exception[] holder = new Exception[1];
manager.flushJob("foo", params, e -> holder[0] = e);
assertEquals("[foo] exception while flushing job", holder[0].getMessage());
}
public void testwriteUpdateProcessMessage() throws IOException {
@ -256,9 +275,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
ModelPlotConfig modelConfig = mock(ModelPlotConfig.class);
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules));
manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig);
verify(communicator).writeUpdateModelPlotMessage(modelConfig);
verify(communicator).writeUpdateDetectorRulesMessage(2, rules);
manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig, e -> {});
verify(communicator).writeUpdateProcessMessage(same(modelConfig), same(detectorUpdates), any());
}
public void testJobHasActiveAutodetectProcess() throws IOException {
@ -269,7 +287,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class));
mock(DataLoadParams.class), (dataCounts1, e) -> {});
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
assertFalse(manager.jobHasActiveAutodetectProcess("bar"));
@ -277,16 +295,24 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testProcessData_GivenStateNotOpened() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo"));
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
BiConsumer<DataCounts, Exception> handler = (BiConsumer<DataCounts, Exception>) invocationOnMock.getArguments()[3];
handler.accept(new DataCounts("foo"), null);
return null;
}).when(communicator).writeToJob(any(), any(), any(), any());
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
InputStream inputStream = createInputStream("");
DataCounts dataCounts = manager.processData("foo", inputStream,
randomFrom(XContentType.values()), mock(DataLoadParams.class));
DataCounts[] dataCounts = new DataCounts[1];
manager.processData("foo", inputStream,
randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {
dataCounts[0] = dataCounts1;
});
assertThat(dataCounts, equalTo(new DataCounts("foo")));
assertThat(dataCounts[0], equalTo(new DataCounts("foo")));
}
public void testCreate_notEnoughThreads() throws IOException {
@ -342,7 +368,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
manager.openJob(jobId, jobTask, false, e -> {});
manager.processData(jobId, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class));
mock(DataLoadParams.class), (dataCounts, e) -> {});
return manager;
}