From 15f9b1ed9c07207bc1d19a1e06da5d40acff0831 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 5 Jul 2017 11:33:42 +0100 Subject: [PATCH] [ML] Impove mechanism for ignoring maintenance windows (elastic/x-pack-elasticsearch#1914) Currently, the autodetect process has an `ignoreDowntime` parameter which, when set to true, results to time being skipped over to the end of the bucket of the first data point received. After that, skipping time requires closing and opening the job. With regard to datafeeds, this does not work well with real-time requests which use the advance-time API in order to ensure results are created for data gaps. This commit improves this functionality by making it more flexible and less ambiguous. - flush API now supports skip_time parameter which sends a control message to the autodetect process telling it to skip time to a given value - the flush API now also returns the last_finalized_bucket_end time which allows clients to resume data searches correctly - the datafeed start API issues a skip_time request when the given start time is after the resume point. It then resumes the search from the last_finalized_bucket_end time. relates elastic/x-pack-elasticsearch#1913 Original commit: elastic/x-pack-elasticsearch@caa5fe801621a8bf628e9a0cffadebcbdd145c98 --- .../xpack/ml/MachineLearning.java | 3 +- .../xpack/ml/action/FlushJobAction.java | 72 +++++++++++++++---- .../xpack/ml/action/OpenJobAction.java | 29 ++++---- .../xpack/ml/datafeed/DatafeedJob.java | 22 +++++- .../xpack/ml/job/process/ProcessCtrl.java | 8 +-- .../process/autodetect/AutodetectBuilder.java | 15 +--- .../autodetect/AutodetectCommunicator.java | 19 +++-- .../autodetect/AutodetectProcessFactory.java | 2 - .../autodetect/AutodetectProcessManager.java | 26 +++---- .../BlackHoleAutodetectProcess.java | 2 +- .../NativeAutodetectProcessFactory.java | 6 +- .../output/AutoDetectResultProcessor.java | 13 ++-- .../output/FlushAcknowledgement.java | 40 +++++++++-- .../autodetect/output/FlushListener.java | 40 ++++++++--- .../autodetect/params/FlushJobParams.java | 70 +++++++++++++----- .../writer/ControlMsgToProcessWriter.java | 8 +++ .../xpack/ml/rest/job/RestFlushJobAction.java | 2 + .../xpack/ml/rest/job/RestOpenJobAction.java | 1 - .../ml/action/OpenJobActionRequestTests.java | 3 - .../ml/action/PostDataFlushResponseTests.java | 3 +- .../xpack/ml/datafeed/DatafeedJobTests.java | 17 ++++- .../AutodetectResultProcessorIT.java | 2 +- .../ml/job/process/ProcessCtrlTests.java | 33 ++------- .../AutodetectCommunicatorTests.java | 15 ++-- .../AutodetectProcessManagerTests.java | 56 +++++++-------- .../AutoDetectResultProcessorTests.java | 10 ++- .../output/FlushAcknowledgementTests.java | 4 +- .../autodetect/output/FlushListenerTests.java | 39 +++++----- .../params/FlushJobParamsTests.java | 34 +++++++-- .../ControlMsgToProcessWriterTests.java | 29 +++++++- .../ml/job/results/AutodetectResultTests.java | 2 +- .../rest-api-spec/api/xpack.ml.flush_job.json | 6 +- .../rest-api-spec/test/ml/post_data.yml | 64 +++++++++++++++++ 33 files changed, 473 insertions(+), 222 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index dbe8a4cfbb1..f13cadedd23 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -280,8 +280,7 @@ public class MachineLearning implements ActionPlugin { throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e); } } else { - autodetectProcessFactory = (job, modelSnapshot, quantiles, filters, - ignoreDowntime, executorService, onProcessCrash) -> + autodetectProcessFactory = (job, modelSnapshot, quantiles, filters, executorService, onProcessCrash) -> new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index b846401723a..2c585691ff1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; @@ -13,6 +14,7 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -28,10 +30,12 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import java.io.IOException; +import java.util.Date; import java.util.Objects; public class FlushJobAction extends Action { @@ -59,6 +63,7 @@ public class FlushJobAction extends Action PARSER = new ObjectParser<>(NAME, Request::new); @@ -68,6 +73,7 @@ public class FlushJobAction extends Action { - if (e == null) { - listener.onResponse(new Response(true)); - } else { - listener.onFailure(e); - } - }); + processManager.flushJob(task, paramsBuilder.build(), ActionListener.wrap( + flushAcknowledgement -> { + listener.onResponse(new Response(true, + flushAcknowledgement == null ? null : flushAcknowledgement.getLastFinalizedBucketEnd())); + }, listener::onFailure + )); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index cf149438057..b14d9745494 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -186,13 +186,15 @@ public class OpenJobAction extends Action PARSER = new ObjectParser<>(TASK_NAME, JobParams::new); static { PARSER.declareString(JobParams::setJobId, Job.ID); - PARSER.declareBoolean(JobParams::setIgnoreDowntime, IGNORE_DOWNTIME); + PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME); PARSER.declareString((params, val) -> params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); } @@ -210,7 +212,6 @@ public class OpenJobAction extends Action { + autodetectProcessManager.openJob(jobTask, e2 -> { if (e2 == null) { task.markAsCompleted(); } else { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index f6f23779a2f..6777011641d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -77,7 +77,7 @@ class DatafeedJob { } Long runLookBack(long startTime, Long endTime) throws Exception { - lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startTime) ? lastEndTimeMs + 1 : startTime; + lookbackStartTimeMs = skipToStartTime(startTime); Optional endMs = Optional.ofNullable(endTime); long lookbackEnd = endMs.orElse(currentTimeSupplier.get() - queryDelayMs); boolean isLookbackOnly = endMs.isPresent(); @@ -115,6 +115,22 @@ class DatafeedJob { return null; } + private long skipToStartTime(long startTime) { + if (lastEndTimeMs != null) { + if (lastEndTimeMs + 1 > startTime) { + // start time is before last checkpoint, thus continue from checkpoint + return lastEndTimeMs + 1; + } + // start time is after last checkpoint, thus we need to skip time + FlushJobAction.Request request = new FlushJobAction.Request(jobId); + request.setSkipTime(String.valueOf(startTime)); + FlushJobAction.Response flushResponse = flushJob(request); + LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]"); + return flushResponse.getLastFinalizedBucketEnd().getTime(); + } + return startTime; + } + long runRealtime() throws Exception { long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1); long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs; @@ -265,10 +281,10 @@ class DatafeedJob { return (epochMs / frequencyMs) * frequencyMs; } - private void flushJob(FlushJobAction.Request flushRequest) { + private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) { try { LOGGER.trace("[" + jobId + "] Sending flush request"); - client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); + return client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); } catch (Exception e) { LOGGER.debug("[" + jobId + "] error while flushing job", e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrl.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrl.java index 7049d7093d1..aaf0eb43b7d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrl.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrl.java @@ -78,7 +78,6 @@ public class ProcessCtrl { */ static final String BUCKET_SPAN_ARG = "--bucketspan="; public static final String DELETE_STATE_FILES_ARG = "--deleteStateFiles"; - static final String IGNORE_DOWNTIME_ARG = "--ignoreDowntime"; static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput"; static final String MODEL_CONFIG_ARG = "--modelconfig="; public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState="; @@ -151,8 +150,7 @@ public class ProcessCtrl { return rng.nextInt(SECONDS_IN_HOUR); } - public static List buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime, - long controllerPid) { + public static List buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, long controllerPid) { List command = new ArrayList<>(); command.add(AUTODETECT_PATH); @@ -213,10 +211,6 @@ public class ProcessCtrl { int maxQuantileInterval = BASE_MAX_QUANTILE_INTERVAL + intervalStagger; command.add(MAX_QUANTILE_INTERVAL_ARG + maxQuantileInterval); - if (ignoreDowntime) { - command.add(IGNORE_DOWNTIME_ARG); - } - if (modelConfigFilePresent(env)) { String modelConfigFile = XPackPlugin.resolveConfigFile(env, ML_MODEL_CONF).toString(); command.add(MODEL_CONFIG_ARG + modelConfigFile); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 93f3954fbcd..e930fc7aa7d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -43,7 +43,6 @@ public class AutodetectBuilder { private Job job; private List filesToDelete; private Logger logger; - private boolean ignoreDowntime; private Set referencedFilters; private Quantiles quantiles; private Environment env; @@ -68,21 +67,9 @@ public class AutodetectBuilder { this.job = Objects.requireNonNull(job); this.filesToDelete = Objects.requireNonNull(filesToDelete); this.logger = Objects.requireNonNull(logger); - ignoreDowntime = false; referencedFilters = new HashSet<>(); } - /** - * Set ignoreDowntime - * - * @param ignoreDowntime If true set the ignore downtime flag overriding the - * setting in the job configuration - */ - public AutodetectBuilder ignoreDowntime(boolean ignoreDowntime) { - this.ignoreDowntime = ignoreDowntime; - return this; - } - public AutodetectBuilder referencedFilters(Set filters) { referencedFilters = filters; return this; @@ -103,7 +90,7 @@ public class AutodetectBuilder { */ public void build() throws IOException, TimeoutException { - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime, controller.getPid()); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, controller.getPid()); buildLimits(command); buildModelPlotConfig(command); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 2f4cd55aeb0..5875b5249a2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; @@ -196,23 +198,24 @@ public class AutodetectCommunicator implements Closeable { }, handler); } - public void flushJob(FlushJobParams params, BiConsumer handler) { + public void flushJob(FlushJobParams params, BiConsumer handler) { submitOperation(() -> { String flushId = autodetectProcess.flushJob(params); - waitFlushToCompletion(flushId); - return null; + return waitFlushToCompletion(flushId); }, handler); } - void waitFlushToCompletion(String flushId) { + @Nullable + FlushAcknowledgement waitFlushToCompletion(String flushId) { LOGGER.debug("[{}] waiting for flush", job.getId()); + FlushAcknowledgement flushAcknowledgement; try { - boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); - while (isFlushComplete == false) { + flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); + while (flushAcknowledgement == null) { checkProcessIsAlive(); checkResultsProcessorIsAlive(); - isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); + flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY); } } finally { autoDetectResultProcessor.clearAwaitingFlush(flushId); @@ -225,6 +228,8 @@ public class AutodetectCommunicator implements Closeable { LOGGER.debug("[{}] Flush completed", job.getId()); } + + return flushAcknowledgement; } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java index 1126c2021b8..0bd505980b3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java @@ -25,13 +25,11 @@ public interface AutodetectProcessFactory { * @param modelSnapshot The model snapshot to restore from * @param quantiles The quantiles to push to the native process * @param filters The filters to push to the native process - * @param ignoreDowntime Should gaps in data be treated as anomalous or as a maintenance window after a job re-start * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process * @param onProcessCrash Callback to execute if the process stops unexpectedly * @return The process */ AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters, - boolean ignoreDowntime, ExecutorService executorService, Runnable onProcessCrash); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index e7d2eb7521b..849d86587f0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -201,23 +202,23 @@ public class AutodetectProcessManager extends AbstractComponent { * @param params Parameters describing the controls that will accompany the flushing * (e.g. calculating interim results, time control, etc.) */ - public void flushJob(JobTask jobTask, FlushJobParams params, Consumer handler) { + public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener handler) { logger.debug("Flushing job {}", jobTask.getJobId()); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId()); if (communicator == null) { String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId()); logger.debug(message); - handler.accept(ExceptionsHelper.conflictStatusException(message)); + handler.onFailure(ExceptionsHelper.conflictStatusException(message)); return; } - communicator.flushJob(params, (aVoid, e) -> { - if (e == null) { - handler.accept(null); - } else { + communicator.flushJob(params, (flushAcknowledgement, e) -> { + if (e != null) { String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobTask.getJobId()); logger.error(msg); - handler.accept(ExceptionsHelper.serverError(msg, e)); + handler.onFailure(ExceptionsHelper.serverError(msg, e)); + } else { + handler.onResponse(flushAcknowledgement); } }); } @@ -240,7 +241,7 @@ public class AutodetectProcessManager extends AbstractComponent { }); } - public void openJob(JobTask jobTask, boolean ignoreDowntime, Consumer handler) { + public void openJob(JobTask jobTask, Consumer handler) { String jobId = jobTask.getJobId(); Job job = jobManager.getJobOrThrowIfUnknown(jobId); @@ -263,7 +264,7 @@ public class AutodetectProcessManager extends AbstractComponent { protected void doRun() throws Exception { try { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobTask.getAllocationId(), - id -> create(jobTask, params, ignoreDowntime, handler)); + id -> create(jobTask, params, handler)); communicator.init(params.modelSnapshot()); setJobState(jobTask, JobState.OPENED); } catch (Exception e1) { @@ -286,8 +287,7 @@ public class AutodetectProcessManager extends AbstractComponent { }); } - AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, - boolean ignoreDowntime, Consumer handler) { + AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer handler) { if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) { throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS); @@ -321,8 +321,8 @@ public class AutodetectProcessManager extends AbstractComponent { renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), - autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, - autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); + autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, + () -> setJobState(jobTask, JobState.FAILED)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); ExecutorService autodetectWorkerExecutor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index a8acb7093ba..a9d091a5b82 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -77,7 +77,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { */ @Override public String flushJob(FlushJobParams params) throws IOException { - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null); AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); results.add(result); return FLUSH_ID; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index a78c5b20f99..04870785e06 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -59,13 +59,12 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory @Override public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters, - boolean ignoreDowntime, ExecutorService executorService, Runnable onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(), true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings)); - createNativeProcess(job, quantiles, filters, processPipes, ignoreDowntime, filesToDelete); + createNativeProcess(job, quantiles, filters, processPipes, filesToDelete); int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size(); StateProcessor stateProcessor = new StateProcessor(settings, client); @@ -88,11 +87,10 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory } private void createNativeProcess(Job job, Quantiles quantiles, Set filters, ProcessPipes processPipes, - boolean ignoreDowntime, List filesToDelete) { + List filesToDelete) { try { AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env, settings, nativeController, processPipes) - .ignoreDowntime(ignoreDowntime) .referencedFilters(filters); // if state is null or empty it will be ignored diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 384a3eee677..984d868017d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xpack.ml.MachineLearning; @@ -231,7 +232,7 @@ public class AutoDetectResultProcessor { // through to the data store context.bulkResultsPersister.executeRequest(); persister.commitResultWrites(context.jobId); - flushListener.acknowledgeFlush(flushAcknowledgement.getId()); + flushListener.acknowledgeFlush(flushAcknowledgement); // Interim results may have been produced by the flush, // which need to be // deleted when the next finalized results come through @@ -291,13 +292,11 @@ public class AutoDetectResultProcessor { * * @param flushId the id of the flush request to wait for * @param timeout the timeout - * @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired + * @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired */ - public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) { - if (failed) { - return false; - } - return flushListener.waitForFlush(flushId, timeout); + @Nullable + public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) { + return failed ? null : flushListener.waitForFlush(flushId, timeout); } public void clearAwaitingFlush(String flushId) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgement.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgement.java index 3f4aa33f565..2d2c9e7c77c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgement.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgement.java @@ -5,15 +5,20 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.output; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.ml.utils.time.TimeUtils; import java.io.IOException; +import java.util.Date; import java.util.Objects; /** @@ -25,44 +30,70 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { */ public static final ParseField TYPE = new ParseField("flush"); public static final ParseField ID = new ParseField("id"); + public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0])); + TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Date) a[1])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { + return new Date(p.longValue()); + } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(p.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + p.currentToken() + "] for [" + LAST_FINALIZED_BUCKET_END.getPreferredName() + "]"); + }, LAST_FINALIZED_BUCKET_END, ObjectParser.ValueType.VALUE); } private String id; + private Date lastFinalizedBucketEnd; - public FlushAcknowledgement(String id) { + public FlushAcknowledgement(String id, Date lastFinalizedBucketEnd) { this.id = id; + this.lastFinalizedBucketEnd = lastFinalizedBucketEnd; } public FlushAcknowledgement(StreamInput in) throws IOException { id = in.readString(); + if (in.getVersion().after(Version.V_5_5_0)) { + lastFinalizedBucketEnd = new Date(in.readVLong()); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); + if (out.getVersion().after(Version.V_5_5_0)) { + out.writeVLong(lastFinalizedBucketEnd.getTime()); + } } public String getId() { return id; } + public Date getLastFinalizedBucketEnd() { + return lastFinalizedBucketEnd; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(ID.getPreferredName(), id); + if (lastFinalizedBucketEnd != null) { + builder.dateField(LAST_FINALIZED_BUCKET_END.getPreferredName(), LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", + lastFinalizedBucketEnd.getTime()); + } builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(id); + return Objects.hash(id, lastFinalizedBucketEnd); } @Override @@ -74,7 +105,8 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { return false; } FlushAcknowledgement other = (FlushAcknowledgement) obj; - return Objects.equals(id, other.id); + return Objects.equals(id, other.id) && + Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index a268669978e..b9250190ee5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.output; +import org.elasticsearch.common.Nullable; + import java.time.Duration; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -15,29 +17,34 @@ import java.util.concurrent.atomic.AtomicBoolean; class FlushListener { - final ConcurrentMap awaitingFlushed = new ConcurrentHashMap<>(); + final ConcurrentMap awaitingFlushed = new ConcurrentHashMap<>(); final AtomicBoolean cleared = new AtomicBoolean(false); - boolean waitForFlush(String flushId, Duration timeout) { + @Nullable + FlushAcknowledgement waitForFlush(String flushId, Duration timeout) { if (cleared.get()) { - return false; + return null; } - CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1)); + FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId)); try { - return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + return holder.flushAcknowledgement; + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return false; } + return null; } - void acknowledgeFlush(String flushId) { + void acknowledgeFlush(FlushAcknowledgement flushAcknowledgement) { // acknowledgeFlush(...) could be called before waitForFlush(...) // a flush api call writes a flush command to the analytical process and then via a different thread the // result reader then reads whether the flush has been acked. - CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1)); - latch.countDown(); + String flushId = flushAcknowledgement.getId(); + FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId)); + holder.flushAcknowledgement = flushAcknowledgement; + holder.latch.countDown(); } void clear(String flushId) { @@ -46,11 +53,22 @@ class FlushListener { void clear() { if (cleared.compareAndSet(false, true)) { - Iterator> latches = awaitingFlushed.entrySet().iterator(); + Iterator> latches = awaitingFlushed.entrySet().iterator(); while (latches.hasNext()) { - latches.next().getValue().countDown(); + latches.next().getValue().latch.countDown(); latches.remove(); } } } + + private static class FlushAcknowledgementHolder { + + private final CountDownLatch latch; + private volatile FlushAcknowledgement flushAcknowledgement; + + private FlushAcknowledgementHolder(String flushId) { + this.flushAcknowledgement = new FlushAcknowledgement(flushId, null); + this.latch = new CountDownLatch(1); + } + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index 0ef757b801b..e4572af77f4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.Strings; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.time.TimeUtils; @@ -13,14 +14,32 @@ import org.elasticsearch.xpack.ml.utils.time.TimeUtils; import java.util.Objects; public class FlushJobParams { + + /** + * Whether interim results should be calculated + */ private final boolean calcInterim; + + /** + * The time range for which interim results should be calculated + */ private final TimeRange timeRange; + + /** + * The epoch (seconds) to advance time to + */ private final Long advanceTimeSeconds; - private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) { + /** + * The epoch (seconds) to skip time to + */ + private final Long skipTimeSeconds; + + private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds, Long skipTimeSeconds) { this.calcInterim = calcInterim; this.timeRange = Objects.requireNonNull(timeRange); this.advanceTimeSeconds = advanceTimeSeconds; + this.skipTimeSeconds = skipTimeSeconds; } public boolean shouldCalculateInterim() { @@ -31,6 +50,10 @@ public class FlushJobParams { return advanceTimeSeconds != null; } + public boolean shouldSkipTime() { + return skipTimeSeconds != null; + } + public String getStart() { return timeRange.getStart(); } @@ -46,6 +69,13 @@ public class FlushJobParams { return advanceTimeSeconds; } + public long getSkipTime() { + if (!shouldSkipTime()) { + throw new IllegalStateException(); + } + return skipTimeSeconds; + } + public static Builder builder() { return new Builder(); } @@ -57,24 +87,20 @@ public class FlushJobParams { FlushJobParams that = (FlushJobParams) o; return calcInterim == that.calcInterim && Objects.equals(timeRange, that.timeRange) && - Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds); + Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds) && + Objects.equals(skipTimeSeconds, that.skipTimeSeconds); } @Override public int hashCode() { - return Objects.hash(calcInterim, timeRange, advanceTimeSeconds); + return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds); } public static class Builder { private boolean calcInterim = false; - private TimeRange timeRange; + private TimeRange timeRange = TimeRange.builder().build(); private String advanceTime; - - private Builder() { - calcInterim = false; - timeRange = TimeRange.builder().build(); - advanceTime = ""; - } + private String skipTime; public Builder calcInterim(boolean value) { calcInterim = value; @@ -87,14 +113,24 @@ public class FlushJobParams { } public Builder advanceTime(String timestamp) { - advanceTime = ExceptionsHelper.requireNonNull(timestamp, "advance"); + advanceTime = ExceptionsHelper.requireNonNull(timestamp, "advance_time"); + return this; + } + + public Builder skipTime(String timestamp) { + skipTime = ExceptionsHelper.requireNonNull(timestamp, "skip_time"); return this; } public FlushJobParams build() { checkValidFlushArgumentsCombination(); - Long advanceTimeSeconds = checkAdvanceTimeParam(); - return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds); + Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime); + Long skipTimeSeconds = parseTimeParam("skip_time", skipTime); + if (skipTimeSeconds != null && advanceTimeSeconds != null && advanceTimeSeconds <= skipTimeSeconds) { + throw ExceptionsHelper.badRequestException("advance_time [" + advanceTime + "] must be later than skip_time [" + + skipTime + "]"); + } + return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds); } private void checkValidFlushArgumentsCombination() { @@ -107,11 +143,11 @@ public class FlushJobParams { } } - private Long checkAdvanceTimeParam() { - if (advanceTime != null && !advanceTime.isEmpty()) { - return paramToEpochIfValidOrThrow("advance_time", advanceTime) / TimeRange.MILLISECONDS_IN_SECOND; + private Long parseTimeParam(String name, String value) { + if (Strings.isNullOrEmpty(value)) { + return null; } - return null; + return paramToEpochIfValidOrThrow(name, value) / TimeRange.MILLISECONDS_IN_SECOND; } private long paramToEpochIfValidOrThrow(String paramName, String date) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 537bba0b1ee..a43b26890ec 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -52,6 +52,11 @@ public class ControlMsgToProcessWriter { */ private static final String ADVANCE_TIME_MESSAGE_CODE = "t"; + /** + * This must match the code defined in the api::CAnomalyDetector C++ class. + */ + private static final String SKIP_TIME_MESSAGE_CODE = "s"; + /** * This must match the code defined in the api::CAnomalyDetector C++ class. */ @@ -108,6 +113,9 @@ public class ControlMsgToProcessWriter { * (e.g. calculating interim results, time control, etc.) */ public void writeFlushControlMessage(FlushJobParams params) throws IOException { + if (params.shouldSkipTime()) { + writeMessage(SKIP_TIME_MESSAGE_CODE + params.getSkipTime()); + } if (params.shouldAdvanceTime()) { writeMessage(ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime()); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestFlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestFlushJobAction.java index 7b671aff40c..f5169b01332 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestFlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestFlushJobAction.java @@ -24,6 +24,7 @@ public class RestFlushJobAction extends BaseRestHandler { private final String DEFAULT_START = ""; private final String DEFAULT_END = ""; private final String DEFAULT_ADVANCE_TIME = ""; + private final String DEFAULT_SKIP_TIME = ""; public RestFlushJobAction(Settings settings, RestController controller) { super(settings); @@ -50,6 +51,7 @@ public class RestFlushJobAction extends BaseRestHandler { request.setStart(restRequest.param(FlushJobAction.Request.START.getPreferredName(), DEFAULT_START)); request.setEnd(restRequest.param(FlushJobAction.Request.END.getPreferredName(), DEFAULT_END)); request.setAdvanceTime(restRequest.param(FlushJobAction.Request.ADVANCE_TIME.getPreferredName(), DEFAULT_ADVANCE_TIME)); + request.setSkipTime(restRequest.param(FlushJobAction.Request.SKIP_TIME.getPreferredName(), DEFAULT_SKIP_TIME)); } return channel -> client.execute(FlushJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java index 3758fc6224a..6c050baa7d8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestOpenJobAction.java @@ -42,7 +42,6 @@ public class RestOpenJobAction extends BaseRestHandler { request = OpenJobAction.Request.parseRequest(restRequest.param(Job.ID.getPreferredName()), restRequest.contentParser()); } else { OpenJobAction.JobParams jobParams = new OpenJobAction.JobParams(restRequest.param(Job.ID.getPreferredName())); - jobParams.setIgnoreDowntime(restRequest.paramAsBoolean(OpenJobAction.JobParams.IGNORE_DOWNTIME.getPreferredName(), true)); if (restRequest.hasParam(OpenJobAction.JobParams.TIMEOUT.getPreferredName())) { TimeValue openTimeout = restRequest.paramAsTime(OpenJobAction.JobParams.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java index 2a9ff8a9748..03240af41d5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java @@ -18,9 +18,6 @@ public class OpenJobActionRequestTests extends AbstractStreamableXContentTestCas if (randomBoolean()) { params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } - if (randomBoolean()) { - params.setIgnoreDowntime(randomBoolean()); - } return new Request(params); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataFlushResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataFlushResponseTests.java index fff721b90e0..8cb971f6245 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataFlushResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataFlushResponseTests.java @@ -7,12 +7,13 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.xpack.ml.action.FlushJobAction.Response; +import org.joda.time.DateTime; public class PostDataFlushResponseTests extends AbstractStreamableTestCase { @Override protected Response createTestInstance() { - return new Response(randomBoolean()); + return new Response(randomBoolean(), new DateTime(randomDateTimeZone()).toDate()); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 623d58aed31..9974d1aa803 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -27,9 +27,11 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Date; +import java.util.List; import java.util.Optional; import java.util.function.Supplier; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -139,6 +141,7 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(10000 + frequencyMs + 100, next); verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs); + assertThat(flushJobRequests.getAllValues().size(), equalTo(1)); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); flushRequest.setCalcInterim(true); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); @@ -148,14 +151,17 @@ public class DatafeedJobTests extends ESTestCase { // We need to return empty counts so that the lookback doesn't update the last end time when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts("_job_id"))); - currentTime = 10000L; + currentTime = 9999L; long latestFinalBucketEndTimeMs = 5000; long latestRecordTimeMs = 5000; + FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, new Date(10000L)); + when(flushJobFuture.actionGet()).thenReturn(skipTimeResponse); + long frequencyMs = 1000; long queryDelayMs = 500; DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); - datafeedJob.runLookBack(10000L, null); + datafeedJob.runLookBack(currentTime, null); // advance time currentTime = 12000L; @@ -163,6 +169,13 @@ public class DatafeedJobTests extends ESTestCase { expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runRealtime()); verify(dataExtractorFactory, times(1)).newExtractor(10000L, 11000L); + List capturedFlushJobRequests = flushJobRequests.getAllValues(); + assertThat(capturedFlushJobRequests.size(), equalTo(2)); + assertThat(capturedFlushJobRequests.get(0).getCalcInterim(), is(false)); + assertThat(capturedFlushJobRequests.get(0).getSkipTime(), equalTo("9999")); + assertThat(capturedFlushJobRequests.get(1).getCalcInterim(), is(true)); + assertThat(capturedFlushJobRequests.get(1).getSkipTime(), is(nullValue())); + assertThat(capturedFlushJobRequests.get(1).getAdvanceTime(), equalTo("11000")); Mockito.verifyNoMoreInteractions(dataExtractorFactory); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 462d6bdd92b..6a366543878 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -354,7 +354,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { } private FlushAcknowledgement createFlushAcknowledgement() { - return new FlushAcknowledgement(randomAlphaOfLength(5)); + return new FlushAcknowledgement(randomAlphaOfLength(5), new Date(randomNonNegativeLong())); } private class ResultsBuilder { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrlTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrlTests.java index fd3edd69bf4..91ef2840f8b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrlTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessCtrlTests.java @@ -51,8 +51,8 @@ public class ProcessCtrlTests extends ESTestCase { dd.setTimeField("tf"); job.setDataDescription(dd); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid); - assertEquals(15, command.size()); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid); + assertEquals(14, command.size()); assertTrue(command.contains(ProcessCtrl.AUTODETECT_PATH)); assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "120")); assertTrue(command.contains(ProcessCtrl.LATENCY_ARG + "360")); @@ -73,7 +73,6 @@ public class ProcessCtrlTests extends ESTestCase { assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); int expectedMaxQuantileInterval = 21600 + ProcessCtrl.calculateStaggeringInterval(job.getId()); assertTrue(command.contains(ProcessCtrl.MAX_QUANTILE_INTERVAL_ARG + expectedMaxQuantileInterval)); - assertTrue(command.contains(ProcessCtrl.IGNORE_DOWNTIME_ARG)); } public void testBuildAutodetectCommand_defaultTimeField() { @@ -81,7 +80,7 @@ public class ProcessCtrlTests extends ESTestCase { Environment env = new Environment(settings); Job.Builder job = buildJobBuilder("unit-test-job"); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid); assertTrue(command.contains(ProcessCtrl.TIME_FIELD_ARG + "time")); } @@ -94,38 +93,16 @@ public class ProcessCtrlTests extends ESTestCase { int expectedPersistInterval = 10800 + ProcessCtrl.calculateStaggeringInterval(job.getId()); - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); + List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid); assertFalse(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); env = new Environment(settings); - command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); + command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid); assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval)); } - public void testBuildAutodetectCommand_GivenNoIgnoreDowntime() { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); - Environment env = new Environment( - settings); - Job.Builder job = buildJobBuilder("foo"); - - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid); - - assertFalse(command.contains("--ignoreDowntime")); - } - - public void testBuildAutodetectCommand_GivenIgnoreDowntimeParam() { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); - Environment env = new Environment( - settings); - Job.Builder job = buildJobBuilder("foo"); - - List command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid); - - assertTrue(command.contains("--ignoreDowntime")); - } - public void testBuildNormalizerCommand() throws IOException { Environment env = new Environment( Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 5704ce6cbe8..27396737af5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; @@ -35,9 +36,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.elasticsearch.mock.orig.Mockito.doAnswer; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -70,10 +73,13 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); - when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true); + FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); + when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(flushAcknowledgement); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) { FlushJobParams params = FlushJobParams.builder().build(); - communicator.flushJob(params, (aVoid, e) -> {}); + AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); + communicator.flushJob(params, (f, e) -> flushAcknowledgementHolder.set(f)); + assertThat(flushAcknowledgementHolder.get(), equalTo(flushAcknowledgement)); Mockito.verify(process).flushJob(params); } } @@ -83,7 +89,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); when(processor.isFailed()).thenReturn(true); - when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false); + when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null); AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor); expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo")); } @@ -103,8 +109,9 @@ public class AutodetectCommunicatorTests extends ESTestCase { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); + FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) - .thenReturn(false).thenReturn(true); + .thenReturn(null).thenReturn(flushAcknowledgement); FlushJobParams params = FlushJobParams.builder().build(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index dc7ba3de7fb..2cb2e99baef 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; @@ -134,7 +135,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobTask.getJobId()).thenReturn(job.getId()); AtomicReference errorHolder = new AtomicReference<>(); - manager.openJob(jobTask, false, e -> errorHolder.set(e)); + manager.openJob(jobTask, e -> errorHolder.set(e)); Exception error = errorHolder.get(); assertThat(error, is(notNullValue())); @@ -150,7 +151,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any()); @@ -175,7 +176,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator()); AutodetectProcessFactory autodetectProcessFactory = - (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; + (j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess; Settings.Builder settings = Settings.builder(); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, @@ -192,22 +193,22 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("bar"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("baz"); when(jobTask.getAllocationId()).thenReturn(2L); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); assertEquals(3, manager.numberOfOpenJobs()); Exception[] holder = new Exception[1]; jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foobar"); when(jobTask.getAllocationId()).thenReturn(3L); - manager.openJob(jobTask, false, e -> holder[0] = e); + manager.openJob(jobTask, e -> holder[0] = e); Exception e = holder[0]; assertEquals("max running job capacity [3] reached", e.getMessage()); @@ -216,7 +217,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobTask.getJobId()).thenReturn("baz"); manager.closeJob(jobTask, false, null); assertEquals(2, manager.numberOfOpenJobs()); - manager.openJob(jobTask, false, e1 -> {}); + manager.openJob(jobTask, e1 -> {}); assertEquals(3, manager.numberOfOpenJobs()); } @@ -228,7 +229,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), params, (dataCounts1, e) -> {}); assertEquals(1, manager.numberOfOpenJobs()); @@ -250,7 +251,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); Exception[] holder = new Exception[1]; manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e); assertNotNull(holder[0]); @@ -263,7 +264,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -282,7 +283,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> {}); verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any()); } @@ -294,12 +295,12 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); InputStream inputStream = createInputStream(""); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, inputStream, randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); FlushJobParams params = FlushJobParams.builder().build(); - manager.flushJob(jobTask, params, e -> {}); + manager.flushJob(jobTask, params, ActionListener.wrap(flushAcknowledgement -> {}, e -> fail(e.getMessage()))); verify(communicator).flushJob(same(params), any()); } @@ -318,7 +319,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); Exception[] holder = new Exception[1]; - manager.flushJob(jobTask, params, e -> holder[0] = e); + manager.flushJob(jobTask, params, ActionListener.wrap(flushAcknowledgement -> {}, e -> holder[0] = e)); assertEquals("[foo] exception while flushing job", holder[0].getMessage()); } @@ -333,8 +334,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { // create a jobtask JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> { - }); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { }); @@ -366,7 +366,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -385,7 +385,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); when(communicator.getJobTask()).thenReturn(jobTask); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -408,7 +408,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); InputStream inputStream = createInputStream(""); DataCounts[] dataCounts = new DataCounts[1]; manager.processData(jobTask, inputStream, @@ -429,7 +429,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id")); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcessFactory autodetectProcessFactory = - (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; + (j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); @@ -437,7 +437,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("my_id"); expectThrows(EsRejectedExecutionException.class, - () -> manager.create(jobTask, buildAutodetectParams(), false, e -> {})); + () -> manager.create(jobTask, buildAutodetectParams(), e -> {})); verify(autodetectProcess, times(1)).close(); } @@ -447,7 +447,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.create(jobTask, buildAutodetectParams(), false, e -> {}); + manager.create(jobTask, buildAutodetectParams(), e -> {}); String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]"; verify(auditor).info("foo", expectedNotification); @@ -463,7 +463,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.create(jobTask, buildAutodetectParams(), false, e -> {}); + manager.create(jobTask, buildAutodetectParams(), e -> {}); String expectedNotification = "Loading model snapshot [snapshot-1] with " + "latest_record_timestamp [1970-01-01T00:00:00.000Z], " + @@ -482,7 +482,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.create(jobTask, buildAutodetectParams(), false, e -> {}); + manager.create(jobTask, buildAutodetectParams(), e -> {}); String expectedNotification = "Loading model snapshot [N/A], " + "job latest_record_timestamp [1970-01-01T00:00:00.000Z]"; @@ -501,7 +501,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown(jobId)).thenReturn(createJobDetails(jobId)); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcessFactory autodetectProcessFactory = - (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; + (j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess; return new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); @@ -531,7 +531,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); manager = spy(manager); - doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), anyBoolean(), any()); + doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), any()); return manager; } @@ -539,7 +539,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, false, e -> {}); + manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts, e) -> {}); return manager; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 7f979095428..c3563ebf2e6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -34,6 +34,8 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeoutException; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -200,7 +202,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); processorUnderTest.processResult(context, result); - verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); + verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement); verify(persister, times(1)).commitResultWrites(JOB_ID); verify(bulkBuilder, times(1)).executeRequest(); verifyNoMoreInteractions(persister); @@ -225,7 +227,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition); inOrder.verify(bulkBuilder, times(1)).executeRequest(); inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID); - inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); + inOrder.verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement); verifyNoMoreInteractions(persister); assertTrue(context.deleteInterimRequired); } @@ -342,7 +344,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase { assertTrue(processorUnderTest.isFailed()); // Wait for flush should return immediately - assertFalse(processorUnderTest.waitForFlushAcknowledgement("foo", Duration.of(300, ChronoUnit.SECONDS))); + FlushAcknowledgement flushAcknowledgement = processorUnderTest.waitForFlushAcknowledgement( + "foo", Duration.of(300, ChronoUnit.SECONDS)); + assertThat(flushAcknowledgement, is(nullValue())); } public void testKill() throws TimeoutException { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java index 4a1631227e6..d451bd57267 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java @@ -9,6 +9,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; +import java.util.Date; + public class FlushAcknowledgementTests extends AbstractSerializingTestCase { @Override @@ -18,7 +20,7 @@ public class FlushAcknowledgementTests extends AbstractSerializingTestCase flushAcknowledgementHolder = new AtomicReference<>(); new Thread(() -> { - boolean result = listener.waitForFlush("_id", Duration.ofMillis(10000)); - bool.set(result); + FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000)); + flushAcknowledgementHolder.set(flushAcknowledgement); }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); - assertFalse(bool.get()); - listener.acknowledgeFlush("_id"); - assertBusy(() -> assertTrue(bool.get())); + assertNull(flushAcknowledgementHolder.get()); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L)); + listener.acknowledgeFlush(flushAcknowledgement); + assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get())); assertEquals(1, listener.awaitingFlushed.size()); listener.clear("_id"); @@ -35,27 +39,26 @@ public class FlushListenerTests extends ESTestCase { FlushListener listener = new FlushListener(); int numWaits = 9; - List bools = new ArrayList<>(numWaits); + List> flushAcknowledgementHolders = new ArrayList<>(numWaits); for (int i = 0; i < numWaits; i++) { int id = i; - AtomicBoolean bool = new AtomicBoolean(); - bools.add(bool); + AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); + flushAcknowledgementHolders.add(flushAcknowledgementHolder); new Thread(() -> { - boolean result = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); - bool.set(result); + FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); + flushAcknowledgementHolder.set(flushAcknowledgement); }).start(); } assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size())); - for (AtomicBoolean bool : bools) { - assertFalse(bool.get()); - } + assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false)); assertFalse(listener.cleared.get()); + listener.clear(); - for (AtomicBoolean bool : bools) { - assertBusy(() -> assertTrue(bool.get())); + + for (AtomicReference f : flushAcknowledgementHolders) { + assertBusy(() -> assertNotNull(f.get())); } assertTrue(listener.awaitingFlushed.isEmpty()); assertTrue(listener.cleared.get()); } - } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java index e99dc25f14d..ff18807096a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParamsTests.java @@ -5,27 +5,31 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.params; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.test.ESTestCase; +import static org.hamcrest.Matchers.equalTo; + public class FlushJobParamsTests extends ESTestCase { + public void testBuilder_GivenDefault() { FlushJobParams params = FlushJobParams.builder().build(); assertFalse(params.shouldCalculateInterim()); assertFalse(params.shouldAdvanceTime()); + assertFalse(params.shouldSkipTime()); assertEquals("", params.getStart()); assertEquals("", params.getEnd()); } - public void testBuilder_GivenCalcInterim() { FlushJobParams params = FlushJobParams.builder().calcInterim(true).build(); assertTrue(params.shouldCalculateInterim()); assertFalse(params.shouldAdvanceTime()); + assertFalse(params.shouldSkipTime()); assertEquals("", params.getStart()); assertEquals("", params.getEnd()); } - public void testBuilder_GivenCalcInterimAndStart() { FlushJobParams params = FlushJobParams.builder() .calcInterim(true) @@ -33,6 +37,7 @@ public class FlushJobParamsTests extends ESTestCase { .build(); assertTrue(params.shouldCalculateInterim()); assertFalse(params.shouldAdvanceTime()); + assertFalse(params.shouldSkipTime()); assertEquals("42", params.getStart()); assertEquals("43", params.getEnd()); } @@ -47,7 +52,6 @@ public class FlushJobParamsTests extends ESTestCase { assertEquals("Invalid flush parameters: 'start' has not been specified.", e.getMessage()); } - public void testBuilder_GivenCalcInterimAndStartAndEnd() { FlushJobParams params = FlushJobParams.builder() .calcInterim(true) @@ -59,7 +63,6 @@ public class FlushJobParamsTests extends ESTestCase { assertEquals("7200", params.getEnd()); } - public void testBuilder_GivenAdvanceTime() { FlushJobParams params = FlushJobParams.builder().advanceTime("1821").build(); assertFalse(params.shouldCalculateInterim()); @@ -69,7 +72,6 @@ public class FlushJobParamsTests extends ESTestCase { assertEquals(1821, params.getAdvanceTime()); } - public void testBuilder_GivenCalcInterimAndAdvanceTime() { FlushJobParams params = FlushJobParams.builder() .calcInterim(true) @@ -82,7 +84,6 @@ public class FlushJobParamsTests extends ESTestCase { assertEquals(1940, params.getAdvanceTime()); } - public void testBuilder_GivenCalcInterimWithTimeRangeAndAdvanceTime() { FlushJobParams params = FlushJobParams.builder() .calcInterim(true) @@ -96,6 +97,27 @@ public class FlushJobParamsTests extends ESTestCase { assertEquals(1940, params.getAdvanceTime()); } + public void testBuilder_GivenAdvanceTimeIsEarlierThanSkipTime() { + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> FlushJobParams.builder().advanceTime("2017-01-01T00:00:00Z").skipTime("2017-02-01T00:00:00Z").build()); + + assertEquals("advance_time [2017-01-01T00:00:00Z] must be later than skip_time [2017-02-01T00:00:00Z]", e.getMessage()); + } + + public void testBuilder_GivenAdvanceTimeIsEqualToSkipTime() { + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> FlushJobParams.builder().advanceTime("2017-01-01T00:00:00Z").skipTime("2017-01-01T00:00:00Z").build()); + + assertEquals("advance_time [2017-01-01T00:00:00Z] must be later than skip_time [2017-01-01T00:00:00Z]", e.getMessage()); + } + + public void testBuilder_GivenAdvanceTimeIsLaterToSkipTime() { + FlushJobParams params = FlushJobParams.builder().advanceTime("2017-02-01T00:00:00Z").skipTime("2017-01-01T00:00:00Z").build(); + + assertThat(params.getSkipTime(), equalTo(1483228800L)); + assertThat(params.getAdvanceTime(), equalTo(1485907200L)); + } + public void testValidate_GivenOnlyStartSpecified() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> FlushJobParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index c3601f84dd5..7a1b78e3401 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -41,8 +41,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - FlushJobParams flushJobParams = FlushJobParams.builder() - .advanceTime("1234567890").build(); + FlushJobParams flushJobParams = FlushJobParams.builder().advanceTime("1234567890").build(); writer.writeFlushControlMessage(flushJobParams); @@ -53,6 +52,30 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } + public void testWriteFlushControlMessage_GivenSkipTime() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1234567890").build(); + + writer.writeFlushControlMessage(flushJobParams); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("s1234567890"); + verifyNoMoreInteractions(lengthEncodedWriter); + } + + public void testWriteFlushControlMessage_GivenSkipAndAdvanceTime() throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); + FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1000").advanceTime("2000").build(); + + writer.writeFlushControlMessage(flushJobParams); + + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeField("s1000"); + inOrder.verify(lengthEncodedWriter).writeField("t2000"); + } + public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); FlushJobParams flushJobParams = FlushJobParams.builder() @@ -67,7 +90,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { verifyNoMoreInteractions(lengthEncodedWriter); } - public void testWriteFlushControlMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException { + public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); FlushJobParams flushJobParams = FlushJobParams.builder().build(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java index ee0ecfb05e8..1f9cb78f590 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java @@ -91,7 +91,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase