Simplified AutodetectProcess interface:

* Removed getPersistStream() method from this interface and let the NativeAutodetectProcess implementation deal with this. The persist stream is an implementation detail and BlackHoleAutodetectProcess doesn't deal with this too.
* Replaced getProcessOutStream() method with readAutodetectResults() method. This method now returns a `Iterator<AutodetectResult>` instead of an inputstream. This makes the BlackHoleAutodetectProcess and future mocked implementations easier.

Original commit: elastic/x-pack-elasticsearch@086e7b40ab
This commit is contained in:
Martijn van Groningen 2017-02-02 11:51:34 +01:00
parent 9d9572e2b2
commit 1b65366478
19 changed files with 251 additions and 380 deletions

View File

@ -78,7 +78,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactor
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory; import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
@ -214,7 +213,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
try { try {
NativeController nativeController = new NativeController(env, new NamedPipeHelper()); NativeController nativeController = new NativeController(env, new NamedPipeHelper());
nativeController.tailLogsInThread(); nativeController.tailLogsInThread();
autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController); autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController, client);
normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController); normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController);
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("Failed to create native process factories", e); throw new ElasticsearchException("Failed to create native process factories", e);
@ -228,9 +227,8 @@ public class MlPlugin extends Plugin implements ActionPlugin {
} }
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MlPlugin.THREAD_POOL_NAME)); threadPool.executor(MlPlugin.THREAD_POOL_NAME));
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings);
AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, autodetectProcessFactory, normalizerFactory); jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider,
System::currentTimeMillis); System::currentTimeMillis);
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, client); PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, client);

View File

@ -13,7 +13,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -280,22 +279,6 @@ public class JobResultsPersister extends AbstractComponent {
// read again by this process // read again by this process
} }
/**
* Persist state sent from the native process
*/
public void persistBulkState(String jobId, BytesReference bytesRef) {
try {
// No validation - assume the native process has formatted the state correctly
byte[] bytes = bytesRef.toBytesRef().bytes;
logger.trace("[{}] ES API CALL: bulk index", jobId);
client.prepareBulk()
.add(bytes, 0, bytes.length)
.execute().actionGet();
} catch (Exception e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
}
}
/** /**
* Delete any existing interim results synchronously * Delete any existing interim results synchronously
*/ */

View File

@ -11,14 +11,12 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -34,7 +32,6 @@ import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -52,23 +49,13 @@ public class AutodetectCommunicator implements Closeable {
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>(); final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, public AutodetectCommunicator(Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter,
DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler) {
StateProcessor stateProcessor, Consumer<Exception> handler) {
this.job = job; this.job = job;
this.autodetectProcess = process; this.autodetectProcess = process;
this.dataCountsReporter = dataCountsReporter; this.dataCountsReporter = dataCountsReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor; this.autoDetectResultProcessor = autoDetectResultProcessor;
this.handler = handler; this.handler = handler;
AnalysisConfig analysisConfig = job.getAnalysisConfig();
boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization();
autoDetectExecutor.execute(() ->
autoDetectResultProcessor.process(process.getProcessOutStream(), usePerPartitionNormalization)
);
autoDetectExecutor.execute(() ->
stateProcessor.process(job.getId(), process.getPersistStream())
);
} }
public void writeJobInputHeader() throws IOException { public void writeJobInputHeader() throws IOException {

View File

@ -7,11 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Iterator;
/** /**
* Interface representing the native C++ autodetect process * Interface representing the native C++ autodetect process
@ -60,16 +61,9 @@ public interface AutodetectProcess extends Closeable {
void flushStream() throws IOException; void flushStream() throws IOException;
/** /**
* Autodetect's output stream * @return stream of autodetect results.
* @return output stream
*/ */
InputStream getProcessOutStream(); Iterator<AutodetectResult> readAutodetectResults();
/**
* Autodetect's state persistence stream
* @return persist stream
*/
InputStream getPersistStream();
/** /**
* The time the process was started * The time the process was started

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.MlPlugin;
@ -28,8 +29,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -66,11 +65,9 @@ public class AutodetectProcessManager extends AbstractComponent {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final JobManager jobManager; private final JobManager jobManager;
private final JobProvider jobProvider; private final JobProvider jobProvider;
private final AutodetectResultsParser parser;
private final AutodetectProcessFactory autodetectProcessFactory; private final AutodetectProcessFactory autodetectProcessFactory;
private final NormalizerFactory normalizerFactory; private final NormalizerFactory normalizerFactory;
private final StateProcessor stateProcessor;
private final JobResultsPersister jobResultsPersister; private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister; private final JobDataCountsPersister jobDataCountsPersister;
@ -80,20 +77,18 @@ public class AutodetectProcessManager extends AbstractComponent {
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, JobResultsPersister jobResultsPersister, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser, JobDataCountsPersister jobDataCountsPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory) { AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory) {
super(settings); super(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings); this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.parser = parser;
this.autodetectProcessFactory = autodetectProcessFactory; this.autodetectProcessFactory = autodetectProcessFactory;
this.normalizerFactory = normalizerFactory; this.normalizerFactory = normalizerFactory;
this.jobManager = jobManager; this.jobManager = jobManager;
this.jobProvider = jobProvider; this.jobProvider = jobProvider;
this.jobResultsPersister = jobResultsPersister; this.jobResultsPersister = jobResultsPersister;
this.stateProcessor = new StateProcessor(settings, jobResultsPersister);
this.jobDataCountsPersister = jobDataCountsPersister; this.jobDataCountsPersister = jobDataCountsPersister;
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
@ -227,26 +222,25 @@ public class AutodetectProcessManager extends AbstractComponent {
RestStatus.CONFLICT); RestStatus.CONFLICT);
} }
// TODO norelease, once we remove black hole process
// then we can remove this method and move not enough threads logic to the auto detect process factory
Job job = jobManager.getJobOrThrowIfUnknown(jobId); Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available // A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService executorService = threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME); ExecutorService executorService = threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts, try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts,
jobDataCountsPersister)) { jobDataCountsPersister)) {
ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory); normalizerFactory);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdator, Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
threadPool.executor(MlPlugin.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization()); threadPool.executor(MlPlugin.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(jobId, renormalizer, jobResultsPersister, parser);
AutodetectProcess process = null; AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters,
ignoreDowntime, executorService);
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(jobId, renormalizer, jobResultsPersister);
try { try {
process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters, executorService.submit(() -> processor.process(process, usePerPartitionNormalization));
ignoreDowntime, executorService); } catch (EsRejectedExecutionException e) {
return new AutodetectCommunicator(executorService, job, process, dataCountsReporter, processor, stateProcessor, handler); // If submitting the operation to read the results from the process fails we need to close
} catch (Exception e) { // the process too, so that other submitted operations to threadpool are stopped.
try { try {
IOUtils.close(process); IOUtils.close(process);
} catch (IOException ioe) { } catch (IOException ioe) {
@ -254,6 +248,7 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
throw e; throw e;
} }
return new AutodetectCommunicator(job, process, dataCountsReporter, processor, handler);
} }
} }

View File

@ -7,25 +7,22 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/** /**
* A placeholder class simulating the actions of the native Autodetect process. * A placeholder class simulating the actions of the native Autodetect process.
* Most methods consume data without performing any action however, after a call to * Most methods consume data without performing any action however, after a call to
* {@link #flushJob(InterimResultsParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement} * {@link #flushJob(InterimResultsParams)} a {@link org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement}
* message is expected on the {@link #getProcessOutStream()} stream. This class writes the flush * message is expected on the {@link #readAutodetectResults()} ()} stream. This class writes the flush
* acknowledgement immediately. * acknowledgement immediately.
*/ */
public class BlackHoleAutodetectProcess implements AutodetectProcess { public class BlackHoleAutodetectProcess implements AutodetectProcess {
@ -33,28 +30,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
private static final Logger LOGGER = Loggers.getLogger(BlackHoleAutodetectProcess.class); private static final Logger LOGGER = Loggers.getLogger(BlackHoleAutodetectProcess.class);
private static final String FLUSH_ID = "flush-1"; private static final String FLUSH_ID = "flush-1";
private final PipedInputStream processOutStream;
private final PipedInputStream persistStream;
private PipedOutputStream pipedProcessOutStream;
private PipedOutputStream pipedPersistStream;
private final ZonedDateTime startTime; private final ZonedDateTime startTime;
private final BlockingQueue<AutodetectResult> results = new ArrayBlockingQueue<>(128);
public BlackHoleAutodetectProcess() { public BlackHoleAutodetectProcess() {
processOutStream = new PipedInputStream();
persistStream = new PipedInputStream();
try {
// jackson tries to read the first 4 bytes:
// if we don't do this the autodetect communication would fail starting
pipedProcessOutStream = new PipedOutputStream(processOutStream);
pipedProcessOutStream.write(' ');
pipedProcessOutStream.write(' ');
pipedProcessOutStream.write(' ');
pipedProcessOutStream.write('[');
pipedProcessOutStream.flush();
pipedPersistStream = new PipedOutputStream(persistStream);
} catch (IOException e) {
LOGGER.error("Error connecting PipedOutputStream", e);
}
startTime = ZonedDateTime.now(); startTime = ZonedDateTime.now();
} }
@ -71,7 +51,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
} }
/** /**
* Accept the request do nothing with it but write the flush acknowledgement to {@link #getProcessOutStream()} * Accept the request do nothing with it but write the flush acknowledgement to {@link #readAutodetectResults()}
* @param params Should interim results be generated * @param params Should interim results be generated
* @return {@link #FLUSH_ID} * @return {@link #FLUSH_ID}
*/ */
@ -79,11 +59,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public String flushJob(InterimResultsParams params) throws IOException { public String flushJob(InterimResultsParams params) throws IOException {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID); FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); results.add(result);
builder.value(result);
pipedProcessOutStream.write(builder.string().getBytes(StandardCharsets.UTF_8));
pipedProcessOutStream.flush();
pipedProcessOutStream.write(',');
return FLUSH_ID; return FLUSH_ID;
} }
@ -93,22 +69,30 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
pipedProcessOutStream.write('{');
pipedProcessOutStream.write('}');
pipedProcessOutStream.write(']');
pipedProcessOutStream.flush();
pipedProcessOutStream.close();
pipedPersistStream.close();
} }
@Override @Override
public InputStream getProcessOutStream() { public Iterator<AutodetectResult> readAutodetectResults() {
return processOutStream; // Create a custom iterator here, because ArrayBlockingQueue iterator and stream are not blocking when empty:
} return new Iterator<AutodetectResult>() {
@Override AutodetectResult result;
public InputStream getPersistStream() {
return persistStream; @Override
public boolean hasNext() {
try {
result = results.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
}
@Override
public AutodetectResult next() {
return result;
}
};
} }
@Override @Override

View File

@ -8,12 +8,14 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
@ -23,6 +25,7 @@ import java.io.OutputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -40,25 +43,28 @@ class NativeAutodetectProcess implements AutodetectProcess {
private final CppLogMessageHandler cppLogHandler; private final CppLogMessageHandler cppLogHandler;
private final OutputStream processInStream; private final OutputStream processInStream;
private final InputStream processOutStream; private final InputStream processOutStream;
private final InputStream persistStream;
private final LengthEncodedWriter recordWriter; private final LengthEncodedWriter recordWriter;
private final ZonedDateTime startTime; private final ZonedDateTime startTime;
private final int numberOfAnalysisFields; private final int numberOfAnalysisFields;
private final List<Path> filesToDelete; private final List<Path> filesToDelete;
private Future<?> logTailFuture; private Future<?> logTailFuture;
private Future<?> stateProcessorFuture;
private AutodetectResultsParser resultsParser;
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
InputStream persistStream, int numberOfAnalysisFields, List<Path> filesToDelete, int numberOfAnalysisFields, List<Path> filesToDelete, AutodetectResultsParser resultsParser) {
ExecutorService executorService) throws EsRejectedExecutionException {
this.jobId = jobId; this.jobId = jobId;
cppLogHandler = new CppLogMessageHandler(jobId, logStream); cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = new BufferedOutputStream(processInStream); this.processInStream = new BufferedOutputStream(processInStream);
this.processOutStream = processOutStream; this.processOutStream = processOutStream;
this.persistStream = persistStream;
this.recordWriter = new LengthEncodedWriter(this.processInStream); this.recordWriter = new LengthEncodedWriter(this.processInStream);
startTime = ZonedDateTime.now(); startTime = ZonedDateTime.now();
this.numberOfAnalysisFields = numberOfAnalysisFields; this.numberOfAnalysisFields = numberOfAnalysisFields;
this.filesToDelete = filesToDelete; this.filesToDelete = filesToDelete;
this.resultsParser = resultsParser;
}
public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
logTailFuture = executorService.submit(() -> { logTailFuture = executorService.submit(() -> {
try (CppLogMessageHandler h = cppLogHandler) { try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream(); h.tailStream();
@ -66,6 +72,9 @@ class NativeAutodetectProcess implements AutodetectProcess {
LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e); LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e);
} }
}); });
stateProcessorFuture = executorService.submit(() -> {
stateProcessor.process(jobId, persistStream);
});
} }
@Override @Override
@ -105,6 +114,8 @@ class NativeAutodetectProcess implements AutodetectProcess {
// wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger
// this may take a long time as it persists the model state // this may take a long time as it persists the model state
logTailFuture.get(30, TimeUnit.MINUTES); logTailFuture.get(30, TimeUnit.MINUTES);
// the state processor should have stopped by now as the process should have exit
stateProcessorFuture.get(1, TimeUnit.SECONDS);
if (cppLogHandler.seenFatalError()) { if (cppLogHandler.seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler.getErrors()); throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
} }
@ -135,13 +146,8 @@ class NativeAutodetectProcess implements AutodetectProcess {
} }
@Override @Override
public InputStream getProcessOutStream() { public Iterator<AutodetectResult> readAutodetectResults() {
return processOutStream; return resultsParser.parseResults(processOutStream);
}
@Override
public InputStream getPersistStream() {
return persistStream;
} }
@Override @Override

View File

@ -7,18 +7,21 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.NativeController; import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.ProcessCtrl; import org.elasticsearch.xpack.ml.job.process.ProcessCtrl;
import org.elasticsearch.xpack.ml.job.process.ProcessPipes; import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@ -39,16 +42,19 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(2); private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(2);
private final Client client;
private final Environment env; private final Environment env;
private final Settings settings; private final Settings settings;
private final JobProvider jobProvider; private final JobProvider jobProvider;
private final NativeController nativeController; private final NativeController nativeController;
public NativeAutodetectProcessFactory(JobProvider jobProvider, Environment env, Settings settings, NativeController nativeController) { public NativeAutodetectProcessFactory(JobProvider jobProvider, Environment env, Settings settings,
NativeController nativeController, Client client) {
this.env = Objects.requireNonNull(env); this.env = Objects.requireNonNull(env);
this.settings = Objects.requireNonNull(settings); this.settings = Objects.requireNonNull(settings);
this.jobProvider = Objects.requireNonNull(jobProvider); this.jobProvider = Objects.requireNonNull(jobProvider);
this.nativeController = Objects.requireNonNull(nativeController); this.nativeController = Objects.requireNonNull(nativeController);
this.client = client;
} }
@Override @Override
@ -60,11 +66,14 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
createNativeProcess(job, quantiles, filters, processPipes, ignoreDowntime, filesToDelete); createNativeProcess(job, quantiles, filters, processPipes, ignoreDowntime, filesToDelete);
int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size(); int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size();
NativeAutodetectProcess autodetect = null; StateProcessor stateProcessor = new StateProcessor(settings, client);
AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings);
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
processPipes.getProcessOutStream().get(), numberOfAnalysisFields, filesToDelete, resultsParser
);
try { try {
autodetect = new NativeAutodetectProcess(job.getId(), processPipes.getLogStream().get(), autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
processPipes.getPersistStream().get(), numberOfAnalysisFields, filesToDelete, executorService);
if (modelSnapshot != null) { if (modelSnapshot != null) {
// TODO (norelease): I don't think we should do this in the background. If this happens then we should wait // TODO (norelease): I don't think we should do this in the background. If this happens then we should wait
// until restore it is done before we can accept data. // until restore it is done before we can accept data.

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
@ -21,16 +22,14 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
/** /**
* A runnable class that reads the autodetect process output in the * A runnable class that reads the autodetect process output in the
* {@link #process(InputStream, boolean)} method and persists parsed * {@link #process(AutodetectProcess, boolean)} method and persists parsed
* results via the {@linkplain JobResultsPersister} passed in the constructor. * results via the {@linkplain JobResultsPersister} passed in the constructor.
* <p> * <p>
* Has methods to register and remove alert observers. * Has methods to register and remove alert observers.
@ -52,35 +51,31 @@ public class AutoDetectResultProcessor {
private final String jobId; private final String jobId;
private final Renormalizer renormalizer; private final Renormalizer renormalizer;
private final JobResultsPersister persister; private final JobResultsPersister persister;
private final AutodetectResultsParser parser;
final CountDownLatch completionLatch = new CountDownLatch(1); final CountDownLatch completionLatch = new CountDownLatch(1);
private final FlushListener flushListener; private final FlushListener flushListener;
private volatile ModelSizeStats latestModelSizeStats; private volatile ModelSizeStats latestModelSizeStats;
public AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, public AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister) {
AutodetectResultsParser parser) { this(jobId, renormalizer, persister, new FlushListener());
this(jobId, renormalizer, persister, parser, new FlushListener());
} }
AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser, AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, FlushListener flushListener) {
FlushListener flushListener) {
this.jobId = jobId; this.jobId = jobId;
this.renormalizer = renormalizer; this.renormalizer = renormalizer;
this.persister = persister; this.persister = persister;
this.parser = parser;
this.flushListener = flushListener; this.flushListener = flushListener;
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(jobId); ModelSizeStats.Builder builder = new ModelSizeStats.Builder(jobId);
latestModelSizeStats = builder.build(); latestModelSizeStats = builder.build();
} }
public void process(InputStream in, boolean isPerPartitionNormalization) { public void process(AutodetectProcess process, boolean isPerPartitionNormalization) {
try (Stream<AutodetectResult> stream = parser.parseResults(in)) { Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId));
try {
int bucketCount = 0; int bucketCount = 0;
Iterator<AutodetectResult> iterator = stream.iterator(); Iterator<AutodetectResult> iterator = process.readAutodetectResults();
Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId));
while (iterator.hasNext()) { while (iterator.hasNext()) {
AutodetectResult result = iterator.next(); AutodetectResult result = iterator.next();
processResult(context, result); processResult(context, result);
@ -89,7 +84,6 @@ public class AutoDetectResultProcessor {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
} }
} }
context.bulkResultsPersister.executeRequest(); context.bulkResultsPersister.executeRequest();
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
LOGGER.info("[{}] Parse results Complete", jobId); LOGGER.info("[{}] Parse results Complete", jobId);

View File

@ -17,10 +17,6 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/** /**
@ -35,7 +31,7 @@ public class AutodetectResultsParser extends AbstractComponent {
super(settings); super(settings);
} }
public Stream<AutodetectResult> parseResults(InputStream in) throws ElasticsearchParseException { public Iterator<AutodetectResult> parseResults(InputStream in) throws ElasticsearchParseException {
try { try {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, in); XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, in);
XContentParser.Token token = parser.nextToken(); XContentParser.Token token = parser.nextToken();
@ -43,9 +39,7 @@ public class AutodetectResultsParser extends AbstractComponent {
if (token != XContentParser.Token.START_ARRAY) { if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticsearchParseException("unexpected token [" + token + "]"); throw new ElasticsearchParseException("unexpected token [" + token + "]");
} }
Spliterator<AutodetectResult> spliterator = Spliterators.spliterator(new AutodetectResultIterator(parser), Long.MAX_VALUE, 0); return new AutodetectResultIterator(in, parser);
return StreamSupport.stream(spliterator, false)
.onClose(() -> consumeAndCloseStream(in));
} catch (IOException e) { } catch (IOException e) {
consumeAndCloseStream(in); consumeAndCloseStream(in);
throw new ElasticsearchParseException(e.getMessage(), e); throw new ElasticsearchParseException(e.getMessage(), e);
@ -70,10 +64,12 @@ public class AutodetectResultsParser extends AbstractComponent {
private class AutodetectResultIterator implements Iterator<AutodetectResult> { private class AutodetectResultIterator implements Iterator<AutodetectResult> {
private final InputStream in;
private final XContentParser parser; private final XContentParser parser;
private XContentParser.Token token; private XContentParser.Token token;
private AutodetectResultIterator(XContentParser parser) { private AutodetectResultIterator(InputStream in, XContentParser parser) {
this.in = in;
this.parser = parser; this.parser = parser;
token = parser.currentToken(); token = parser.currentToken();
} }
@ -86,6 +82,7 @@ public class AutodetectResultsParser extends AbstractComponent {
throw new ElasticsearchParseException(e.getMessage(), e); throw new ElasticsearchParseException(e.getMessage(), e);
} }
if (token == XContentParser.Token.END_ARRAY) { if (token == XContentParser.Token.END_ARRAY) {
consumeAndCloseStream(in);
return false; return false;
} else if (token != XContentParser.Token.START_OBJECT) { } else if (token != XContentParser.Token.START_OBJECT) {
logger.error("Expecting Json Field name token after the Start Object token"); logger.error("Expecting Json Field name token after the Start Object token");

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.output; package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference;
@ -22,11 +24,11 @@ import java.io.InputStream;
public class StateProcessor extends AbstractComponent { public class StateProcessor extends AbstractComponent {
private static final int READ_BUF_SIZE = 8192; private static final int READ_BUF_SIZE = 8192;
private final JobResultsPersister persister; private final Client client;
public StateProcessor(Settings settings, JobResultsPersister persister) { public StateProcessor(Settings settings, Client client) {
super(settings); super(settings);
this.persister = persister; this.client = client;
} }
public void process(String jobId, InputStream in) { public void process(String jobId, InputStream in) {
@ -64,7 +66,8 @@ public class StateProcessor extends AbstractComponent {
// No more zero bytes in this block // No more zero bytes in this block
break; break;
} }
persister.persistBulkState(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); // No validation - assume the native process has formatted the state correctly
persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
splitFrom = nextZeroByte + 1; splitFrom = nextZeroByte + 1;
} }
if (splitFrom >= bytesRef.length()) { if (splitFrom >= bytesRef.length()) {
@ -73,6 +76,17 @@ public class StateProcessor extends AbstractComponent {
return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom); return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
} }
void persist(String jobId, BytesReference bytes) {
try {
logger.trace("[{}] ES API CALL: bulk index", jobId);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, null, null);
client.bulk(bulkRequest).actionGet();
} catch (Exception e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
}
}
private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) { private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) { for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
if (bytesRef.get(i) == 0) { if (bytesRef.get(i) == 0) {

View File

@ -8,8 +8,6 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -20,8 +18,8 @@ import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
@ -29,6 +27,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.noop.NoOpRenormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.noop.NoOpRenormalizer;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketTests; import org.elasticsearch.xpack.ml.job.results.BucketTests;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
@ -38,77 +37,59 @@ import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutputTests; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutputTests;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private static final String JOB_ID = "foo"; private static final String JOB_ID = "foo";
private Renormalizer renormalizer; private Renormalizer renormalizer;
private JobResultsPersister jobResultsPersister; private JobResultsPersister jobResultsPersister;
private AutodetectResultsParser autodetectResultsParser;
private JobProvider jobProvider; private JobProvider jobProvider;
@Before @Before
public void createComponents() { public void createComponents() {
renormalizer = new NoOpRenormalizer(); renormalizer = new NoOpRenormalizer();
jobResultsPersister = new JobResultsPersister(nodeSettings(), client()); jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
autodetectResultsParser = new AutodetectResultsParser(nodeSettings());
jobProvider = new JobProvider(client(), 1); jobProvider = new JobProvider(client(), 1);
} }
public void testProcessResults() throws Exception { public void testProcessResults() throws Exception {
createJob(); createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
AutoDetectResultProcessor resultProcessor = ResultsBuilder builder = new ResultsBuilder();
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket bucket = createBucket(false); Bucket bucket = createBucket(false);
assertNotNull(bucket); builder.addBucket(bucket);
List<AnomalyRecord> records = createRecords(false); List<AnomalyRecord> records = createRecords(false);
builder.addRecords(records);
List<Influencer> influencers = createInfluencers(false); List<Influencer> influencers = createInfluencers(false);
builder.addInfluencers(influencers);
CategoryDefinition categoryDefinition = createCategoryDefinition(); CategoryDefinition categoryDefinition = createCategoryDefinition();
builder.addCategoryDefinition(categoryDefinition);
ModelDebugOutput modelDebugOutput = createModelDebugOutput(); ModelDebugOutput modelDebugOutput = createModelDebugOutput();
builder.addModelDebugOutput(modelDebugOutput);
ModelSizeStats modelSizeStats = createModelSizeStats(); ModelSizeStats modelSizeStats = createModelSizeStats();
builder.addModelSizeStats(modelSizeStats);
ModelSnapshot modelSnapshot = createModelSnapshot(); ModelSnapshot modelSnapshot = createModelSnapshot();
builder.addModelSnapshot(modelSnapshot);
Quantiles quantiles = createQuantiles(); Quantiles quantiles = createQuantiles();
builder.addQuantiles(quantiles);
// Add the bucket last as the bucket result triggers persistence resultProcessor.process(builder.buildTestProcess(), false);
ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(records)
.addInfluencers(influencers)
.addCategoryDefinition(categoryDefinition)
.addModelDebugOutput(modelDebugOutput)
.addModelSizeStats(modelSizeStats)
.addModelSnapshot(modelSnapshot)
.addQuantiles(quantiles)
.addBucket(bucket)
.end();
new Thread(() -> {
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
}
}).start();
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID); jobResultsPersister.commitResultWrites(JOB_ID);
BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build(); BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build();
@ -125,7 +106,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<Influencer> persistedInfluencers = getInfluencers(); QueryPage<Influencer> persistedInfluencers = getInfluencers();
assertResultsAreSame(influencers, persistedInfluencers); assertResultsAreSame(influencers, persistedInfluencers);
QueryPage<CategoryDefinition> persistedDefinition = getCategoryDefinition(Long.toString(categoryDefinition.getCategoryId())); QueryPage<CategoryDefinition> persistedDefinition =
getCategoryDefinition(Long.toString(categoryDefinition.getCategoryId()));
assertEquals(1, persistedDefinition.count()); assertEquals(1, persistedDefinition.count());
assertEquals(categoryDefinition, persistedDefinition.results().get(0)); assertEquals(categoryDefinition, persistedDefinition.results().get(0));
@ -147,33 +129,18 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void testDeleteInterimResults() throws Exception { public void testDeleteInterimResults() throws Exception {
createJob(); createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket nonInterimBucket = createBucket(false); Bucket nonInterimBucket = createBucket(false);
Bucket interimBucket = createBucket(true); Bucket interimBucket = createBucket(true);
ResultsBuilder resultBuilder = new ResultsBuilder() ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(createRecords(true)) .addRecords(createRecords(true))
.addInfluencers(createInfluencers(true)) .addInfluencers(createInfluencers(true))
.addBucket(interimBucket) // this will persist the interim results .addBucket(interimBucket) // this will persist the interim results
.addFlushAcknowledgement(createFlushAcknowledgement()) .addFlushAcknowledgement(createFlushAcknowledgement())
.addBucket(nonInterimBucket) // and this will delete the interim results .addBucket(nonInterimBucket); // and this will delete the interim results
.end();
new Thread(() -> { resultProcessor.process(resultBuilder.buildTestProcess(), false);
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
}
}).start();
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID); jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -192,18 +159,11 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void testMultipleFlushesBetweenPersisting() throws Exception { public void testMultipleFlushesBetweenPersisting() throws Exception {
createJob(); createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket finalBucket = createBucket(true); Bucket finalBucket = createBucket(true);
List<AnomalyRecord> finalAnomalyRecords = createRecords(true); List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
ResultsBuilder resultBuilder = new ResultsBuilder() ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(createRecords(true)) .addRecords(createRecords(true))
.addInfluencers(createInfluencers(true)) .addInfluencers(createInfluencers(true))
.addBucket(createBucket(true)) // this will persist the interim results .addBucket(createBucket(true)) // this will persist the interim results
@ -212,17 +172,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
.addBucket(createBucket(true)) // and this will delete the interim results and persist the new interim bucket & records .addBucket(createBucket(true)) // and this will delete the interim results and persist the new interim bucket & records
.addFlushAcknowledgement(createFlushAcknowledgement()) .addFlushAcknowledgement(createFlushAcknowledgement())
.addRecords(finalAnomalyRecords) .addRecords(finalAnomalyRecords)
.addBucket(finalBucket) // this deletes the previous interim and persists final bucket & records .addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records
.end();
new Thread(() -> { resultProcessor.process(resultBuilder.buildTestProcess(), false);
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
}
}).start();
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID); jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -238,32 +190,17 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void testEndOfStreamTriggersPersisting() throws Exception { public void testEndOfStreamTriggersPersisting() throws Exception {
createJob(); createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket bucket = createBucket(false); Bucket bucket = createBucket(false);
List<AnomalyRecord> firstSetOfRecords = createRecords(false); List<AnomalyRecord> firstSetOfRecords = createRecords(false);
List<AnomalyRecord> secondSetOfRecords = createRecords(false); List<AnomalyRecord> secondSetOfRecords = createRecords(false);
ResultsBuilder resultBuilder = new ResultsBuilder() ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(firstSetOfRecords) .addRecords(firstSetOfRecords)
.addBucket(bucket) // bucket triggers persistence .addBucket(bucket) // bucket triggers persistence
.addRecords(secondSetOfRecords) .addRecords(secondSetOfRecords);
.end(); // end of stream should persist the second bunch of records
new Thread(() -> { resultProcessor.process(resultBuilder.buildTestProcess(), false);
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
}
}).start();
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID); jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -275,10 +212,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertResultsAreSame(allRecords, persistedRecords); assertResultsAreSame(allRecords, persistedRecords);
} }
private void writeResults(XContentBuilder builder, OutputStream out) throws IOException {
builder.bytes().writeTo(out);
}
private void createJob() { private void createJob() {
Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field"); Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field");
detectorBuilder.setByFieldName("by_instance"); detectorBuilder.setByFieldName("by_instance");
@ -369,72 +302,61 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
} }
private class ResultsBuilder { private class ResultsBuilder {
private XContentBuilder contentBuilder;
private ResultsBuilder() throws IOException { private List<AutodetectResult> results = new ArrayList<>();
contentBuilder = XContentFactory.jsonBuilder(); FlushAcknowledgement flushAcknowledgement;
}
ResultsBuilder start() throws IOException { ResultsBuilder addBucket(Bucket bucket) {
contentBuilder.startArray(); results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null));
return this; return this;
} }
ResultsBuilder addBucket(Bucket bucket) throws IOException { ResultsBuilder addRecords(List<AnomalyRecord> records) {
contentBuilder.startObject().field(Bucket.RESULT_TYPE_FIELD.getPreferredName(), bucket).endObject(); results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null));
return this; return this;
} }
ResultsBuilder addRecords(List<AnomalyRecord> records) throws IOException { ResultsBuilder addInfluencers(List<Influencer> influencers) {
contentBuilder.startObject().field(AnomalyRecord.RESULTS_FIELD.getPreferredName(), records).endObject(); results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null));
return this; return this;
} }
ResultsBuilder addInfluencers(List<Influencer> influencers) throws IOException { ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) {
contentBuilder.startObject().field(Influencer.RESULTS_FIELD.getPreferredName(), influencers).endObject(); results.add(new AutodetectResult(null, null, null, null, null, null, null, categoryDefinition, null));
return this; return this;
} }
ResultsBuilder addCategoryDefinition(CategoryDefinition definition) throws IOException { ResultsBuilder addModelDebugOutput(ModelDebugOutput modelDebugOutput) {
contentBuilder.startObject().field(CategoryDefinition.TYPE.getPreferredName(), definition).endObject(); results.add(new AutodetectResult(null, null, null, null, null, null, modelDebugOutput, null, null));
return this; return this;
} }
ResultsBuilder addModelDebugOutput(ModelDebugOutput modelDebugOutput) throws IOException { ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) {
contentBuilder.startObject().field(ModelDebugOutput.RESULTS_FIELD.getPreferredName(), modelDebugOutput).endObject(); results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null));
return this; return this;
} }
ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) throws IOException { ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) {
contentBuilder.startObject().field(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), modelSizeStats).endObject(); results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null));
return this; return this;
} }
ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) throws IOException { ResultsBuilder addQuantiles(Quantiles quantiles) {
contentBuilder.startObject().field(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot).endObject(); results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null));
return this; return this;
} }
ResultsBuilder addQuantiles(Quantiles quantiles) throws IOException { ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) {
contentBuilder.startObject().field(Quantiles.TYPE.getPreferredName(), quantiles).endObject(); results.add(new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement));
return this;
}
ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) throws IOException {
contentBuilder.startObject().field(FlushAcknowledgement.TYPE.getPreferredName(), flushAcknowledgement).endObject();
return this; return this;
} }
ResultsBuilder end() throws IOException { AutodetectProcess buildTestProcess() {
contentBuilder.endArray(); AutodetectResult[] results = this.results.toArray(new AutodetectResult[0]);
return this; AutodetectProcess process = mock(AutodetectProcess.class);
} when(process.readAutodetectResults()).thenReturn(Arrays.asList(results).iterator());
return process;
XContentBuilder build() throws IOException {
XContentBuilder result = contentBuilder;
contentBuilder = XContentFactory.jsonBuilder();
return result;
} }
} }

View File

@ -14,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@ -115,11 +114,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
} }
private AutodetectProcess mockAutodetectProcessWithOutputStream() throws IOException { private AutodetectProcess mockAutodetectProcessWithOutputStream() throws IOException {
InputStream io = Mockito.mock(InputStream.class);
when(io.read(any(byte [].class))).thenReturn(-1);
AutodetectProcess process = Mockito.mock(AutodetectProcess.class); AutodetectProcess process = Mockito.mock(AutodetectProcess.class);
when(process.getProcessOutStream()).thenReturn(io);
when(process.getPersistStream()).thenReturn(io);
when(process.isProcessAlive()).thenReturn(true); when(process.isProcessAlive()).thenReturn(true);
return process; return process;
} }
@ -132,9 +127,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
return null; return null;
}).when(executorService).execute(any(Runnable.class)); }).when(executorService).execute(any(Runnable.class));
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
StateProcessor stateProcessor = mock(StateProcessor.class); return new AutodetectCommunicator(createJobDetails(), autodetectProcess, dataCountsReporter, autoDetectResultProcessor, e -> {});
return new AutodetectCommunicator(executorService, createJobDetails(), autodetectProcess, dataCountsReporter,
autoDetectResultProcessor, stateProcessor, e -> {});
} }
public void testWriteToJobInUse() throws IOException { public void testWriteToJobInUse() throws IOException {

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -26,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@ -34,7 +34,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -45,12 +44,10 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.doReturn; import static org.elasticsearch.mock.orig.Mockito.doReturn;
@ -90,6 +87,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobManager = mock(JobManager.class); jobManager = mock(JobManager.class);
jobProvider = mock(JobProvider.class); jobProvider = mock(JobProvider.class);
jobResultsPersister = mock(JobResultsPersister.class); jobResultsPersister = mock(JobResultsPersister.class);
when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class));
jobDataCountsPersister = mock(JobDataCountsPersister.class); jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class); normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithState(JobState.OPENED); givenAllocationWithState(JobState.OPENED);
@ -152,28 +150,16 @@ public class AutodetectProcessManagerTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable);
ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME))
doAnswer(invocation -> { .thenReturn(EsExecutors.newDirectExecutorService());
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME)).thenReturn(executorService);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
@SuppressWarnings("unchecked")
Stream<AutodetectResult> stream = mock(Stream.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(false);
when(parser.parseResults(any())).thenReturn(stream);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.getPersistStream()).thenReturn(new ByteArrayInputStream(new byte[0])); when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
Settings.Builder settings = Settings.builder(); Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory)); jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory));
DataCounts dataCounts = new DataCounts("foo"); DataCounts dataCounts = new DataCounts("foo");
ModelSnapshot modelSnapshot = new ModelSnapshot("foo"); ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
@ -296,7 +282,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo")); when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo"));
AutodetectProcessManager manager = createManager(communicator); AutodetectProcessManager manager = createManager(communicator);
givenAllocationWithState(JobState.OPENED); givenAllocationWithState(JobState.OPENED);
InputStream inputStream = createInputStream(""); InputStream inputStream = createInputStream("");
@ -310,8 +295,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class); ExecutorService executorService = mock(ExecutorService.class);
doThrow(new EsRejectedExecutionException("")).when(executorService).execute(any()); doThrow(new EsRejectedExecutionException("")).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(anyString())).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id")); when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0]; String jobId = (String) invocationOnMock.getArguments()[0];
@ -321,11 +307,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
return null; return null;
}).when(jobProvider).dataCounts(eq("my_id"), any(), any()); }).when(jobProvider).dataCounts(eq("my_id"), any(), any());
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory); jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
expectThrows(EsRejectedExecutionException.class, expectThrows(EsRejectedExecutionException.class,
() -> manager.create("my_id", dataCounts, modelSnapshot, quantiles, filters, false, e -> {})); () -> manager.create("my_id", dataCounts, modelSnapshot, quantiles, filters, false, e -> {}));
@ -345,10 +330,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) { private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory); jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
manager = spy(manager); manager = spy(manager);
doReturn(communicator).when(manager) doReturn(communicator).when(manager)
.create(any(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any()); .create(any(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());

View File

@ -5,27 +5,21 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.autodetect; package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.util.Iterator;
public class BlackHoleAutodetectProcessTests extends ESTestCase { public class BlackHoleAutodetectProcessTests extends ESTestCase {
public void testFlushJob_writesAck() throws Exception { public void testFlushJob_writesAck() throws Exception {
try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess()) { try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess()) {
String flushId = process.flushJob(InterimResultsParams.builder().build()); String flushId = process.flushJob(InterimResultsParams.builder().build());
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
XContentParser parser = XContentFactory.xContent(XContentType.JSON) iterator.hasNext();
.createParser(NamedXContentRegistry.EMPTY, process.getProcessOutStream()); AutodetectResult result = iterator.next();
parser.nextToken(); // FlushAcknowledgementParser expects this to be
// called first
AutodetectResult result = AutodetectResult.PARSER.apply(parser, null);
FlushAcknowledgement ack = result.getFlushAcknowledgement(); FlushAcknowledgement ack = result.getFlushAcknowledgement();
assertEquals(flushId, ack.getId()); assertEquals(flushId, ack.getId());
} }

View File

@ -5,13 +5,16 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.autodetect; package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -24,21 +27,35 @@ import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class NativeAutodetectProcessTests extends ESTestCase { public class NativeAutodetectProcessTests extends ESTestCase {
private static final int NUMBER_ANALYSIS_FIELDS = 3; private static final int NUMBER_ANALYSIS_FIELDS = 3;
private ExecutorService executorService;
@Before
@SuppressWarnings("unchecked")
public void initialize() {
executorService = mock(ExecutorService.class);
when(executorService.submit(any(Runnable.class))).thenReturn(mock(Future.class));
}
public void testProcessStartTime() throws Exception { public void testProcessStartTime() throws Exception {
InputStream logStream = Mockito.mock(InputStream.class); InputStream logStream = Mockito.mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), Mockito.mock(InputStream.class), Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class),
NUMBER_ANALYSIS_FIELDS, null, EsExecutors.newDirectExecutorService())) { NUMBER_ANALYSIS_FIELDS, null, new AutodetectResultsParser(Settings.EMPTY))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
ZonedDateTime startTime = process.getProcessStartTime(); ZonedDateTime startTime = process.getProcessStartTime();
Thread.sleep(500); Thread.sleep(500);
@ -56,8 +73,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
String[] record = {"r1", "r2", "r3", "r4", "r5"}; String[] record = {"r1", "r2", "r3", "r4", "r5"};
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class), bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) { new AutodetectResultsParser(Settings.EMPTY))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
process.writeRecord(record); process.writeRecord(record);
process.flushStream(); process.flushStream();
@ -87,8 +105,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class), bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) { new AutodetectResultsParser(Settings.EMPTY))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
InterimResultsParams params = InterimResultsParams.builder().build(); InterimResultsParams params = InterimResultsParams.builder().build();
process.flushJob(params); process.flushJob(params);
@ -103,8 +122,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class), bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) { new AutodetectResultsParser(Settings.EMPTY))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty()); DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty());
process.writeResetBucketsControlMessage(params); process.writeResetBucketsControlMessage(params);
@ -120,8 +140,9 @@ public class NativeAutodetectProcessTests extends ESTestCase {
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class), bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) { new AutodetectResultsParser(Settings.EMPTY))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
process.writeUpdateConfigMessage(""); process.writeUpdateConfigMessage("");
process.flushStream(); process.flushStream();

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
@ -20,12 +21,10 @@ import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.mockito.InOrder; import org.mockito.InOrder;
import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.stream.Stream;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
@ -43,19 +42,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcess() { public void testProcess() {
AutodetectResult autodetectResult = mock(AutodetectResult.class); AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Stream<AutodetectResult> stream = mock(Stream.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class); Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false); when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult); when(iterator.next()).thenReturn(autodetectResult);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcess process = mock(AutodetectProcess.class);
when(parser.parseResults(any())).thenReturn(stream); when(process.readAutodetectResults()).thenReturn(iterator);
Renormalizer renormalizer = mock(Renormalizer.class); Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, parser); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister);
processor.process(mock(InputStream.class), randomBoolean()); processor.process(process, randomBoolean());
verify(renormalizer, times(1)).waitUntilIdle(); verify(renormalizer, times(1)).waitUntilIdle();
assertEquals(0, processor.completionLatch.getCount()); assertEquals(0, processor.completionLatch.getCount());
} }
@ -196,7 +191,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
FlushListener flushListener = mock(FlushListener.class); FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null, flushListener); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;
@ -218,7 +213,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class); FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null, flushListener); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false; context.deleteInterimRequired = false;

View File

@ -233,7 +233,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).iterator().forEachRemaining(results::add); parser.parseResults(inputStream).forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket) List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
.filter(b -> b != null) .filter(b -> b != null)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -328,7 +328,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).iterator().forEachRemaining(results::add); parser.parseResults(inputStream).forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket) List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
.filter(b -> b != null) .filter(b -> b != null)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -355,7 +355,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
String json = "[]"; String json = "[]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
assertFalse(parser.parseResults(inputStream).iterator().hasNext()); assertFalse(parser.parseResults(inputStream).hasNext());
} }
public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException { public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException {
@ -364,7 +364,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).iterator().forEachRemaining(results::add); parser.parseResults(inputStream).forEachRemaining(results::add);
assertEquals(1, results.size()); assertEquals(1, results.size());
assertEquals(300, results.get(0).getModelSizeStats().getModelBytes()); assertEquals(300, results.get(0).getModelSizeStats().getModelBytes());
@ -375,7 +375,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
List<AutodetectResult> results = new ArrayList<>(); List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).iterator().forEachRemaining(results::add); parser.parseResults(inputStream).forEachRemaining(results::add);
assertEquals(1, results.size()); assertEquals(1, results.size());
assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId()); assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId());
@ -386,7 +386,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {})); () -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
assertEquals("[autodetect_result] unknown field [unknown], parser not found", e.getMessage()); assertEquals("[autodetect_result] unknown field [unknown], parser not found", e.getMessage());
} }
@ -395,7 +395,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY); AutodetectResultsParser parser = new AutodetectResultsParser(Settings.EMPTY);
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> parser.parseResults(inputStream).iterator().forEachRemaining(a -> {})); () -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
assertEquals("unexpected token [START_ARRAY]", e.getMessage()); assertEquals("unexpected token [START_ARRAY]", e.getMessage());
} }

View File

@ -6,12 +6,12 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.output; package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import com.carrotsearch.randomizedtesting.annotations.Timeout; import com.carrotsearch.randomizedtesting.annotations.Timeout;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
@ -20,6 +20,9 @@ import java.util.List;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -41,16 +44,19 @@ public class StateProcessorTests extends ESTestCase {
private static final int NUM_LARGE_DOCS = 2; private static final int NUM_LARGE_DOCS = 2;
private static final int LARGE_DOC_SIZE = 16000000; private static final int LARGE_DOC_SIZE = 16000000;
private StateProcessor stateProcessor;
@Before
public void initialize() {
stateProcessor = spy(new StateProcessor(Settings.EMPTY, mock(Client.class)));
doNothing().when(stateProcessor).persist(any(), any());
}
public void testStateRead() throws IOException { public void testStateRead() throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8)); ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
stateProcessor.process("_id", stream);
ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class); ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
JobResultsPersister persister = Mockito.mock(JobResultsPersister.class); verify(stateProcessor, times(3)).persist(eq("_id"), bytesRefCaptor.capture());
StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
stateParser.process("_id", stream);
verify(persister, times(3)).persistBulkState(eq("_id"), bytesRefCaptor.capture());
String[] threeStates = STATE_SAMPLE.split("\0"); String[] threeStates = STATE_SAMPLE.split("\0");
List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues(); List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
@ -68,7 +74,7 @@ public class StateProcessorTests extends ESTestCase {
public void testLargeStateRead() throws Exception { public void testLargeStateRead() throws Exception {
StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) { for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
builder.append("header").append(docNum).append("\n"); builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\",\"_type\":\"type\"}}\n");
for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) { for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
builder.append("data"); builder.append("data");
} }
@ -76,12 +82,7 @@ public class StateProcessorTests extends ESTestCase {
} }
ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8)); ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
stateProcessor.process("_id", stream);
JobResultsPersister persister = Mockito.mock(JobResultsPersister.class); verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq("_id"), any());
StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
stateParser.process("_id", stream);
verify(persister, times(NUM_LARGE_DOCS)).persistBulkState(eq("_id"), any());
} }
} }