diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java index 2a3d2831d2c..41e19b69b8c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java @@ -78,7 +78,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactor import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; @@ -214,7 +213,7 @@ public class MlPlugin extends Plugin implements ActionPlugin { try { NativeController nativeController = new NativeController(env, new NamedPipeHelper()); nativeController.tailLogsInThread(); - autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController); + autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController, client); normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController); } catch (IOException e) { throw new ElasticsearchException("Failed to create native process factories", e); @@ -228,9 +227,8 @@ public class MlPlugin extends Plugin implements ActionPlugin { } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MlPlugin.THREAD_POOL_NAME)); - AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings); AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, - jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, autodetectProcessFactory, normalizerFactory); + jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory); DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, System::currentTimeMillis); PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, client); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 1d6cc54af62..c501be6a633 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; @@ -280,22 +279,6 @@ public class JobResultsPersister extends AbstractComponent { // read again 
by this process } - /** - * Persist state sent from the native process - */ - public void persistBulkState(String jobId, BytesReference bytesRef) { - try { - // No validation - assume the native process has formatted the state correctly - byte[] bytes = bytesRef.toBytesRef().bytes; - logger.trace("[{}] ES API CALL: bulk index", jobId); - client.prepareBulk() - .add(bytes, 0, bytes.length) - .execute().actionGet(); - } catch (Exception e) { - logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e); - } - } - /** * Delete any existing interim results synchronously */ diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 1f1daacda3f..1bdccc6b548 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -11,14 +11,12 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; @@ -34,7 +32,6 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.Optional; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -52,23 +49,13 @@ public class AutodetectCommunicator implements Closeable { final AtomicReference inUse = new AtomicReference<>(); - public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, - DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, - StateProcessor stateProcessor, Consumer handler) { + public AutodetectCommunicator(Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter, + AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler) { this.job = job; this.autodetectProcess = process; this.dataCountsReporter = dataCountsReporter; this.autoDetectResultProcessor = autoDetectResultProcessor; this.handler = handler; - - AnalysisConfig analysisConfig = job.getAnalysisConfig(); - boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization(); - autoDetectExecutor.execute(() -> - autoDetectResultProcessor.process(process.getProcessOutStream(), usePerPartitionNormalization) - ); - autoDetectExecutor.execute(() -> - stateProcessor.process(job.getId(), process.getPersistStream()) - ); } public 
void writeJobInputHeader() throws IOException { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 467e8b7ec80..57b216cb726 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -7,11 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; +import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.time.ZonedDateTime; +import java.util.Iterator; /** * Interface representing the native C++ autodetect process @@ -60,16 +61,9 @@ public interface AutodetectProcess extends Closeable { void flushStream() throws IOException; /** - * Autodetect's output stream - * @return output stream + * @return stream of autodetect results. */ - InputStream getProcessOutStream(); - - /** - * Autodetect's state persistence stream - * @return persist stream - */ - InputStream getPersistStream(); + Iterator readAutodetectResults(); /** * The time the process was started diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 632aa0daaa3..ff7a582701a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; @@ -28,8 +29,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; @@ -66,11 +65,9 @@ public class AutodetectProcessManager extends AbstractComponent { private final ThreadPool threadPool; private final JobManager jobManager; private final JobProvider jobProvider; - private final AutodetectResultsParser parser; private final AutodetectProcessFactory autodetectProcessFactory; private final NormalizerFactory normalizerFactory; - private final StateProcessor 
stateProcessor; private final JobResultsPersister jobResultsPersister; private final JobDataCountsPersister jobDataCountsPersister; @@ -80,20 +77,18 @@ public class AutodetectProcessManager extends AbstractComponent { public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, - JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser, + JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory) { super(settings); this.client = client; this.threadPool = threadPool; this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings); - this.parser = parser; this.autodetectProcessFactory = autodetectProcessFactory; this.normalizerFactory = normalizerFactory; this.jobManager = jobManager; this.jobProvider = jobProvider; this.jobResultsPersister = jobResultsPersister; - this.stateProcessor = new StateProcessor(settings, jobResultsPersister); this.jobDataCountsPersister = jobDataCountsPersister; this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); @@ -227,26 +222,25 @@ public class AutodetectProcessManager extends AbstractComponent { RestStatus.CONFLICT); } - // TODO norelease, once we remove black hole process - // then we can remove this method and move not enough threads logic to the auto detect process factory Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available ExecutorService executorService = threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME); - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts, jobDataCountsPersister)) { - ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), + ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); - Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdator, + Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, threadPool.executor(MlPlugin.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization()); - AutoDetectResultProcessor processor = new AutoDetectResultProcessor(jobId, renormalizer, jobResultsPersister, parser); - AutodetectProcess process = null; + AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters, + ignoreDowntime, executorService); + boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); + AutoDetectResultProcessor processor = new AutoDetectResultProcessor(jobId, renormalizer, jobResultsPersister); try { - process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters, - ignoreDowntime, executorService); - return new AutodetectCommunicator(executorService, job, process, dataCountsReporter, processor, stateProcessor, handler); - } catch (Exception e) { + executorService.submit(() -> processor.process(process, usePerPartitionNormalization)); + } catch (EsRejectedExecutionException e) { + // If submitting the operation to read the results from the process fails we need to close + // the process too, so that other submitted operations to threadpool are stopped. 
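For illustration, the pattern this hunk introduces: the result-processing task is submitted to the dedicated autodetect thread pool, which has no work queue, so a rejection surfaces immediately when every thread is busy, and the native process must then be closed so it does not keep writing output that nobody will read. Below is a minimal standalone sketch of that pattern; the class and method names are placeholders, not code from this change, and the sketch uses the plain JDK RejectedExecutionException to stay self-contained, whereas the change catches Elasticsearch's EsRejectedExecutionException. The actual close-and-rethrow hunk continues right after this note.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Sketch only: submit a consumer task to a pool with no queue and close the producing
// resource if the pool rejects the task. In this change the resource is the native
// autodetect process and the task is processor.process(process, usePerPartitionNormalization).
class SubmitOrClose {
    static void submitOrClose(ExecutorService executor, Runnable task, Closeable resource) {
        try {
            executor.submit(task);
        } catch (RejectedExecutionException e) {
            try {
                resource.close();       // stop the producer; nobody will ever read its output
            } catch (IOException inner) {
                e.addSuppressed(inner); // keep the rejection as the primary failure
            }
            throw e;
        }
    }
}
```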
try { IOUtils.close(process); } catch (IOException ioe) { @@ -254,6 +248,7 @@ public class AutodetectProcessManager extends AbstractComponent { } throw e; } + return new AutodetectCommunicator(job, process, dataCountsReporter, processor, handler); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index b3dc959f155..6a4cd6314d0 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -7,25 +7,22 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.io.IOException; -import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; /** * A placeholder class simulating the actions of the native Autodetect process. * Most methods consume data without performing any action however, after a call to * {@link #flushJob(InterimResultsParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement} - * message is expected on the {@link #getProcessOutStream()} stream. This class writes the flush + * message is expected from the {@link #readAutodetectResults()} iterator. This class writes the flush * acknowledgement immediately.
*/ public class BlackHoleAutodetectProcess implements AutodetectProcess { @@ -33,28 +30,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { private static final Logger LOGGER = Loggers.getLogger(BlackHoleAutodetectProcess.class); private static final String FLUSH_ID = "flush-1"; - private final PipedInputStream processOutStream; - private final PipedInputStream persistStream; - private PipedOutputStream pipedProcessOutStream; - private PipedOutputStream pipedPersistStream; private final ZonedDateTime startTime; + private final BlockingQueue results = new ArrayBlockingQueue<>(128); + public BlackHoleAutodetectProcess() { - processOutStream = new PipedInputStream(); - persistStream = new PipedInputStream(); - try { - // jackson tries to read the first 4 bytes: - // if we don't do this the autodetect communication would fail starting - pipedProcessOutStream = new PipedOutputStream(processOutStream); - pipedProcessOutStream.write(' '); - pipedProcessOutStream.write(' '); - pipedProcessOutStream.write(' '); - pipedProcessOutStream.write('['); - pipedProcessOutStream.flush(); - pipedPersistStream = new PipedOutputStream(persistStream); - } catch (IOException e) { - LOGGER.error("Error connecting PipedOutputStream", e); - } startTime = ZonedDateTime.now(); } @@ -71,7 +51,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { } /** - * Accept the request do nothing with it but write the flush acknowledgement to {@link #getProcessOutStream()} + * Accept the request do nothing with it but write the flush acknowledgement to {@link #readAutodetectResults()} * @param params Should interim results be generated * @return {@link #FLUSH_ID} */ @@ -79,11 +59,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { public String flushJob(InterimResultsParams params) throws IOException { FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID); AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); - XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); - builder.value(result); - pipedProcessOutStream.write(builder.string().getBytes(StandardCharsets.UTF_8)); - pipedProcessOutStream.flush(); - pipedProcessOutStream.write(','); + results.add(result); return FLUSH_ID; } @@ -93,22 +69,30 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { @Override public void close() throws IOException { - pipedProcessOutStream.write('{'); - pipedProcessOutStream.write('}'); - pipedProcessOutStream.write(']'); - pipedProcessOutStream.flush(); - pipedProcessOutStream.close(); - pipedPersistStream.close(); } @Override - public InputStream getProcessOutStream() { - return processOutStream; - } + public Iterator readAutodetectResults() { + // Create a custom iterator here, because ArrayBlockingQueue iterator and stream are not blocking when empty: + return new Iterator() { - @Override - public InputStream getPersistStream() { - return persistStream; + AutodetectResult result; + + @Override + public boolean hasNext() { + try { + result = results.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return true; + } + + @Override + public AutodetectResult next() { + return result; + } + }; } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java 
b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 64a51b5d309..b3c60831d15 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -8,12 +8,14 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; +import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; +import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.BufferedOutputStream; @@ -23,6 +25,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZonedDateTime; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -40,25 +43,28 @@ class NativeAutodetectProcess implements AutodetectProcess { private final CppLogMessageHandler cppLogHandler; private final OutputStream processInStream; private final InputStream processOutStream; - private final InputStream persistStream; private final LengthEncodedWriter recordWriter; private final ZonedDateTime startTime; private final int numberOfAnalysisFields; private final List filesToDelete; private Future logTailFuture; + private Future stateProcessorFuture; + private AutodetectResultsParser resultsParser; NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, - InputStream persistStream, int numberOfAnalysisFields, List filesToDelete, - ExecutorService executorService) throws EsRejectedExecutionException { + int numberOfAnalysisFields, List filesToDelete, AutodetectResultsParser resultsParser) { this.jobId = jobId; cppLogHandler = new CppLogMessageHandler(jobId, logStream); this.processInStream = new BufferedOutputStream(processInStream); this.processOutStream = processOutStream; - this.persistStream = persistStream; this.recordWriter = new LengthEncodedWriter(this.processInStream); startTime = ZonedDateTime.now(); this.numberOfAnalysisFields = numberOfAnalysisFields; this.filesToDelete = filesToDelete; + this.resultsParser = resultsParser; + } + + public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) { logTailFuture = executorService.submit(() -> { try (CppLogMessageHandler h = cppLogHandler) { h.tailStream(); @@ -66,6 +72,9 @@ class NativeAutodetectProcess implements AutodetectProcess { LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), 
e); } }); + stateProcessorFuture = executorService.submit(() -> { + stateProcessor.process(jobId, persistStream); + }); } @Override @@ -105,6 +114,8 @@ class NativeAutodetectProcess implements AutodetectProcess { // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger // this may take a long time as it persists the model state logTailFuture.get(30, TimeUnit.MINUTES); + // the state processor should have stopped by now as the process should have exit + stateProcessorFuture.get(1, TimeUnit.SECONDS); if (cppLogHandler.seenFatalError()) { throw ExceptionsHelper.serverError(cppLogHandler.getErrors()); } @@ -135,13 +146,8 @@ class NativeAutodetectProcess implements AutodetectProcess { } @Override - public InputStream getProcessOutStream() { - return processOutStream; - } - - @Override - public InputStream getPersistStream() { - return persistStream; + public Iterator readAutodetectResults() { + return resultsParser.parseResults(processOutStream); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 14683504036..22cca1d7eb5 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -7,18 +7,21 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.NativeController; import org.elasticsearch.xpack.ml.job.process.ProcessCtrl; import org.elasticsearch.xpack.ml.job.process.ProcessPipes; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; -import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -39,16 +42,19 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(2); + private final Client client; private final Environment env; private final Settings settings; private final JobProvider jobProvider; private final NativeController nativeController; - public NativeAutodetectProcessFactory(JobProvider jobProvider, Environment env, Settings settings, NativeController nativeController) { + public NativeAutodetectProcessFactory(JobProvider jobProvider, Environment env, Settings 
settings, + NativeController nativeController, Client client) { this.env = Objects.requireNonNull(env); this.settings = Objects.requireNonNull(settings); this.jobProvider = Objects.requireNonNull(jobProvider); this.nativeController = Objects.requireNonNull(nativeController); + this.client = client; } @Override @@ -60,11 +66,14 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory createNativeProcess(job, quantiles, filters, processPipes, ignoreDowntime, filesToDelete); int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size(); - NativeAutodetectProcess autodetect = null; + StateProcessor stateProcessor = new StateProcessor(settings, client); + AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings); + NativeAutodetectProcess autodetect = new NativeAutodetectProcess( + job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), + processPipes.getProcessOutStream().get(), numberOfAnalysisFields, filesToDelete, resultsParser + ); try { - autodetect = new NativeAutodetectProcess(job.getId(), processPipes.getLogStream().get(), - processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), - processPipes.getPersistStream().get(), numberOfAnalysisFields, filesToDelete, executorService); + autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); if (modelSnapshot != null) { // TODO (norelease): I don't think we should do this in the background. If this happens then we should wait // until restore it is done before we can accept data. diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 8b405f4ed1c..7cc63c97086 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; @@ -21,16 +22,14 @@ import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; -import java.io.InputStream; import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.stream.Stream; /** * A runnable class that reads the autodetect process output in the - * {@link #process(InputStream, boolean)} method and persists parsed + * {@link #process(AutodetectProcess, boolean)} method and persists parsed * results via the {@linkplain JobResultsPersister} passed in the constructor. *

* Has methods to register and remove alert observers. @@ -52,35 +51,31 @@ public class AutoDetectResultProcessor { private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; - private final AutodetectResultsParser parser; final CountDownLatch completionLatch = new CountDownLatch(1); private final FlushListener flushListener; private volatile ModelSizeStats latestModelSizeStats; - public AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, - AutodetectResultsParser parser) { - this(jobId, renormalizer, persister, parser, new FlushListener()); + public AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister) { + this(jobId, renormalizer, persister, new FlushListener()); } - AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser, - FlushListener flushListener) { + AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, FlushListener flushListener) { this.jobId = jobId; this.renormalizer = renormalizer; this.persister = persister; - this.parser = parser; this.flushListener = flushListener; ModelSizeStats.Builder builder = new ModelSizeStats.Builder(jobId); latestModelSizeStats = builder.build(); } - public void process(InputStream in, boolean isPerPartitionNormalization) { - try (Stream stream = parser.parseResults(in)) { + public void process(AutodetectProcess process, boolean isPerPartitionNormalization) { + Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId)); + try { int bucketCount = 0; - Iterator iterator = stream.iterator(); - Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId)); + Iterator iterator = process.readAutodetectResults(); while (iterator.hasNext()) { AutodetectResult result = iterator.next(); processResult(context, result); @@ -89,7 +84,6 @@ public class AutoDetectResultProcessor { LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); } } - context.bulkResultsPersister.executeRequest(); LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); LOGGER.info("[{}] Parse results Complete", jobId); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java index e6a21d2e400..368edf2e07b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java @@ -17,10 +17,6 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** @@ -35,7 +31,7 @@ public class AutodetectResultsParser extends AbstractComponent { super(settings); } - public Stream parseResults(InputStream in) throws ElasticsearchParseException { + public Iterator parseResults(InputStream in) throws ElasticsearchParseException { try { XContentParser parser = 
XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, in); XContentParser.Token token = parser.nextToken(); @@ -43,9 +39,7 @@ public class AutodetectResultsParser extends AbstractComponent { if (token != XContentParser.Token.START_ARRAY) { throw new ElasticsearchParseException("unexpected token [" + token + "]"); } - Spliterator spliterator = Spliterators.spliterator(new AutodetectResultIterator(parser), Long.MAX_VALUE, 0); - return StreamSupport.stream(spliterator, false) - .onClose(() -> consumeAndCloseStream(in)); + return new AutodetectResultIterator(in, parser); } catch (IOException e) { consumeAndCloseStream(in); throw new ElasticsearchParseException(e.getMessage(), e); @@ -70,10 +64,12 @@ public class AutodetectResultsParser extends AbstractComponent { private class AutodetectResultIterator implements Iterator { + private final InputStream in; private final XContentParser parser; private XContentParser.Token token; - private AutodetectResultIterator(XContentParser parser) { + private AutodetectResultIterator(InputStream in, XContentParser parser) { + this.in = in; this.parser = parser; token = parser.currentToken(); } @@ -86,6 +82,7 @@ public class AutodetectResultsParser extends AbstractComponent { throw new ElasticsearchParseException(e.getMessage(), e); } if (token == XContentParser.Token.END_ARRAY) { + consumeAndCloseStream(in); return false; } else if (token != XContentParser.Token.START_OBJECT) { logger.error("Expecting Json Field name token after the Start Object token"); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index 2b8f8d010bb..d3376adaa4d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; @@ -22,11 +24,11 @@ import java.io.InputStream; public class StateProcessor extends AbstractComponent { private static final int READ_BUF_SIZE = 8192; - private final JobResultsPersister persister; + private final Client client; - public StateProcessor(Settings settings, JobResultsPersister persister) { + public StateProcessor(Settings settings, Client client) { super(settings); - this.persister = persister; + this.client = client; } public void process(String jobId, InputStream in) { @@ -64,7 +66,8 @@ public class StateProcessor extends AbstractComponent { // No more zero bytes in this block break; } - persister.persistBulkState(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); + // No validation - assume the native process has formatted the state correctly + persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); splitFrom = nextZeroByte + 1; } if (splitFrom >= bytesRef.length()) { @@ -73,6 +76,17 @@ public class StateProcessor extends AbstractComponent { return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom); } + void persist(String jobId, BytesReference bytes) { + try { 
+ logger.trace("[{}] ES API CALL: bulk index", jobId); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(bytes, null, null); + client.bulk(bulkRequest).actionGet(); + } catch (Exception e) { + logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e); + } + } + private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) { for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) { if (bytesRef.get(i) == 0) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index f1d900183cf..3807389bb8a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -8,8 +8,6 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; @@ -20,8 +18,8 @@ import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; @@ -29,6 +27,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.noop.NoOpRenormalizer; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; +import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.BucketTests; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; @@ -38,77 +37,59 @@ import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutputTests; import org.junit.Before; -import java.io.IOException; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.when; + public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { private static final String JOB_ID = "foo"; private Renormalizer renormalizer; private JobResultsPersister jobResultsPersister; - private AutodetectResultsParser autodetectResultsParser; private JobProvider jobProvider; @Before public void createComponents() { renormalizer = new NoOpRenormalizer(); jobResultsPersister = new JobResultsPersister(nodeSettings(), client()); - autodetectResultsParser = new AutodetectResultsParser(nodeSettings()); jobProvider = new JobProvider(client(), 1); } public void testProcessResults() throws Exception { createJob(); + AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister); - AutoDetectResultProcessor resultProcessor = - new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser); - - PipedOutputStream outputStream = new PipedOutputStream(); - PipedInputStream inputStream = new PipedInputStream(outputStream); - + ResultsBuilder builder = new ResultsBuilder(); Bucket bucket = createBucket(false); - assertNotNull(bucket); + builder.addBucket(bucket); List records = createRecords(false); + builder.addRecords(records); List influencers = createInfluencers(false); + builder.addInfluencers(influencers); CategoryDefinition categoryDefinition = createCategoryDefinition(); + builder.addCategoryDefinition(categoryDefinition); ModelDebugOutput modelDebugOutput = createModelDebugOutput(); + builder.addModelDebugOutput(modelDebugOutput); ModelSizeStats modelSizeStats = createModelSizeStats(); + builder.addModelSizeStats(modelSizeStats); ModelSnapshot modelSnapshot = createModelSnapshot(); + builder.addModelSnapshot(modelSnapshot); Quantiles quantiles = createQuantiles(); + builder.addQuantiles(quantiles); - // Add the bucket last as the bucket result triggers persistence - ResultsBuilder resultBuilder = new ResultsBuilder() - .start() - .addRecords(records) - .addInfluencers(influencers) - .addCategoryDefinition(categoryDefinition) - .addModelDebugOutput(modelDebugOutput) - .addModelSizeStats(modelSizeStats) - .addModelSnapshot(modelSnapshot) - .addQuantiles(quantiles) - .addBucket(bucket) - .end(); - - new Thread(() -> { - try { - writeResults(resultBuilder.build(), outputStream); - } catch (IOException e) { - } - }).start(); - - resultProcessor.process(inputStream, false); + resultProcessor.process(builder.buildTestProcess(), false); jobResultsPersister.commitResultWrites(JOB_ID); BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build(); @@ -125,7 +106,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { QueryPage persistedInfluencers = getInfluencers(); assertResultsAreSame(influencers, persistedInfluencers); - QueryPage persistedDefinition = getCategoryDefinition(Long.toString(categoryDefinition.getCategoryId())); + QueryPage persistedDefinition = + getCategoryDefinition(Long.toString(categoryDefinition.getCategoryId())); assertEquals(1, persistedDefinition.count()); assertEquals(categoryDefinition, persistedDefinition.results().get(0)); @@ -147,33 +129,18 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { public void testDeleteInterimResults() throws Exception { createJob(); - - AutoDetectResultProcessor resultProcessor = - new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser); - - PipedOutputStream outputStream = new 
PipedOutputStream(); - PipedInputStream inputStream = new PipedInputStream(outputStream); - + AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister); Bucket nonInterimBucket = createBucket(false); Bucket interimBucket = createBucket(true); ResultsBuilder resultBuilder = new ResultsBuilder() - .start() .addRecords(createRecords(true)) .addInfluencers(createInfluencers(true)) .addBucket(interimBucket) // this will persist the interim results .addFlushAcknowledgement(createFlushAcknowledgement()) - .addBucket(nonInterimBucket) // and this will delete the interim results - .end(); + .addBucket(nonInterimBucket); // and this will delete the interim results - new Thread(() -> { - try { - writeResults(resultBuilder.build(), outputStream); - } catch (IOException e) { - } - }).start(); - - resultProcessor.process(inputStream, false); + resultProcessor.process(resultBuilder.buildTestProcess(), false); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -192,18 +159,11 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { public void testMultipleFlushesBetweenPersisting() throws Exception { createJob(); - - AutoDetectResultProcessor resultProcessor = - new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser); - - PipedOutputStream outputStream = new PipedOutputStream(); - PipedInputStream inputStream = new PipedInputStream(outputStream); - + AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister); Bucket finalBucket = createBucket(true); List finalAnomalyRecords = createRecords(true); ResultsBuilder resultBuilder = new ResultsBuilder() - .start() .addRecords(createRecords(true)) .addInfluencers(createInfluencers(true)) .addBucket(createBucket(true)) // this will persist the interim results @@ -212,17 +172,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { .addBucket(createBucket(true)) // and this will delete the interim results and persist the new interim bucket & records .addFlushAcknowledgement(createFlushAcknowledgement()) .addRecords(finalAnomalyRecords) - .addBucket(finalBucket) // this deletes the previous interim and persists final bucket & records - .end(); + .addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records - new Thread(() -> { - try { - writeResults(resultBuilder.build(), outputStream); - } catch (IOException e) { - } - }).start(); - - resultProcessor.process(inputStream, false); + resultProcessor.process(resultBuilder.buildTestProcess(), false); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -238,32 +190,17 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { public void testEndOfStreamTriggersPersisting() throws Exception { createJob(); - - AutoDetectResultProcessor resultProcessor = - new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser); - - PipedOutputStream outputStream = new PipedOutputStream(); - PipedInputStream inputStream = new PipedInputStream(outputStream); - + AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister); Bucket bucket = createBucket(false); List firstSetOfRecords = 
createRecords(false); List secondSetOfRecords = createRecords(false); ResultsBuilder resultBuilder = new ResultsBuilder() - .start() .addRecords(firstSetOfRecords) .addBucket(bucket) // bucket triggers persistence - .addRecords(secondSetOfRecords) - .end(); // end of stream should persist the second bunch of records + .addRecords(secondSetOfRecords); - new Thread(() -> { - try { - writeResults(resultBuilder.build(), outputStream); - } catch (IOException e) { - } - }).start(); - - resultProcessor.process(inputStream, false); + resultProcessor.process(resultBuilder.buildTestProcess(), false); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -275,10 +212,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { assertResultsAreSame(allRecords, persistedRecords); } - private void writeResults(XContentBuilder builder, OutputStream out) throws IOException { - builder.bytes().writeTo(out); - } - private void createJob() { Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field"); detectorBuilder.setByFieldName("by_instance"); @@ -369,72 +302,61 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { } private class ResultsBuilder { - private XContentBuilder contentBuilder; - private ResultsBuilder() throws IOException { - contentBuilder = XContentFactory.jsonBuilder(); - } + private List results = new ArrayList<>(); + FlushAcknowledgement flushAcknowledgement; - ResultsBuilder start() throws IOException { - contentBuilder.startArray(); + ResultsBuilder addBucket(Bucket bucket) { + results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null)); return this; } - ResultsBuilder addBucket(Bucket bucket) throws IOException { - contentBuilder.startObject().field(Bucket.RESULT_TYPE_FIELD.getPreferredName(), bucket).endObject(); + ResultsBuilder addRecords(List records) { + results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null)); return this; } - ResultsBuilder addRecords(List records) throws IOException { - contentBuilder.startObject().field(AnomalyRecord.RESULTS_FIELD.getPreferredName(), records).endObject(); + ResultsBuilder addInfluencers(List influencers) { + results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null)); return this; } - ResultsBuilder addInfluencers(List influencers) throws IOException { - contentBuilder.startObject().field(Influencer.RESULTS_FIELD.getPreferredName(), influencers).endObject(); + ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) { + results.add(new AutodetectResult(null, null, null, null, null, null, null, categoryDefinition, null)); return this; } - ResultsBuilder addCategoryDefinition(CategoryDefinition definition) throws IOException { - contentBuilder.startObject().field(CategoryDefinition.TYPE.getPreferredName(), definition).endObject(); + ResultsBuilder addModelDebugOutput(ModelDebugOutput modelDebugOutput) { + results.add(new AutodetectResult(null, null, null, null, null, null, modelDebugOutput, null, null)); return this; } - ResultsBuilder addModelDebugOutput(ModelDebugOutput modelDebugOutput) throws IOException { - contentBuilder.startObject().field(ModelDebugOutput.RESULTS_FIELD.getPreferredName(), modelDebugOutput).endObject(); + ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) { + results.add(new AutodetectResult(null, null, null, null, 
null, modelSizeStats, null, null, null)); return this; } - ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) throws IOException { - contentBuilder.startObject().field(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), modelSizeStats).endObject(); + ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) { + results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null)); return this; } - ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) throws IOException { - contentBuilder.startObject().field(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot).endObject(); + ResultsBuilder addQuantiles(Quantiles quantiles) { + results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null)); return this; } - ResultsBuilder addQuantiles(Quantiles quantiles) throws IOException { - contentBuilder.startObject().field(Quantiles.TYPE.getPreferredName(), quantiles).endObject(); - return this; - } - - ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) throws IOException { - contentBuilder.startObject().field(FlushAcknowledgement.TYPE.getPreferredName(), flushAcknowledgement).endObject(); + ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) { + results.add(new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement)); return this; } - ResultsBuilder end() throws IOException { - contentBuilder.endArray(); - return this; - } - - XContentBuilder build() throws IOException { - XContentBuilder result = contentBuilder; - contentBuilder = XContentFactory.jsonBuilder(); - return result; + AutodetectProcess buildTestProcess() { + AutodetectResult[] results = this.results.toArray(new AutodetectResult[0]); + AutodetectProcess process = mock(AutodetectProcess.class); + when(process.readAutodetectResults()).thenReturn(Arrays.asList(results).iterator()); + return process; } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 1fec8222ea1..3b66bb4fac2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; @@ -115,11 +114,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { } private AutodetectProcess mockAutodetectProcessWithOutputStream() throws IOException { - InputStream io = Mockito.mock(InputStream.class); - when(io.read(any(byte [].class))).thenReturn(-1); AutodetectProcess process = Mockito.mock(AutodetectProcess.class); - when(process.getProcessOutStream()).thenReturn(io); - when(process.getPersistStream()).thenReturn(io); 
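The lines removed here show the test-side payoff of the new contract: with readAutodetectResults() replacing the two raw streams, a test double no longer needs mocked InputStreams or piped streams and can simply hand back a pre-built iterator. A small sketch of that idea, essentially the same trick as ResultsBuilder.buildTestProcess() in the integration test above (the helper class and method names are placeholders):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;

import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;

// Sketch only: stub the iterator-based contract instead of wiring piped streams together.
class AutodetectProcessStubs {
    static AutodetectProcess processReturning(AutodetectResult... results) {
        AutodetectProcess process = mock(AutodetectProcess.class);
        when(process.isProcessAlive()).thenReturn(true);
        // the communicator / result processor simply drains this fixed sequence of results
        when(process.readAutodetectResults()).thenReturn(Arrays.asList(results).iterator());
        return process;
    }
}
```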
when(process.isProcessAlive()).thenReturn(true); return process; } @@ -132,9 +127,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { return null; }).when(executorService).execute(any(Runnable.class)); DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); - StateProcessor stateProcessor = mock(StateProcessor.class); - return new AutodetectCommunicator(executorService, createJobDetails(), autodetectProcess, dataCountsReporter, - autoDetectResultProcessor, stateProcessor, e -> {}); + return new AutodetectCommunicator(createJobDetails(), autodetectProcess, dataCountsReporter, autoDetectResultProcessor, e -> {}); } public void testWriteToJobInUse() throws IOException { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 076817b6c88..0ee30257b76 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -26,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; @@ -34,7 +34,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; -import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.junit.Before; import org.mockito.Mockito; @@ -45,12 +44,10 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Date; import java.util.HashSet; -import java.util.Iterator; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -import java.util.stream.Stream; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doReturn; @@ -90,6 +87,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { jobManager = mock(JobManager.class); jobProvider = mock(JobProvider.class); jobResultsPersister = mock(JobResultsPersister.class); + when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); 
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
index 076817b6c88..0ee30257b76 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
@@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -26,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.metadata.Allocation;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@@ -34,7 +34,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
-import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
 import org.junit.Before;
 import org.mockito.Mockito;
 
@@ -45,12 +44,10 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
-import java.util.stream.Stream;
 
 import static org.elasticsearch.mock.orig.Mockito.doAnswer;
 import static org.elasticsearch.mock.orig.Mockito.doReturn;
@@ -90,6 +87,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         jobManager = mock(JobManager.class);
         jobProvider = mock(JobProvider.class);
         jobResultsPersister = mock(JobResultsPersister.class);
+        when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class));
         jobDataCountsPersister = mock(JobDataCountsPersister.class);
         normalizerFactory = mock(NormalizerFactory.class);
         givenAllocationWithState(JobState.OPENED);
@@ -152,28 +150,16 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         ThreadPool threadPool = mock(ThreadPool.class);
         ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class);
         when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable);
-        ExecutorService executorService = mock(ExecutorService.class);
-        doAnswer(invocation -> {
-            ((Runnable) invocation.getArguments()[0]).run();
-            return null;
-        }).when(executorService).execute(any(Runnable.class));
-        when(threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService);
-        AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
-        @SuppressWarnings("unchecked")
-        Stream<AutodetectResult> stream = mock(Stream.class);
-        @SuppressWarnings("unchecked")
-        Iterator<AutodetectResult> iterator = mock(Iterator.class);
-        when(stream.iterator()).thenReturn(iterator);
-        when(iterator.hasNext()).thenReturn(false);
-        when(parser.parseResults(any())).thenReturn(stream);
+        when(threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME))
+                .thenReturn(EsExecutors.newDirectExecutorService());
         AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
         when(autodetectProcess.isProcessAlive()).thenReturn(true);
-        when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
+        when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
         AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
         Settings.Builder settings = Settings.builder();
         settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
         AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
-                jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory));
+                jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory));
 
         DataCounts dataCounts = new DataCounts("foo");
         ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
@@ -296,7 +282,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
         when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo"));
         AutodetectProcessManager manager = createManager(communicator);
-        givenAllocationWithState(JobState.OPENED);
 
         InputStream inputStream = createInputStream("");
@@ -310,8 +295,9 @@
         Client client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         ExecutorService executorService = mock(ExecutorService.class);
-        doThrow(new EsRejectedExecutionException("")).when(executorService).execute(any());
+        doThrow(new EsRejectedExecutionException("")).when(executorService).submit(any(Runnable.class));
         when(threadPool.executor(anyString())).thenReturn(executorService);
+        when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
         when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
         doAnswer(invocationOnMock -> {
             String jobId = (String) invocationOnMock.getArguments()[0];
@@ -321,11 +307,10 @@
             return null;
         }).when(jobProvider).dataCounts(eq("my_id"), any(), any());
 
-        AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
         AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
         AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
         AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
-                jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
+                jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
 
         expectThrows(EsRejectedExecutionException.class,
                 () -> manager.create("my_id", dataCounts, modelSnapshot, quantiles, filters, false, e -> {}));
@@ -345,10 +330,9 @@
 
     private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) {
         ThreadPool threadPool = mock(ThreadPool.class);
-        AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
         AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
         AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
-                jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
+                jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
         manager = spy(manager);
         doReturn(communicator).when(manager)
                 .create(any(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java
index 982bd3b06a2..c499ac25e13 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java
@@ -5,27 +5,22 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect;
 
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
 
+import java.util.Iterator;
+
 public class BlackHoleAutodetectProcessTests extends ESTestCase {
 
     public void testFlushJob_writesAck() throws Exception {
         try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess()) {
             String flushId = process.flushJob(InterimResultsParams.builder().build());
-
-            XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                    .createParser(NamedXContentRegistry.EMPTY, process.getProcessOutStream());
-            parser.nextToken(); // FlushAcknowledgementParser expects this to be
-            // called first
-            AutodetectResult result = AutodetectResult.PARSER.apply(parser, null);
+            Iterator<AutodetectResult> iterator = process.readAutodetectResults();
+            iterator.hasNext();
+            AutodetectResult result = iterator.next();
             FlushAcknowledgement ack = result.getFlushAcknowledgement();
             assertEquals(flushId, ack.getId());
         }
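The tests above all lean on the new pull-style contract: AutodetectProcess.readAutodetectResults() hands back an Iterator of AutodetectResult objects rather than exposing raw process streams. A small sketch of that consumption pattern, assuming a class placed in the same package as AutodetectProcess; the mocked process and the drain helper are illustrative only.

    package org.elasticsearch.xpack.ml.job.process.autodetect;

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Collections;
    import java.util.Iterator;

    import org.elasticsearch.xpack.ml.job.results.AutodetectResult;

    class ResultIteratorSketch {
        // Drains results the way a consumer such as AutoDetectResultProcessor would.
        static int drain(AutodetectProcess process) {
            int count = 0;
            Iterator<AutodetectResult> results = process.readAutodetectResults();
            while (results.hasNext()) {
                results.next();
                count++;
            }
            return count;
        }

        static void example() {
            AutodetectProcess process = mock(AutodetectProcess.class);
            when(process.readAutodetectResults())
                    .thenReturn(Collections.singletonList(mock(AutodetectResult.class)).iterator());
            assert drain(process) == 1;
        }
    }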
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java
index 4df0e784fce..a498fad7a93 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java
@@ -5,13 +5,16 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect;
 
-import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
 import org.junit.Assert;
+import org.junit.Before;
 import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
@@ -24,21 +27,35 @@ import java.time.ZonedDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class NativeAutodetectProcessTests extends ESTestCase {
 
     private static final int NUMBER_ANALYSIS_FIELDS = 3;
 
+    private ExecutorService executorService;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void initialize() {
+        executorService = mock(ExecutorService.class);
+        when(executorService.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+    }
+
     public void testProcessStartTime() throws Exception {
         InputStream logStream = Mockito.mock(InputStream.class);
         when(logStream.read(new byte[1024])).thenReturn(-1);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, null, EsExecutors.newDirectExecutorService())) {
+                Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class),
+                NUMBER_ANALYSIS_FIELDS, null, new AutodetectResultsParser(Settings.EMPTY))) {
+            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
 
             ZonedDateTime startTime = process.getProcessStartTime();
             Thread.sleep(500);
@@ -56,8 +73,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         String[] record = {"r1", "r2", "r3", "r4", "r5"};
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+                bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
+                new AutodetectResultsParser(Settings.EMPTY))) {
+            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
 
             process.writeRecord(record);
             process.flushStream();
@@ -87,8 +105,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(logStream.read(new byte[1024])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+                bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
+                new AutodetectResultsParser(Settings.EMPTY))) {
+            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
 
             InterimResultsParams params = InterimResultsParams.builder().build();
             process.flushJob(params);
@@ -103,8 +122,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(logStream.read(new byte[1024])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+                bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
+                new AutodetectResultsParser(Settings.EMPTY))) {
+            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
 
             DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty());
             process.writeResetBucketsControlMessage(params);
@@ -120,8 +140,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(logStream.read(new byte[1024])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+                bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
+                new AutodetectResultsParser(Settings.EMPTY))) {
+            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
 
             process.writeUpdateConfigMessage("");
             process.flushStream();
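The repeated construct-then-start pattern in the tests above suggests the native process now has a two-step lifecycle: build it with an AutodetectResultsParser, then wire up the background readers with an explicit start(...) call. A hedged sketch of that sequence, assuming the same package as NativeAutodetectProcess; the stream arguments, field count, and helper class are illustrative, not the production wiring (which presumably lives in NativeAutodetectProcessFactory and is not shown in this excerpt).

    package org.elasticsearch.xpack.ml.job.process.autodetect;

    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.util.Collections;
    import java.util.concurrent.ExecutorService;

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
    import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;

    class ProcessLifecycleSketch {
        // Caller remains responsible for closing the returned process.
        static NativeAutodetectProcess startProcess(ExecutorService executorService, StateProcessor stateProcessor,
                                                    InputStream logStream, InputStream resultsStream,
                                                    InputStream persistStream) {
            // Step 1: construct. The results parser is now a constructor argument; the executor is not.
            // 3 stands in for the number of analysis fields, as in the tests.
            NativeAutodetectProcess process = new NativeAutodetectProcess("job-id", logStream,
                    new ByteArrayOutputStream(), resultsStream, 3, Collections.emptyList(),
                    new AutodetectResultsParser(Settings.EMPTY));
            // Step 2: explicitly start the background readers, handing over what appears to be
            // the persisted-state stream along with the state processor.
            process.start(executorService, stateProcessor, persistStream);
            return process;
        }
    }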
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
index ae945ff1a03..f4adc32e199 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
 
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
+import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
@@ -20,12 +21,10 @@ import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
 import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 import org.mockito.InOrder;
 
-import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Stream;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.inOrder;
@@ -43,19 +42,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcess() {
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
         @SuppressWarnings("unchecked")
-        Stream<AutodetectResult> stream = mock(Stream.class);
-        @SuppressWarnings("unchecked")
         Iterator<AutodetectResult> iterator = mock(Iterator.class);
-        when(stream.iterator()).thenReturn(iterator);
         when(iterator.hasNext()).thenReturn(true).thenReturn(false);
         when(iterator.next()).thenReturn(autodetectResult);
-        AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
-        when(parser.parseResults(any())).thenReturn(stream);
-
+        AutodetectProcess process = mock(AutodetectProcess.class);
+        when(process.readAutodetectResults()).thenReturn(iterator);
         Renormalizer renormalizer = mock(Renormalizer.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
-        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, parser);
-        processor.process(mock(InputStream.class), randomBoolean());
+        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister);
+        processor.process(process, randomBoolean());
         verify(renormalizer, times(1)).waitUntilIdle();
         assertEquals(0, processor.completionLatch.getCount());
     }
@@ -196,7 +191,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         FlushListener flushListener = mock(FlushListener.class);
-        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null, flushListener);
+        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, flushListener);
 
         AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
@@ -218,7 +213,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister persister = mock(JobResultsPersister.class);
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         FlushListener flushListener = mock(FlushListener.class);
-        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null, flushListener);
+        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, flushListener);
 
         AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
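At the parser level the same simplification applies: parseResults(...) now is the iterator, so the extra .iterator() hop in the assertions below disappears. A minimal usage sketch; the helper class and sample input are illustrative only, assuming a class in the same output package.

    package org.elasticsearch.xpack.ml.job.process.autodetect.output;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.xpack.ml.job.results.AutodetectResult;

    class ParserUsageSketch {
        static List<AutodetectResult> parseAll(String json) {
            InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
            AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
            List<AutodetectResult> results = new ArrayList<>();
            // No .iterator() call needed any more; parseResults(...) already returns an iterator.
            parser.parseResults(inputStream).forEachRemaining(results::add);
            return results;
        }
    }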
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java
index bcf1042f1bb..0c5871bb18a 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java
@@ -233,7 +233,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
         InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
         List<AutodetectResult> results = new ArrayList<>();
-        parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
+        parser.parseResults(inputStream).forEachRemaining(results::add);
         List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
                 .filter(b -> b != null)
                 .collect(Collectors.toList());
@@ -328,7 +328,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
         InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
         List<AutodetectResult> results = new ArrayList<>();
-        parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
+        parser.parseResults(inputStream).forEachRemaining(results::add);
         List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
                 .filter(b -> b != null)
                 .collect(Collectors.toList());
@@ -355,7 +355,7 @@
         String json = "[]";
         InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
-        assertFalse(parser.parseResults(inputStream).iterator().hasNext());
+        assertFalse(parser.parseResults(inputStream).hasNext());
     }
 
     public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException {
@@ -364,7 +364,7 @@
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
         List<AutodetectResult> results = new ArrayList<>();
-        parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
+        parser.parseResults(inputStream).forEachRemaining(results::add);
 
         assertEquals(1, results.size());
         assertEquals(300, results.get(0).getModelSizeStats().getModelBytes());
@@ -375,7 +375,7 @@
         InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
         List<AutodetectResult> results = new ArrayList<>();
-        parser.parseResults(inputStream).iterator().forEachRemaining(results::add);
+        parser.parseResults(inputStream).forEachRemaining(results::add);
 
         assertEquals(1, results.size());
         assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId());
@@ -386,7 +386,7 @@
         InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-                () -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {}));
+                () -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
 
         assertEquals("[autodetect_result] unknown field [unknown], parser not found", e.getMessage());
     }
@@ -395,7 +395,7 @@
         InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
         AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
         ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
-                () -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {}));
+                () -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
 
         assertEquals("unexpected token [START_ARRAY]", e.getMessage());
     }
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java
index ca424b6ab2c..6239246694c 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java
@@ -6,12 +6,12 @@
 package org.elasticsearch.xpack.ml.job.process.autodetect.output;
 
 import com.carrotsearch.randomizedtesting.annotations.Timeout;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
+import org.junit.Before;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -20,6 +20,9 @@ import java.util.List;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -41,16 +44,19 @@ public class StateProcessorTests extends ESTestCase {
     private static final int NUM_LARGE_DOCS = 2;
     private static final int LARGE_DOC_SIZE = 16000000;
 
+    private StateProcessor stateProcessor;
+
+    @Before
+    public void initialize() {
+        stateProcessor = spy(new StateProcessor(Settings.EMPTY, mock(Client.class)));
+        doNothing().when(stateProcessor).persist(any(), any());
+    }
+
     public void testStateRead() throws IOException {
         ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
-
+        stateProcessor.process("_id", stream);
         ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
-        JobResultsPersister persister = Mockito.mock(JobResultsPersister.class);
-
-        StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
-        stateParser.process("_id", stream);
-
-        verify(persister, times(3)).persistBulkState(eq("_id"), bytesRefCaptor.capture());
+        verify(stateProcessor, times(3)).persist(eq("_id"), bytesRefCaptor.capture());
 
         String[] threeStates = STATE_SAMPLE.split("\0");
         List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
@@ -68,7 +74,7 @@
     public void testLargeStateRead() throws Exception {
         StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
         for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
-            builder.append("header").append(docNum).append("\n");
+            builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\",\"_type\":\"type\"}}\n");
             for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
                 builder.append("data");
             }
@@ -76,12 +82,7 @@
         }
 
         ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
-
-        JobResultsPersister persister = Mockito.mock(JobResultsPersister.class);
-
-        StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
-        stateParser.process("_id", stream);
-
-        verify(persister, times(NUM_LARGE_DOCS)).persistBulkState(eq("_id"), any());
+        stateProcessor.process("_id", stream);
+        verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq("_id"), any());
     }
 }
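The fixture change in testLargeStateRead hints at the framing the state stream is now expected to carry: each document is a bulk-style action line plus its source, and documents are separated by the '\0' byte that STATE_SAMPLE is split on. The sketch below builds such a fixture and reuses the spy setup from the tests; the index names, document bodies, and two-document count are assumptions for illustration.

    package org.elasticsearch.xpack.ml.job.process.autodetect.output;

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.eq;
    import static org.mockito.Mockito.doNothing;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;

    class StateStreamSketch {
        static void example() throws IOException {
            // Two documents in the assumed bulk-style framing, each terminated by '\0'.
            StringBuilder state = new StringBuilder();
            for (int docNum = 1; docNum <= 2; ++docNum) {
                state.append("{\"index\":{\"_index\":\"state").append(docNum).append("\",\"_type\":\"type\"}}\n");
                state.append("{\"field\":\"value").append(docNum).append("\"}\n");
                state.append('\0');
            }

            // Same spy trick as the tests: run the real splitting logic, stub out the persist call.
            StateProcessor stateProcessor = spy(new StateProcessor(Settings.EMPTY, mock(Client.class)));
            doNothing().when(stateProcessor).persist(any(), any());
            stateProcessor.process("job-id", new ByteArrayInputStream(state.toString().getBytes(StandardCharsets.UTF_8)));
            verify(stateProcessor, times(2)).persist(eq("job-id"), any());
        }
    }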