diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index ec9833c80d5..22fc52649f8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -148,9 +148,9 @@ import static java.util.Collections.emptyList; public class MachineLearning implements ActionPlugin { public static final String NAME = "ml"; public static final String BASE_PATH = "/_xpack/ml/"; - public static final String THREAD_POOL_NAME = NAME; - public static final String DATAFEED_RUNNER_THREAD_POOL_NAME = NAME + "_datafeed_runner"; - public static final String AUTODETECT_PROCESS_THREAD_POOL_NAME = NAME + "_autodetect_process"; + public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed"; + public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect"; + public static final String NORMALIZER_THREAD_POOL_NAME = NAME + "_normalizer"; public static final Setting AUTODETECT_PROCESS = Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); @@ -294,7 +294,7 @@ public class MachineLearning implements ActionPlugin { executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, - threadPool.executor(MachineLearning.THREAD_POOL_NAME)); + threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry); @@ -438,17 +438,20 @@ public class MachineLearning implements ActionPlugin { return emptyList(); } int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); - FixedExecutorBuilder ml = new FixedExecutorBuilder(settings, THREAD_POOL_NAME, - maxNumberOfJobs * 2, 1000, "xpack.ml.thread_pool"); + // 4 threads: for cpp logging, result processing, state processing and + // AutodetectProcessManager worker thread: + FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, + maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool"); - // 3 threads: for c++ logging, result processing, state processing - FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME, - maxNumberOfJobs * 3, 200, "xpack.ml.autodetect_process_thread_pool"); + // 3 threads: normalization (cpp logging, result handling) and + // renormalization (ShortCircuitingRenormalizer): + FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, NORMALIZER_THREAD_POOL_NAME, + maxNumberOfJobs * 3, 200, "xpack.ml.normalizer_thread_pool"); // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // autodetect process are created at the same time then these two different TPs can merge. - FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_RUNNER_THREAD_POOL_NAME, + FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME, maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool"); - return Arrays.asList(ml, autoDetect, datafeed); + return Arrays.asList(autoDetect, renormalizer, datafeed); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index 40d13541313..d64df224043 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -134,7 +133,7 @@ public class DeleteExpiredDataAction extends Action listener) { logger.info("Deleting expired data"); - threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener)); + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> deleteExpiredData(listener)); } private void deleteExpiredData(ActionListener listener) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index 7410e486047..0cdfb6db844 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; @@ -244,7 +243,8 @@ public class FlushJobAction extends Action { + if (e == null) { + listener.onResponse(new Response(true)); + } else { + listener.onFailure(e); + } + }); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index c9e3f037ef7..9f0300204b9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -241,7 +240,8 @@ public class PostDataAction extends Action { + if (dataCounts != null) { + listener.onResponse(new Response(dataCounts)); + } else { + listener.onFailure(e); + } + }); } catch (Exception e) { listener.onFailure(e); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 578fab67a03..4f01cc08a1e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; @@ -225,7 +224,7 @@ public class StopDatafeedAction ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, InternalClient client) { super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME); + indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT); this.client = client; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index c06be6680a1..25c72d3a283 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -187,7 +186,8 @@ public class UpdateProcessAction extends ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, AutodetectProcessManager processManager) { super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager); + Request::new, Response::new, ThreadPool.Names.SAME, processManager); + // ThreadPool.Names.SAME, because operations is executed by autodetect worker thread } @Override @@ -199,15 +199,18 @@ public class UpdateProcessAction extends @Override protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, ClusterState state) { - threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { - try { - processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), - request.getModelPlotConfig()); - listener.onResponse(new Response()); - } catch (Exception e) { - listener.onFailure(e); - } - }); + try { + processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), + request.getModelPlotConfig(), e -> { + if (e == null) { + listener.onResponse(new Response()); + } else { + listener.onFailure(e); + } + }); + } catch (Exception e) { + listener.onFailure(e); + } } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index def8726588a..584224459cc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -129,7 +129,7 @@ public class DatafeedJobRunner extends AbstractComponent { logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", holder.datafeed.getId(), holder.datafeed.getJobId(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime), endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime)); - holder.future = threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(new AbstractRunnable() { + holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -181,7 +181,7 @@ public class DatafeedJobRunner extends AbstractComponent { if (holder.isRunning()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); - holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME, new AbstractRunnable() { + holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index e44b8b56e41..24048d08a0d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -8,17 +8,16 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; -import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; @@ -28,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.Closeable; import java.io.IOException; @@ -37,10 +35,11 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; public class AutodetectCommunicator implements Closeable { @@ -52,19 +51,20 @@ public class AutodetectCommunicator implements Closeable { private final AutodetectProcess autodetectProcess; private final AutoDetectResultProcessor autoDetectResultProcessor; private final Consumer handler; - - final AtomicReference inUse = new AtomicReference<>(); + private final ExecutorService autodetectWorkerExecutor; private final NamedXContentRegistry xContentRegistry; - AutodetectCommunicator(Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter, - AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler, - NamedXContentRegistry xContentRegistry) { + AutodetectCommunicator(Job job, AutodetectProcess process, + DataCountsReporter dataCountsReporter, + AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler, + NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; this.autodetectProcess = process; this.dataCountsReporter = dataCountsReporter; this.autoDetectResultProcessor = autoDetectResultProcessor; this.handler = handler; this.xContentRegistry = xContentRegistry; + this.autodetectWorkerExecutor = autodetectWorkerExecutor; } public void writeJobInputHeader() throws IOException { @@ -77,9 +77,9 @@ public class AutodetectCommunicator implements Closeable { dataCountsReporter, xContentRegistry); } - public DataCounts writeToJob(InputStream inputStream, XContentType xContentType, - DataLoadParams params) throws IOException { - return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> { + public void writeToJob(InputStream inputStream, XContentType xContentType, + DataLoadParams params, BiConsumer handler) throws IOException { + submitOperation(() -> { if (params.isResettingBuckets()) { autodetectProcess.writeResetBucketsControlMessage(params); } @@ -89,7 +89,7 @@ public class AutodetectCommunicator implements Closeable { DataCounts results = autoDetectWriter.write(countingStream, xContentType); autoDetectWriter.flush(); return results; - }, false); + }, handler); } @Override @@ -104,38 +104,49 @@ public class AutodetectCommunicator implements Closeable { * @param reason The reason for closing the job */ public void close(boolean restart, String reason) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> { - LOGGER.info("[{}] job closing, reason [{}]", job.getId(), reason); + Future future = autodetectWorkerExecutor.submit(() -> { + checkProcessIsAlive(); dataCountsReporter.close(); autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); handler.accept(restart ? new ElasticsearchException(reason) : null); LOGGER.info("[{}] job closed", job.getId()); return null; - }, true); + }); + try { + future.get(); + autodetectWorkerExecutor.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw ExceptionsHelper.convertToElastic(e); + } } - public void writeUpdateModelPlotMessage(ModelPlotConfig config) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { - autodetectProcess.writeUpdateModelPlotMessage(config); + public void writeUpdateProcessMessage(ModelPlotConfig config, List updates, + BiConsumer handler) throws IOException { + submitOperation(() -> { + if (config != null) { + autodetectProcess.writeUpdateModelPlotMessage(config); + } + if (updates != null) { + for (JobUpdate.DetectorUpdate update : updates) { + if (update.getRules() != null) { + autodetectProcess.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules()); + } + } + } return null; - }, false); + }, handler); } - public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { - autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules); - return null; - }, false); - } - - public void flushJob(InterimResultsParams params) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> { + public void flushJob(InterimResultsParams params, BiConsumer handler) throws IOException { + submitOperation(() -> { String flushId = autodetectProcess.flushJob(params); waitFlushToCompletion(flushId); return null; - }, false); + }, handler); } private void waitFlushToCompletion(String flushId) throws IOException { @@ -166,7 +177,7 @@ public class AutodetectCommunicator implements Closeable { ParameterizedMessage message = new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError()); LOGGER.error(message); - throw ExceptionsHelper.serverError(message.getFormattedMessage()); + throw new ElasticsearchException(message.getFormattedMessage()); } } @@ -182,33 +193,19 @@ public class AutodetectCommunicator implements Closeable { return dataCountsReporter.runningTotalStats(); } - private T checkAndRun(Supplier errorMessage, CheckedSupplier callback, boolean wait) throws IOException { - CountDownLatch latch = new CountDownLatch(1); - if (inUse.compareAndSet(null, latch)) { - try { - checkProcessIsAlive(); - return callback.get(); - } finally { - latch.countDown(); - inUse.set(null); + private void submitOperation(CheckedSupplier operation, BiConsumer handler) throws IOException { + autodetectWorkerExecutor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + handler.accept(null, e); } - } else { - if (wait) { - latch = inUse.get(); - if (latch != null) { - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); - } - } + + @Override + protected void doRun() throws Exception { checkProcessIsAlive(); - return callback.get(); - } else { - throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); + handler.accept(operation.get(), null); } - } + }); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 931f0dde452..dd92c049493 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; @@ -52,10 +53,17 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; import java.util.function.Consumer; public class AutodetectProcessManager extends AbstractComponent { @@ -133,16 +141,16 @@ public class AutodetectProcessManager extends AbstractComponent { * @param input Data input stream * @param xContentType the {@link XContentType} of the input * @param params Data processing parameters - * @return Count of records, fields, bytes, etc written + * @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written) */ - public DataCounts processData(String jobId, InputStream input, XContentType xContentType, - DataLoadParams params) { + public void processData(String jobId, InputStream input, XContentType xContentType, + DataLoadParams params, BiConsumer handler) { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job"); } try { - return communicator.writeToJob(input, xContentType, params); + communicator.writeToJob(input, xContentType, params, handler); // TODO check for errors from autodetect } catch (IOException e) { String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId); @@ -165,7 +173,7 @@ public class AutodetectProcessManager extends AbstractComponent { * @param params Parameters about whether interim results calculation * should occur and for which period of time */ - public void flushJob(String jobId, InterimResultsParams params) { + public void flushJob(String jobId, InterimResultsParams params, Consumer handler) { logger.debug("Flushing job {}", jobId); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { @@ -174,7 +182,15 @@ public class AutodetectProcessManager extends AbstractComponent { throw new IllegalArgumentException(message); } try { - communicator.flushJob(params); + communicator.flushJob(params, (aVoid, e) -> { + if (e == null) { + handler.accept(null); + } else { + String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId); + logger.error(msg); + handler.accept(ExceptionsHelper.serverError(msg, e)); + } + }); // TODO check for errors from autodetect } catch (IOException ioe) { String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId); @@ -183,25 +199,21 @@ public class AutodetectProcessManager extends AbstractComponent { } } - public void writeUpdateProcessMessage(String jobId, List updates, ModelPlotConfig config) - throws IOException { + public void writeUpdateProcessMessage(String jobId, List updates, ModelPlotConfig config, + Consumer handler) throws IOException { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); + handler.accept(null); return; } - - if (config != null) { - communicator.writeUpdateModelPlotMessage(config); - } - - if (updates != null) { - for (JobUpdate.DetectorUpdate update : updates) { - if (update.getRules() != null) { - communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules()); - } + communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> { + if (e == null) { + handler.accept(null); + } else { + handler.accept(e); } - } + }); // TODO check for errors from autodetects } @@ -257,22 +269,25 @@ public class AutodetectProcessManager extends AbstractComponent { Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available - ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME); + ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, autodetectParams.dataCounts(), jobDataCountsPersister)) { ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); + ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, - threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization()); + renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, - executorService, () -> setJobState(jobTask, JobState.FAILED)); + autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); + ExecutorService autodetectWorkerExecutor; try { - executorService.submit(() -> processor.process(process, usePerPartitionNormalization)); + autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); + autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization)); } catch (EsRejectedExecutionException e) { // If submitting the operation to read the results from the process fails we need to close // the process too, so that other submitted operations to threadpool are stopped. @@ -284,7 +299,7 @@ public class AutodetectProcessManager extends AbstractComponent { throw e; } return new AutodetectCommunicator(job, process, dataCountsReporter, processor, - handler, xContentRegistry); + handler, xContentRegistry, autodetectWorkerExecutor); } } @@ -376,4 +391,81 @@ public class AutodetectProcessManager extends AbstractComponent { } return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats())); } + + ExecutorService createAutodetectExecutorService(ExecutorService executorService) { + AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); + executorService.submit(autoDetectWorkerExecutor::start); + return autoDetectWorkerExecutor; + } + + /* + * The autodetect native process can only handle a single operation at a time. In order to guarantee that, all + * operations are initially added to a queue and a worker thread from ml autodetect threadpool will process each + * operation at a time. + */ + class AutodetectWorkerExecutorService extends AbstractExecutorService { + + private final ThreadContext contextHolder; + private final CountDownLatch awaitTermination = new CountDownLatch(1); + private final BlockingQueue queue = new LinkedBlockingQueue<>(100); + + private volatile boolean running = true; + + AutodetectWorkerExecutorService(ThreadContext contextHolder) { + this.contextHolder = contextHolder; + } + + @Override + public void shutdown() { + running = false; + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public boolean isShutdown() { + return running == false; + } + + @Override + public boolean isTerminated() { + return awaitTermination.getCount() == 0; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return awaitTermination.await(timeout, unit); + } + + @Override + public void execute(Runnable command) { + boolean added = queue.offer(contextHolder.preserveContext(command)); + if (added == false) { + throw new ElasticsearchStatusException("Unable to submit operation", RestStatus.TOO_MANY_REQUESTS); + } + } + + void start() { + try { + while (running) { + Runnable runnable = queue.poll(500, TimeUnit.MILLISECONDS); + if (runnable != null) { + try { + runnable.run(); + } catch (Exception e) { + logger.error("error handeling job operation", e); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + awaitTermination.countDown(); + } + } + + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index b965621fba8..8e7b1de8ce3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -137,7 +137,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).submit(any(Runnable.class)); - when(threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService); + when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); @@ -166,7 +166,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); @@ -188,7 +188,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType))); @@ -218,7 +218,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); @@ -254,7 +254,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); - verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); + verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(auditor, times(1)).warning(eq("job_id"), anyString()); verify(client, never()).execute(same(PostDataAction.INSTANCE), any()); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); @@ -279,7 +279,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { task = spyDatafeedTask(task); datafeedJobRunner.run(task, handler); - verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); if (cancelled) { task.stop("test"); verify(handler).accept(null); @@ -287,7 +287,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); - verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); + verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 8596ca0a72c..eb4ed73e112 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -6,16 +6,13 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -25,18 +22,16 @@ import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.time.Duration; import java.util.Collections; import java.util.Date; -import java.util.List; import java.util.Optional; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -51,30 +46,11 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { communicator.writeToJob(new ByteArrayInputStream(new byte[0]), - randomFrom(XContentType.values()), params); + randomFrom(XContentType.values()), params, (dataCounts, e) -> {}); Mockito.verify(process).writeResetBucketsControlMessage(params); } } - public void tesWriteUpdateModelPlotMessage() throws IOException { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { - ModelPlotConfig config = new ModelPlotConfig(); - communicator.writeUpdateModelPlotMessage(config); - Mockito.verify(process).writeUpdateModelPlotMessage(config); - } - } - - public void testWriteUpdateDetectorRulesMessage() throws IOException { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { - - List rules = Collections.singletonList(mock(DetectionRule.class)); - communicator.writeUpdateDetectorRulesMessage(1, rules); - Mockito.verify(process).writeUpdateDetectorRulesMessage(1, rules); - } - } - public void testFlushJob() throws IOException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); @@ -82,7 +58,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { InterimResultsParams params = InterimResultsParams.builder().build(); - communicator.flushJob(params); + communicator.flushJob(params, (aVoid, e) -> {}); Mockito.verify(process).flushJob(params); } } @@ -93,8 +69,9 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(process.readError()).thenReturn("Mock process is dead"); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); InterimResultsParams params = InterimResultsParams.builder().build(); - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params)); - assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", e.getMessage()); + Exception[] holder = new ElasticsearchException[1]; + communicator.flushJob(params, (aVoid, e1) -> holder[0] = e1); + assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage()); } public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException { @@ -106,7 +83,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { InterimResultsParams params = InterimResultsParams.builder().build(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { - communicator.flushJob(params); + communicator.flushJob(params, (aVoid, e) -> {}); } verify(autoDetectResultProcessor, times(2)).waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))); @@ -146,6 +123,12 @@ public class AutodetectCommunicatorTests extends ESTestCase { private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess, AutoDetectResultProcessor autoDetectResultProcessor) throws IOException { ExecutorService executorService = mock(ExecutorService.class); + when(executorService.submit(any(Callable.class))).thenReturn(mock(Future.class)); + doAnswer(invocationOnMock -> { + Callable runnable = (Callable) invocationOnMock.getArguments()[0]; + runnable.call(); + return mock(Future.class); + }).when(executorService).submit(any(Callable.class)); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); return null; @@ -153,76 +136,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); return new AutodetectCommunicator(createJobDetails(), autodetectProcess, dataCountsReporter, autoDetectResultProcessor, e -> { - }, new NamedXContentRegistry(Collections.emptyList())); + }, new NamedXContentRegistry(Collections.emptyList()), executorService); } - public void testWriteToJobInUse() throws IOException { - InputStream in = mock(InputStream.class); - when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); - XContentType xContentType = randomFrom(XContentType.values()); - - communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, - () -> communicator.writeToJob(in, xContentType, mock(DataLoadParams.class))); - - communicator.inUse.set(null); - communicator.writeToJob(in, xContentType, - new DataLoadParams(TimeRange.builder().build(), Optional.empty())); - } - - public void testFlushInUse() throws IOException { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - communicator.inUse.set(new CountDownLatch(1)); - InterimResultsParams params = mock(InterimResultsParams.class); - expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params)); - - communicator.inUse.set(null); - communicator.flushJob(params); - } - - public void testCloseInUse() throws Exception { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - CountDownLatch latch = mock(CountDownLatch.class); - communicator.inUse.set(latch); - communicator.close(); - verify(latch, times(1)).await(); - - communicator.inUse.set(null); - communicator.close(); - } - - public void testWriteUpdateModelPlotConfigMessageInUse() throws Exception { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelPlotMessage(mock(ModelPlotConfig.class))); - - communicator.inUse.set(null); - communicator.writeUpdateModelPlotMessage(mock(ModelPlotConfig.class)); - } - - public void testWriteUpdateDetectorRulesMessageInUse() throws Exception { - AutodetectProcess process = mockAutodetectProcessWithOutputStream(); - AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - - List rules = Collections.singletonList(mock(DetectionRule.class)); - communicator.inUse.set(new CountDownLatch(1)); - expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules)); - - communicator.inUse.set(null); - communicator.writeUpdateDetectorRulesMessage(0, rules); - } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index e4c6a493db0..257b02c3053 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -5,12 +5,12 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -51,7 +51,10 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.elasticsearch.mock.orig.Mockito.doAnswer; @@ -65,12 +68,13 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; /** * Calling the - * {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams)} + * {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams, BiConsumer)} * method causes an AutodetectCommunicator to be created on demand. Most of * these tests have to do that before they can assert other things */ @@ -127,8 +131,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); + ExecutorService executorService = mock(ExecutorService.class); + Future future = mock(Future.class); + when(executorService.submit(any(Callable.class))).thenReturn(future); when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService()); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); when(autodetectProcess.isProcessAlive()).thenReturn(true); @@ -140,6 +148,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()))); + doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -174,7 +183,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), - params); + params, (dataCounts1, e) -> {}); assertEquals(1, manager.numberOfOpenJobs()); } @@ -185,13 +194,18 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = mock(DataLoadParams.class); InputStream inputStream = createInputStream(""); XContentType xContentType = randomFrom(XContentType.values()); - doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, - xContentType, params); + doAnswer(invocationOnMock -> { + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[3]; + handler.accept(null, new IOException("blah")); + return null; + }).when(communicator).writeToJob(eq(inputStream), same(xContentType), eq(params), any()); + JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); - ESTestCase.expectThrows(ElasticsearchException.class, - () -> manager.processData("foo", inputStream, xContentType, params)); + Exception[] holder = new Exception[1]; + manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e); + assertNotNull(holder[0]); } public void testCloseJob() { @@ -202,7 +216,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts1, e) -> {}); // job is created assertEquals(1, manager.numberOfOpenJobs()); @@ -219,8 +233,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); - manager.processData("foo", inputStream, xContentType, params); - verify(communicator).writeToJob(inputStream, xContentType, params); + manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> {}); + verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any()); } public void testFlush() throws IOException { @@ -231,12 +245,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", inputStream, randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts1, e) -> {}); InterimResultsParams params = InterimResultsParams.builder().build(); - manager.flushJob("foo", params); + manager.flushJob("foo", params, e -> {}); - verify(communicator).flushJob(params); + verify(communicator).flushJob(same(params), any()); } public void testFlushThrows() throws IOException { @@ -244,10 +258,15 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); InterimResultsParams params = InterimResultsParams.builder().build(); - doThrow(new IOException("blah")).when(communicator).flushJob(params); + doAnswer(invocationOnMock -> { + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; + handler.accept(null, new IOException("blah")); + return null; + }).when(communicator).flushJob(same(params), any()); - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> manager.flushJob("foo", params)); - assertEquals("[foo] exception while flushing job", e.getMessage()); + Exception[] holder = new Exception[1]; + manager.flushJob("foo", params, e -> holder[0] = e); + assertEquals("[foo] exception while flushing job", holder[0].getMessage()); } public void testwriteUpdateProcessMessage() throws IOException { @@ -256,9 +275,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { ModelPlotConfig modelConfig = mock(ModelPlotConfig.class); List rules = Collections.singletonList(mock(DetectionRule.class)); List detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules)); - manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig); - verify(communicator).writeUpdateModelPlotMessage(modelConfig); - verify(communicator).writeUpdateDetectorRulesMessage(2, rules); + manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig, e -> {}); + verify(communicator).writeUpdateProcessMessage(same(modelConfig), same(detectorUpdates), any()); } public void testJobHasActiveAutodetectProcess() throws IOException { @@ -269,7 +287,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts1, e) -> {}); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); assertFalse(manager.jobHasActiveAutodetectProcess("bar")); @@ -277,16 +295,24 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testProcessData_GivenStateNotOpened() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); - when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo")); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[3]; + handler.accept(new DataCounts("foo"), null); + return null; + }).when(communicator).writeToJob(any(), any(), any(), any()); AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class); manager.openJob("foo", jobTask, false, e -> {}); InputStream inputStream = createInputStream(""); - DataCounts dataCounts = manager.processData("foo", inputStream, - randomFrom(XContentType.values()), mock(DataLoadParams.class)); + DataCounts[] dataCounts = new DataCounts[1]; + manager.processData("foo", inputStream, + randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { + dataCounts[0] = dataCounts1; + }); - assertThat(dataCounts, equalTo(new DataCounts("foo"))); + assertThat(dataCounts[0], equalTo(new DataCounts("foo"))); } public void testCreate_notEnoughThreads() throws IOException { @@ -342,7 +368,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); manager.openJob(jobId, jobTask, false, e -> {}); manager.processData(jobId, createInputStream(""), randomFrom(XContentType.values()), - mock(DataLoadParams.class)); + mock(DataLoadParams.class), (dataCounts, e) -> {}); return manager; }