From aba4760b02b6088091d81ba3691435cb7f5c161c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 16 Mar 2017 11:36:17 +0100 Subject: [PATCH] [ML] Fix native process threading protection and restructure ml threadpools The native process can only handle one operation at a time, so in order the protect against multiple operation at a time (e.g. post data and flush or multiple post data operations) there should be protection in place to guarantee that at most only a single thread interacts with the native process. The current protection is broken when a job close is executed, more specifically the wait logic is broken here. This commit changes the threading logic when interacting with the native process by using a custom `ExecutorService` that that uses a single worker thread from `ml_autodetect_process` thread pool to interact with the native process. Requests from the ml apis are initially being queued and this worker thread executes these requests one by one in the order they were specified. Removed the general `ml` threadpool and replaced its usages with `ml_autodetect_process` or `management` threadpool. Added a new threadpool just for (re)normalizer, so that these operations are isolated from other operations. relates elastic/x-pack-elasticsearch#582 Original commit: elastic/x-pack-elasticsearch@ff0c8dce0b60b8dbbbf7cd16ec4e0b1f39a223da --- .../xpack/ml/MachineLearning.java | 25 ++-- .../ml/action/DeleteExpiredDataAction.java | 3 +- .../xpack/ml/action/FlushJobAction.java | 13 +- .../xpack/ml/action/PostDataAction.java | 15 +- .../xpack/ml/action/StopDatafeedAction.java | 3 +- .../xpack/ml/action/UpdateProcessAction.java | 25 ++-- .../xpack/ml/datafeed/DatafeedJobRunner.java | 4 +- .../autodetect/AutodetectCommunicator.java | 115 +++++++------- .../autodetect/AutodetectProcessManager.java | 140 +++++++++++++++--- .../ml/datafeed/DatafeedJobRunnerTests.java | 14 +- .../AutodetectCommunicatorTests.java | 116 ++------------- .../AutodetectProcessManagerTests.java | 76 ++++++---- 12 files changed, 296 insertions(+), 253 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index ec9833c80d5..22fc52649f8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -148,9 +148,9 @@ import static java.util.Collections.emptyList; public class MachineLearning implements ActionPlugin { public static final String NAME = "ml"; public static final String BASE_PATH = "/_xpack/ml/"; - public static final String THREAD_POOL_NAME = NAME; - public static final String DATAFEED_RUNNER_THREAD_POOL_NAME = NAME + "_datafeed_runner"; - public static final String AUTODETECT_PROCESS_THREAD_POOL_NAME = NAME + "_autodetect_process"; + public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed"; + public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect"; + public static final String NORMALIZER_THREAD_POOL_NAME = NAME + "_normalizer"; public static final Setting AUTODETECT_PROCESS = Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); @@ -294,7 +294,7 @@ public class MachineLearning implements ActionPlugin { executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, - threadPool.executor(MachineLearning.THREAD_POOL_NAME)); + threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry); @@ -438,17 +438,20 @@ public class MachineLearning implements ActionPlugin { return emptyList(); } int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); - FixedExecutorBuilder ml = new FixedExecutorBuilder(settings, THREAD_POOL_NAME, - maxNumberOfJobs * 2, 1000, "xpack.ml.thread_pool"); + // 4 threads: for cpp logging, result processing, state processing and + // AutodetectProcessManager worker thread: + FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, + maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool"); - // 3 threads: for c++ logging, result processing, state processing - FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME, - maxNumberOfJobs * 3, 200, "xpack.ml.autodetect_process_thread_pool"); + // 3 threads: normalization (cpp logging, result handling) and + // renormalization (ShortCircuitingRenormalizer): + FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, NORMALIZER_THREAD_POOL_NAME, + maxNumberOfJobs * 3, 200, "xpack.ml.normalizer_thread_pool"); // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // autodetect process are created at the same time then these two different TPs can merge. - FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_RUNNER_THREAD_POOL_NAME, + FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME, maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool"); - return Arrays.asList(ml, autoDetect, datafeed); + return Arrays.asList(autoDetect, renormalizer, datafeed); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index 40d13541313..d64df224043 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -134,7 +133,7 @@ public class DeleteExpiredDataAction extends Action listener) { logger.info("Deleting expired data"); - threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener)); + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> deleteExpiredData(listener)); } private void deleteExpiredData(ActionListener listener) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index 7410e486047..0cdfb6db844 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; @@ -244,7 +243,8 @@ public class FlushJobAction extends Action { + if (e == null) { + listener.onResponse(new Response(true)); + } else { + listener.onFailure(e); + } + }); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index c9e3f037ef7..9f0300204b9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -241,7 +240,8 @@ public class PostDataAction extends Action { + if (dataCounts != null) { + listener.onResponse(new Response(dataCounts)); + } else { + listener.onFailure(e); + } + }); } catch (Exception e) { listener.onFailure(e); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 578fab67a03..4f01cc08a1e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; @@ -225,7 +224,7 @@ public class StopDatafeedAction ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, InternalClient client) { super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME); + indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT); this.client = client; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index c06be6680a1..25c72d3a283 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -187,7 +186,8 @@ public class UpdateProcessAction extends ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, AutodetectProcessManager processManager) { super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager); + Request::new, Response::new, ThreadPool.Names.SAME, processManager); + // ThreadPool.Names.SAME, because operations is executed by autodetect worker thread } @Override @@ -199,15 +199,18 @@ public class UpdateProcessAction extends @Override protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, ClusterState state) { - threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { - try { - processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), - request.getModelPlotConfig()); - listener.onResponse(new Response()); - } catch (Exception e) { - listener.onFailure(e); - } - }); + try { + processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), + request.getModelPlotConfig(), e -> { + if (e == null) { + listener.onResponse(new Response()); + } else { + listener.onFailure(e); + } + }); + } catch (Exception e) { + listener.onFailure(e); + } } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index def8726588a..584224459cc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -129,7 +129,7 @@ public class DatafeedJobRunner extends AbstractComponent { logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", holder.datafeed.getId(), holder.datafeed.getJobId(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime), endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime)); - holder.future = threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(new AbstractRunnable() { + holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -181,7 +181,7 @@ public class DatafeedJobRunner extends AbstractComponent { if (holder.isRunning()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); - holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME, new AbstractRunnable() { + holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index e44b8b56e41..24048d08a0d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -8,17 +8,16 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; -import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; @@ -28,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.Closeable; import java.io.IOException; @@ -37,10 +35,11 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; public class AutodetectCommunicator implements Closeable { @@ -52,19 +51,20 @@ public class AutodetectCommunicator implements Closeable { private final AutodetectProcess autodetectProcess; private final AutoDetectResultProcessor autoDetectResultProcessor; private final Consumer handler; - - final AtomicReference inUse = new AtomicReference<>(); + private final ExecutorService autodetectWorkerExecutor; private final NamedXContentRegistry xContentRegistry; - AutodetectCommunicator(Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter, - AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler, - NamedXContentRegistry xContentRegistry) { + AutodetectCommunicator(Job job, AutodetectProcess process, + DataCountsReporter dataCountsReporter, + AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler, + NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; this.autodetectProcess = process; this.dataCountsReporter = dataCountsReporter; this.autoDetectResultProcessor = autoDetectResultProcessor; this.handler = handler; this.xContentRegistry = xContentRegistry; + this.autodetectWorkerExecutor = autodetectWorkerExecutor; } public void writeJobInputHeader() throws IOException { @@ -77,9 +77,9 @@ public class AutodetectCommunicator implements Closeable { dataCountsReporter, xContentRegistry); } - public DataCounts writeToJob(InputStream inputStream, XContentType xContentType, - DataLoadParams params) throws IOException { - return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> { + public void writeToJob(InputStream inputStream, XContentType xContentType, + DataLoadParams params, BiConsumer handler) throws IOException { + submitOperation(() -> { if (params.isResettingBuckets()) { autodetectProcess.writeResetBucketsControlMessage(params); } @@ -89,7 +89,7 @@ public class AutodetectCommunicator implements Closeable { DataCounts results = autoDetectWriter.write(countingStream, xContentType); autoDetectWriter.flush(); return results; - }, false); + }, handler); } @Override @@ -104,38 +104,49 @@ public class AutodetectCommunicator implements Closeable { * @param reason The reason for closing the job */ public void close(boolean restart, String reason) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> { - LOGGER.info("[{}] job closing, reason [{}]", job.getId(), reason); + Future future = autodetectWorkerExecutor.submit(() -> { + checkProcessIsAlive(); dataCountsReporter.close(); autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); handler.accept(restart ? new ElasticsearchException(reason) : null); LOGGER.info("[{}] job closed", job.getId()); return null; - }, true); + }); + try { + future.get(); + autodetectWorkerExecutor.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw ExceptionsHelper.convertToElastic(e); + } } - public void writeUpdateModelPlotMessage(ModelPlotConfig config) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { - autodetectProcess.writeUpdateModelPlotMessage(config); + public void writeUpdateProcessMessage(ModelPlotConfig config, List updates, + BiConsumer handler) throws IOException { + submitOperation(() -> { + if (config != null) { + autodetectProcess.writeUpdateModelPlotMessage(config); + } + if (updates != null) { + for (JobUpdate.DetectorUpdate update : updates) { + if (update.getRules() != null) { + autodetectProcess.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules()); + } + } + } return null; - }, false); + }, handler); } - public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { - autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules); - return null; - }, false); - } - - public void flushJob(InterimResultsParams params) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> { + public void flushJob(InterimResultsParams params, BiConsumer handler) throws IOException { + submitOperation(() -> { String flushId = autodetectProcess.flushJob(params); waitFlushToCompletion(flushId); return null; - }, false); + }, handler); } private void waitFlushToCompletion(String flushId) throws IOException { @@ -166,7 +177,7 @@ public class AutodetectCommunicator implements Closeable { ParameterizedMessage message = new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError()); LOGGER.error(message); - throw ExceptionsHelper.serverError(message.getFormattedMessage()); + throw new ElasticsearchException(message.getFormattedMessage()); } } @@ -182,33 +193,19 @@ public class AutodetectCommunicator implements Closeable { return dataCountsReporter.runningTotalStats(); } - private T checkAndRun(Supplier errorMessage, CheckedSupplier callback, boolean wait) throws IOException { - CountDownLatch latch = new CountDownLatch(1); - if (inUse.compareAndSet(null, latch)) { - try { - checkProcessIsAlive(); - return callback.get(); - } finally { - latch.countDown(); - inUse.set(null); + private void submitOperation(CheckedSupplier operation, BiConsumer handler) throws IOException { + autodetectWorkerExecutor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + handler.accept(null, e); } - } else { - if (wait) { - latch = inUse.get(); - if (latch != null) { - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); - } - } + + @Override + protected void doRun() throws Exception { checkProcessIsAlive(); - return callback.get(); - } else { - throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + handler.accept(operation.get(), null); } - } + }); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 931f0dde452..dd92c049493 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; @@ -52,10 +53,17 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; import java.util.function.Consumer; public class AutodetectProcessManager extends AbstractComponent { @@ -133,16 +141,16 @@ public class AutodetectProcessManager extends AbstractComponent { * @param input Data input stream * @param xContentType the {@link XContentType} of the input * @param params Data processing parameters - * @return Count of records, fields, bytes, etc written + * @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written) */ - public DataCounts processData(String jobId, InputStream input, XContentType xContentType, - DataLoadParams params) { + public void processData(String jobId, InputStream input, XContentType xContentType, + DataLoadParams params, BiConsumer handler) { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job"); } try { - return communicator.writeToJob(input, xContentType, params); + communicator.writeToJob(input, xContentType, params, handler); // TODO check for errors from autodetect } catch (IOException e) { String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId); @@ -165,7 +173,7 @@ public class AutodetectProcessManager extends AbstractComponent { * @param params Parameters about whether interim results calculation * should occur and for which period of time */ - public void flushJob(String jobId, InterimResultsParams params) { + public void flushJob(String jobId, InterimResultsParams params, Consumer handler) { logger.debug("Flushing job {}", jobId); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { @@ -174,7 +182,15 @@ public class AutodetectProcessManager extends AbstractComponent { throw new IllegalArgumentException(message); } try { - communicator.flushJob(params); + communicator.flushJob(params, (aVoid, e) -> { + if (e == null) { + handler.accept(null); + } else { + String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId); + logger.error(msg); + handler.accept(ExceptionsHelper.serverError(msg, e)); + } + }); // TODO check for errors from autodetect } catch (IOException ioe) { String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId); @@ -183,25 +199,21 @@ public class AutodetectProcessManager extends AbstractComponent { } } - public void writeUpdateProcessMessage(String jobId, List updates, ModelPlotConfig config) - throws IOException { + public void writeUpdateProcessMessage(String jobId, List updates, ModelPlotConfig config, + Consumer handler) throws IOException { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); + handler.accept(null); return; } - - if (config != null) { - communicator.writeUpdateModelPlotMessage(config); - } - - if (updates != null) { - for (JobUpdate.DetectorUpdate update : updates) { - if (update.getRules() != null) { - communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules()); - } + communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> { + if (e == null) { + handler.accept(null); + } else { + handler.accept(e); } - } + }); // TODO check for errors from autodetects } @@ -257,22 +269,25 @@ public class AutodetectProcessManager extends AbstractComponent { Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available - ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME); + ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, autodetectParams.dataCounts(), jobDataCountsPersister)) { ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); + ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, - threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization()); + renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, - executorService, () -> setJobState(jobTask, JobState.FAILED)); + autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); + ExecutorService autodetectWorkerExecutor; try { - executorService.submit(() -> processor.process(process, usePerPartitionNormalization)); + autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); + autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization)); } catch (EsRejectedExecutionException e) { // If submitting the operation to read the results from the process fails we need to close // the process too, so that other submitted operations to threadpool are stopped. @@ -284,7 +299,7 @@ public class AutodetectProcessManager extends AbstractComponent { throw e; } return new AutodetectCommunicator(job, process, dataCountsReporter, processor, - handler, xContentRegistry); + handler, xContentRegistry, autodetectWorkerExecutor); } } @@ -376,4 +391,81 @@ public class AutodetectProcessManager extends AbstractComponent { } return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats())); } + + ExecutorService createAutodetectExecutorService(ExecutorService executorService) { + AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); + executorService.submit(autoDetectWorkerExecutor::start); + return autoDetectWorkerExecutor; + } + + /* + * The autodetect native process can only handle a single operation at a time. In order to guarantee that, all + * operations are initially added to a queue and a worker thread from ml autodetect threadpool will process each + * operation at a time. + */ + class AutodetectWorkerExecutorService extends AbstractExecutorService { + + private final ThreadContext contextHolder; + private final CountDownLatch awaitTermination = new CountDownLatch(1); + private final BlockingQueue queue = new LinkedBlockingQueue<>(100); + + private volatile boolean running = true; + + AutodetectWorkerExecutorService(ThreadContext contextHolder) { + this.contextHolder = contextHolder; + } + + @Override + public void shutdown() { + running = false; + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public boolean isShutdown() { + return running == false; + } + + @Override + public boolean isTerminated() { + return awaitTermination.getCount() == 0; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return awaitTermination.await(timeout, unit); + } + + @Override + public void execute(Runnable command) { + boolean added = queue.offer(contextHolder.preserveContext(command)); + if (added == false) { + throw new ElasticsearchStatusException("Unable to submit operation", RestStatus.TOO_MANY_REQUESTS); + } + } + + void start() { + try { + while (running) { + Runnable runnable = queue.poll(500, TimeUnit.MILLISECONDS); + if (runnable != null) { + try { + runnable.run(); + } catch (Exception e) { + logger.error("error handeling job operation", e); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + awaitTermination.countDown(); + } + } + + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index b965621fba8..8e7b1de8ce3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -137,7 +137,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).submit(any(Runnable.class)); - when(threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService); + when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); @@ -166,7 +166,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); @@ -188,7 +188,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType))); @@ -218,7 +218,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); @@ -254,7 +254,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); - verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); + verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(auditor, times(1)).warning(eq("job_id"), anyString()); verify(client, never()).execute(same(PostDataAction.INSTANCE), any()); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); @@ -279,7 +279,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { task = spyDatafeedTask(task); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); if (cancelled) { task.stop("test"); verify(handler).accept(null); @@ -287,7 +287,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); - verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); + verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 8596ca0a72c..eb4ed73e112 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -6,16 +6,13 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -25,18 +22,16 @@ import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.time.Duration; import java.util.Collections; import java.util.Date; -import java.util.List; import java.util.Optional; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -51,30 +46,11 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { communicator.writeToJob(new ByteArrayInputStream(new byte[0]), - randomFrom(XContentType.values()), params); + randomFrom(XContentType.values()), params, (dataCounts, e) -> {}); Mockito.verify(process).writeResetBucketsControlMessage(params); } } - public void tesWriteUpdateModelPlotMessage() throws IOException { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { - ModelPlotConfig config = new ModelPlotConfig(); - communicator.writeUpdateModelPlotMessage(config); - Mockito.verify(process).writeUpdateModelPlotMessage(config); - } - } - - public void testWriteUpdateDetectorRulesMessage() throws IOException { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { - - List rules = Collections.singletonList(mock(DetectionRule.class)); - communicator.writeUpdateDetectorRulesMessage(1, rules); - Mockito.verify(process).writeUpdateDetectorRulesMessage(1, rules); - } - } - public void testFlushJob() throws IOException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); @@ -82,7 +58,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { InterimResultsParams params = InterimResultsParams.builder().build(); - communicator.flushJob(params); + communicator.flushJob(params, (aVoid, e) -> {}); Mockito.verify(process).flushJob(params); } } @@ -93,8 +69,9 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(process.readError()).thenReturn("Mock process is dead"); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); InterimResultsParams params = InterimResultsParams.builder().build(); - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params)); - assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", e.getMessage()); + Exception[] holder = new ElasticsearchException[1]; + communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1); + assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage()); } public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException { @@ -106,7 +83,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { InterimResultsParams params = InterimResultsParams.builder().build(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { - communicator.flushJob(params); + communicator.flushJob(params, (aVoid, e) -> {}); } verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))); @@ -146,6 +123,12 @@ public class AutodetectCommunicatorTests extends ESTestCase { private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess, AutoDetectResultProcessor autoDetectResultProcessor) throws IOException { ExecutorService executorService = mock(ExecutorService.class); + when(executorService.submit(any(Callable.class))).thenReturn(mock(Future.class)); + doAnswer(invocationOnMock -> { + Callable runnable = (Callable) invocationOnMock.getArguments()[0]; + runnable.call(); + return mock(Future.class); + }).when(executorService).submit(any(Callable.class)); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); return null; @@ -153,76 +136,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); return new AutodetectCommunicator(createJobDetails(), autodetectProcess, dataCountsReporter, autoDetectResultProcessor, e -> { - }, new NamedXContentRegistry(Collections.emptyList())); + }, new NamedXContentRegistry(Collections.emptyList()), executorService); } - public void testWriteToJobInUse() throws IOException { - InputStream in = mock(InputStream.class); - when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); - XContentType xContentType = randomFrom(XContentType.values()); - - communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, - () -> communicator.writeToJob(in, xContentType, mock(DataLoadParams.class))); - - communicator.inUse.set(null); - communicator.writeToJob(in, xContentType, - new DataLoadParams(TimeRange.builder().build(), Optional.empty())); - } - - public void testFlushInUse() throws IOException { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - communicator.inUse.set(new CountDownLatch(1)); - InterimResultsParams params = mock(InterimResultsParams.class); - expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params)); - - communicator.inUse.set(null); - communicator.flushJob(params); - } - - public void testCloseInUse() throws Exception { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - CountDownLatch latch = mock(CountDownLatch.class); - communicator.inUse.set(latch); - communicator.close(); - verify(latch, times(1)).await(); - - communicator.inUse.set(null); - communicator.close(); - } - - public void testWriteUpdateModelPlotConfigMessageInUse() throws Exception { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelPlotMessage(mock(ModelPlotConfig.class))); - - communicator.inUse.set(null); - communicator.writeUpdateModelPlotMessage(mock(ModelPlotConfig.class)); - } - - public void testWriteUpdateDetectorRulesMessageInUse() throws Exception { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - List rules = Collections.singletonList(mock(DetectionRule.class)); - communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules)); - - communicator.inUse.set(null); - communicator.writeUpdateDetectorRulesMessage(0, rules); - } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index e4c6a493db0..257b02c3053 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -5,12 +5,12 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -51,7 +51,10 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.elasticsearch.mock.orig.Mockito.doAnswer; @@ -65,12 +68,13 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; /** * Calling the - * {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams)} + * {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams, BiConsumer)} * method causes an AutodetectCommunicator to be created on demand. Most of * these tests have to do that before they can assert other things */ @@ -127,8 +131,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); + ExecutorService executorService = mock(ExecutorService.class); + Future future = mock(Future.class); + when(executorService.submit(any(Callable.class))).thenReturn(future); when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService()); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); when(autodetectProcess.isProcessAlive()).thenReturn(true); @@ -140,6 +148,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()))); + doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -174,7 +183,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), - params); + params, (dataCounts1, e) -> {}); assertEquals(1, manager.numberOfOpenJobs()); } @@ -185,13 +194,18 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = mock(DataLoadParams.class); InputStream inputStream = createInputStream(""); XContentType xContentType = randomFrom(XContentType.values()); - doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, - xContentType, params); + doAnswer(invocationOnMock -> { + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[3]; + handler.accept(null, new IOException("blah")); + return null; + }).when(communicator).writeToJob(eq(inputStream), same(xContentType), eq(params), any()); + JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); - ESTestCase.expectThrows(ElasticsearchException.class, - () -> manager.processData("foo", inputStream, xContentType, params)); + Exception[] holder = new Exception[1]; + manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e); + assertNotNull(holder[0]); } public void testCloseJob() { @@ -202,7 +216,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts1, e) -> {}); // job is created assertEquals(1, manager.numberOfOpenJobs()); @@ -219,8 +233,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); - manager.processData("foo", inputStream, xContentType, params); - verify(communicator).writeToJob(inputStream, xContentType, params); + manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> {}); + verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any()); } public void testFlush() throws IOException { @@ -231,12 +245,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", inputStream, randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts1, e) -> {}); InterimResultsParams params = InterimResultsParams.builder().build(); - manager.flushJob("foo", params); + manager.flushJob("foo", params, e -> {}); - verify(communicator).flushJob(params); + verify(communicator).flushJob(same(params), any()); } public void testFlushThrows() throws IOException { @@ -244,10 +258,15 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); InterimResultsParams params = InterimResultsParams.builder().build(); - doThrow(new IOException("blah")).when(communicator).flushJob(params); + doAnswer(invocationOnMock -> { + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; + handler.accept(null, new IOException("blah")); + return null; + }).when(communicator).flushJob(same(params), any()); - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> manager.flushJob("foo", params)); - assertEquals("[foo] exception while flushing job", e.getMessage()); + Exception[] holder = new Exception[1]; + manager.flushJob("foo", params, e -> holder[0] = e); + assertEquals("[foo] exception while flushing job", holder[0].getMessage()); } public void testwriteUpdateProcessMessage() throws IOException { @@ -256,9 +275,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { ModelPlotConfig modelConfig = mock(ModelPlotConfig.class); List rules = Collections.singletonList(mock(DetectionRule.class)); List detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules)); - manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig); - verify(communicator).writeUpdateModelPlotMessage(modelConfig); - verify(communicator).writeUpdateDetectorRulesMessage(2, rules); + manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig, e -> {}); + verify(communicator).writeUpdateProcessMessage(same(modelConfig), same(detectorUpdates), any()); } public void testJobHasActiveAutodetectProcess() throws IOException { @@ -269,7 +287,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts1, e) -> {}); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); assertFalse(manager.jobHasActiveAutodetectProcess("bar")); @@ -277,16 +295,24 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testProcessData_GivenStateNotOpened() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); - when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo")); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[3]; + handler.accept(new DataCounts("foo"), null); + return null; + }).when(communicator).writeToJob(any(), any(), any(), any()); AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); InputStream inputStream = createInputStream(""); - DataCounts dataCounts = manager.processData("foo", inputStream, - randomFrom(XContentType.values()), mock(DataLoadParams.class)); + DataCounts[] dataCounts = new DataCounts[1]; + manager.processData("foo", inputStream, + randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { + dataCounts[0] = dataCounts1; + }); - assertThat(dataCounts, equalTo(new DataCounts("foo"))); + assertThat(dataCounts[0], equalTo(new DataCounts("foo"))); } public void testCreate_notEnoughThreads() throws IOException { @@ -342,7 +368,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); manager.openJob(jobId, jobTask, false, e -> {}); manager.processData(jobId, createInputStream(""), randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts, e) -> {}); return manager; }