From c33f26976db83179d393a237cea9e3c2c6901c57 Mon Sep 17 00:00:00 2001
From: Dimitris Athanasiou
Date: Wed, 18 Jan 2017 18:46:43 +0000
Subject: [PATCH] Improve field extraction in scheduler (elastic/elasticsearch#748)

This commit performs the following improvements:

- the time field is always requested as a doc_value. This makes
  specifying a time format for scheduled jobs unnecessary.
- adds DataDescription as a param to PostDataAction. When set, it
  overrides the job's DataDescription. The scheduler uses this since it
  knows the data format (JSON) and the time format (epoch_ms) of the
  data it posts. This is not exposed in the REST API to discourage
  users from using it. (A condensed sketch follows after the message.)
- by default, the data extractor search now requests doc_values for the
  analysis fields. This is expected to result in increased performance.
- a `_source` field is added to the scheduler config. It needs to be
  set to true when one or more of the analysis fields do not have
  doc_values. (A config sketch follows after the message.)
- the ELASTICSEARCH data format is removed as it is now redundant.
- fixes the usage of `script_fields`. Previously, setting
  `script_fields` resulted in none of the source being returned, so an
  analysis mixing script and non-script fields did not work.
- ensures nested fields are handled properly

Closes elastic/elasticsearch#679, Closes elastic/elasticsearch#267

Original commit: elastic/x-pack-elasticsearch@fed35ed354112ad6cc6f69470297bb5a32b656db
---
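As an illustration (not part of the patch itself), the scheduler's override of the
job's DataDescription when posting data condenses to the sketch below. It is lifted
from ScheduledJobRunner.buildDataDescription and ScheduledJob.postData further down;
buildPostDataRequest is a made-up helper name used only for this note.

    // The scheduler always posts JSON with epoch_ms timestamps, so it overrides
    // whatever DataDescription the job itself was configured with.
    static PostDataAction.Request buildPostDataRequest(Job job, byte[] json) {
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setFormat(DataDescription.DataFormat.JSON);
        if (job.getDataDescription() != null) {
            dataDescription.setTimeField(job.getDataDescription().getTimeField());
        }
        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

        PostDataAction.Request request = new PostDataAction.Request(job.getId());
        request.setDataDescription(dataDescription.build()); // deliberately not exposed via REST
        request.setContent(new BytesArray(json));
        return request;
    }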
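Likewise, opting into `_source` extraction is just the new flag on SchedulerConfig.Builder.
A minimal sketch, assuming the builder's setJobId/setIndexes/setTypes setters exist as
elsewhere in this codebase (only setId, setSource and build appear in the hunks below);
the scheduler and job ids are hypothetical, and the index name is the one used by
ScheduledJobIT:

    SchedulerConfig.Builder scheduler = new SchedulerConfig.Builder();
    scheduler.setId("scheduler-1");                // hypothetical ids
    scheduler.setJobId("airline-job");
    scheduler.setIndexes(Collections.singletonList("airline-data-disabled-doc-values"));
    scheduler.setTypes(Collections.singletonList("response"));
    scheduler.setSource(true);  // needed because airline/responsetime have doc_values disabled
    SchedulerConfig config = scheduler.build();

With `_source` left at its default of false, the scroll extractor requests the analysis
fields as doc_values and disables `_source` fetching entirely, which is where the
expected performance gain comes from.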
 .../xpack/ml/action/PostDataAction.java | 21 +-
 .../xpack/ml/job/AnalysisConfig.java | 2 +-
 .../xpack/ml/job/DataDescription.java | 4 +-
 .../xpack/ml/job/messages/Messages.java | 2 -
 .../autodetect/AutodetectCommunicator.java | 41 +--
 .../autodetect/params/DataLoadParams.java | 15 +-
 .../writer/AbstractDataToProcessWriter.java | 53 ++--
 .../writer/DataToProcessWriterFactory.java | 2 -
 .../xpack/ml/scheduler/ScheduledJob.java | 9 +-
 .../ml/scheduler/ScheduledJobRunner.java | 13 +-
 .../ml/scheduler/ScheduledJobValidator.java | 5 -
 .../xpack/ml/scheduler/SchedulerConfig.java | 35 ++-
 .../extractor/SearchHitFieldExtractor.java | 53 ----
 .../extractor/scroll/ExtractedField.java | 109 ++++++++
 .../extractor/scroll/ExtractedFields.java | 86 ++++++
 .../extractor/scroll/ScrollDataExtractor.java | 33 +--
 .../scroll/ScrollDataExtractorContext.java | 19 +-
 .../scroll/ScrollDataExtractorFactory.java | 7 +-
 .../SearchHitToJsonProcessor.java | 17 +-
 .../transforms/date/DateFormatTransform.java | 20 +-
 .../ml/job/messages/ml_messages.properties | 1 -
 .../xpack/ml/action/ScheduledJobsIT.java | 10 +-
 .../xpack/ml/integration/ScheduledJobIT.java | 254 +++++++++++++++---
 .../xpack/ml/job/DataFormatTests.java | 14 -
 .../AutodetectProcessManagerTests.java | 5 +-
 .../ml/job/metadata/MlMetadataTests.java | 9 +-
 .../AutodetectCommunicatorTests.java | 5 +-
 .../NativeAutodetectProcessTests.java | 3 +-
 .../params/DataLoadParamsTests.java | 14 +-
 .../ControlMsgToProcessWriterTests.java | 4 +-
 .../DataToProcessWriterFactoryTests.java | 7 -
 .../ml/scheduler/ScheduledJobRunnerTests.java | 16 +-
 .../xpack/ml/scheduler/ScheduledJobTests.java | 11 +-
 .../scheduler/ScheduledJobValidatorTests.java | 18 --
 .../ml/scheduler/SchedulerConfigTests.java | 3 +
 .../SearchHitFieldExtractorTests.java | 62 -----
 .../SearchHitToJsonProcessorTests.java | 122 ---------
 .../extractor/scroll/ExtractedFieldTests.java | 130 +++++++++
 .../scroll/ExtractedFieldsTests.java | 80 ++++++
 .../scroll/ScrollDataExtractorTests.java | 16 +-
 .../scroll/SearchHitToJsonProcessorTests.java | 72 +++++
 .../date/DateFormatTransformTests.java | 33 +--
 .../rest-api-spec/test/get_schedulers.yaml | 6 +-
 .../test/get_schedulers_stats.yaml | 6 +-
 .../rest-api-spec/test/jobs_crud.yaml | 2 +-
 .../rest-api-spec/test/jobs_get_stats.yaml | 2 +-
 .../rest-api-spec/test/schedulers_crud.yaml | 6 +-
 .../test/start_stop_scheduler.yaml | 2 +-
 48 files changed, 931 insertions(+), 528 deletions(-)
 delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java
 create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java
 create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java
 rename elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/{ => scroll}/SearchHitToJsonProcessor.java (66%)
 delete mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractorTests.java
 delete mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java
 create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java
 create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java
 create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java
index e72f05ca233..57e18ca485f 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java
@@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.manager.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@@ -39,6 +40,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; +import java.util.Optional; public class PostDataAction extends Action {
@@ -148,6 +150,7 @@ public class PostDataAction extends Action listener) { PostDataTask postDataTask = (PostDataTask) task; TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); - DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime()); + DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime(), + Optional.ofNullable(request.getDataDescription())); threadPool.executor(MlPlugin.THREAD_POOL_NAME).execute(() -> { try { DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params,
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java index 76eac37913f..025b6351e99 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java +++
b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java @@ -459,7 +459,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable { this.detectors = detectors; } - Builder(AnalysisConfig analysisConfig) { + public Builder(AnalysisConfig analysisConfig) { this.detectors = analysisConfig.detectors; this.bucketSpan = analysisConfig.bucketSpan; this.batchSpan = analysisConfig.batchSpan; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java index 72af1eadf9d..323c0274a14 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java @@ -41,9 +41,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable { public enum DataFormat implements Writeable { JSON("json"), DELIMITED("delimited"), - SINGLE_LINE("single_line"), - // TODO norelease, this can now be removed - ELASTICSEARCH("elasticsearch"); + SINGLE_LINE("single_line"); /** * Delimited used to be called delineated. We keep supporting that for backwards diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index 0f1ce3d639d..a86f1ba364a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -196,8 +196,6 @@ public final class Messages { public static final String SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "scheduler.does.not.support.job.with.latency"; public static final String SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD = "scheduler.aggregations.requires.job.with.summary.count.field"; - public static final String SCHEDULER_REQUIRES_JOB_WITH_DATAFORMAT_ELASTICSEARCH = - "scheduler.requires.job.with.dataformat.elasticsearch"; public static final String SCHEDULER_CANNOT_START = "scheduler.cannot.start"; public static final String SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE = "scheduler.cannot.stop.in.current.state"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index c53058b220b..109485d3499 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.ModelSizeStats; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -42,10 +43,9 @@ public class AutodetectCommunicator implements Closeable { private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class); private static final int DEFAULT_TRY_TIMEOUT_SECS = 30; - private final String jobId; + private final Job job; private final StatusReporter statusReporter; private final AutodetectProcess autodetectProcess; - 
private final DataToProcessWriter autoDetectWriter; private final AutoDetectResultProcessor autoDetectResultProcessor; final AtomicReference inUse = new AtomicReference<>(); @@ -53,7 +53,7 @@ public class AutodetectCommunicator implements Closeable { public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) { - this.jobId = job.getId(); + this.job = job; this.autodetectProcess = process; this.statusReporter = statusReporter; this.autoDetectResultProcessor = autoDetectResultProcessor; @@ -61,29 +61,30 @@ public class AutodetectCommunicator implements Closeable { AnalysisConfig analysisConfig = job.getAnalysisConfig(); boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization(); autoDetectExecutor.execute(() -> - autoDetectResultProcessor.process(jobId, process.getProcessOutStream(), usePerPartitionNormalization) + autoDetectResultProcessor.process(job.getId(), process.getProcessOutStream(), usePerPartitionNormalization) ); autoDetectExecutor.execute(() -> - stateProcessor.process(jobId, process.getPersistStream()) + stateProcessor.process(job.getId(), process.getPersistStream()) ); - this.autoDetectWriter = createProcessWriter(job, process, statusReporter); - } - - private DataToProcessWriter createProcessWriter(Job job, AutodetectProcess process, StatusReporter statusReporter) { - return DataToProcessWriterFactory.create(true, process, job.getDataDescription(), job.getAnalysisConfig(), - new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER); } public void writeJobInputHeader() throws IOException { - autoDetectWriter.writeHeader(); + createProcessWriter(Optional.empty()).writeHeader(); + } + + private DataToProcessWriter createProcessWriter(Optional dataDescription) { + return DataToProcessWriterFactory.create(true, autodetectProcess, dataDescription.orElse(job.getDataDescription()), + job.getAnalysisConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER); } public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier cancelled) throws IOException { - return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> { + return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> { if (params.isResettingBuckets()) { autodetectProcess.writeResetBucketsControlMessage(params); } CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter); + + DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription()); DataCounts results = autoDetectWriter.write(countingStream, cancelled); autoDetectWriter.flush(); return results; @@ -92,7 +93,7 @@ public class AutodetectCommunicator implements Closeable { @Override public void close() throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, jobId), () -> { + checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> { statusReporter.close(); autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); @@ -101,7 +102,7 @@ public class AutodetectCommunicator implements Closeable { } public void writeUpdateConfigMessage(String config) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> { + checkAndRun(() -> 
Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { autodetectProcess.writeUpdateConfigMessage(config); return null; }, false); @@ -112,15 +113,15 @@ public class AutodetectCommunicator implements Closeable { } void flushJob(InterimResultsParams params, int tryTimeoutSecs) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, jobId), () -> { + checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> { String flushId = autodetectProcess.flushJob(params); Duration timeout = Duration.ofSeconds(tryTimeoutSecs); - LOGGER.info("[{}] waiting for flush", jobId); + LOGGER.info("[{}] waiting for flush", job.getId()); boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout); - LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete); + LOGGER.info("[{}] isFlushComplete={}", job.getId(), isFlushComplete); if (!isFlushComplete) { - String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, jobId) + " " + autodetectProcess.readError(); + String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, job.getId()) + " " + autodetectProcess.readError(); LOGGER.error(msg); throw ExceptionsHelper.serverError(msg); } @@ -138,7 +139,7 @@ public class AutodetectCommunicator implements Closeable { private void checkProcessIsAlive() { if (!autodetectProcess.isProcessAlive()) { ParameterizedMessage message = - new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", jobId, autodetectProcess.readError()); + new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError()); LOGGER.error(message); throw ExceptionsHelper.serverError(message.getFormattedMessage()); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java index f64ca03e6ef..e2fc9f07f2f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java @@ -5,19 +5,20 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.params; +import org.elasticsearch.xpack.ml.job.DataDescription; + import java.util.Objects; +import java.util.Optional; public class DataLoadParams { private final TimeRange resetTimeRange; private final boolean ignoreDowntime; + private final Optional dataDescription; - public DataLoadParams(TimeRange resetTimeRange) { - this(resetTimeRange, false); - } - - public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime) { + public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime, Optional dataDescription) { this.resetTimeRange = Objects.requireNonNull(resetTimeRange); this.ignoreDowntime = ignoreDowntime; + this.dataDescription = Objects.requireNonNull(dataDescription); } public boolean isResettingBuckets() { @@ -35,5 +36,9 @@ public class DataLoadParams { public boolean isIgnoreDowntime() { return ignoreDowntime; } + + public Optional getDataDescription() { + return dataDescription; + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index 
c30a51c2d39..ecb8875c798 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -5,9 +5,25 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.xpack.ml.job.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.DataDescription; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; +import org.elasticsearch.xpack.ml.job.status.StatusReporter; +import org.elasticsearch.xpack.ml.job.transform.TransformConfig; +import org.elasticsearch.xpack.ml.job.transform.TransformConfigs; +import org.elasticsearch.xpack.ml.transforms.DependencySorter; +import org.elasticsearch.xpack.ml.transforms.Transform; +import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex; +import org.elasticsearch.xpack.ml.transforms.Transform.TransformResult; +import org.elasticsearch.xpack.ml.transforms.TransformException; +import org.elasticsearch.xpack.ml.transforms.TransformFactory; +import org.elasticsearch.xpack.ml.transforms.date.DateFormatTransform; +import org.elasticsearch.xpack.ml.transforms.date.DateTransform; +import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform; + import java.io.IOException; -import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -23,26 +39,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.Supplier; -import org.apache.logging.log4j.Logger; - -import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; -import org.elasticsearch.xpack.ml.job.DataDescription.DataFormat; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; -import org.elasticsearch.xpack.ml.job.status.StatusReporter; -import org.elasticsearch.xpack.ml.job.transform.TransformConfig; -import org.elasticsearch.xpack.ml.job.transform.TransformConfigs; -import org.elasticsearch.xpack.ml.transforms.DependencySorter; -import org.elasticsearch.xpack.ml.transforms.Transform; -import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex; -import org.elasticsearch.xpack.ml.transforms.Transform.TransformResult; -import org.elasticsearch.xpack.ml.transforms.TransformException; -import org.elasticsearch.xpack.ml.transforms.TransformFactory; -import org.elasticsearch.xpack.ml.transforms.date.DateFormatTransform; -import org.elasticsearch.xpack.ml.transforms.date.DateTransform; -import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform; - public abstract class AbstractDataToProcessWriter implements DataToProcessWriter { protected static final int TIME_FIELD_OUT_INDEX = 0; @@ -175,9 +171,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter } protected void buildDateTransform(Map scratchAreaIndexes, Map outFieldIndexes) { - boolean isDateFormatString = dataDescription.isTransformTime() - && !dataDescription.isEpochMs(); - List readIndexes = new ArrayList<>(); Integer index = inFieldIndexes.get(dataDescription.getTimeField()); @@ -202,15 +195,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter writeIndexes.add(new 
TransformIndex(TransformFactory.OUTPUT_ARRAY_INDEX, outFieldIndexes.get(dataDescription.getTimeField()))); + boolean isDateFormatString = dataDescription.isTransformTime() && !dataDescription.isEpochMs(); if (isDateFormatString) { - // Elasticsearch assumes UTC for dates without timezone information. - ZoneId defaultTimezone = dataDescription.getFormat() == DataFormat.ELASTICSEARCH - ? ZoneOffset.UTC : ZoneOffset.systemDefault(); - dateTransform = new DateFormatTransform(dataDescription.getTimeFormat(), - defaultTimezone, readIndexes, writeIndexes, logger); + dateTransform = new DateFormatTransform(dataDescription.getTimeFormat(), readIndexes, writeIndexes, logger); } else { - dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(), - readIndexes, writeIndexes, logger); + dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(), readIndexes, writeIndexes, logger); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java index 1dd99f61672..808dd67277e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.apache.logging.log4j.Logger; - import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -36,7 +35,6 @@ public final class DataToProcessWriterFactory { TransformConfigs transforms, StatusReporter statusReporter, Logger logger) { switch (dataDescription.getFormat()) { case JSON: - case ELASTICSEARCH: return new JsonDataToProcessWriter(includeControlField, autodetectProcess, dataDescription, analysisConfig, transforms, statusReporter, logger); case DELIMITED: diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java index 07bfcd61874..712b4eed2ba 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.audit.Auditor; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor; @@ -22,6 +23,7 @@ import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractorFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +36,7 @@ class ScheduledJob { private final Auditor auditor; private final String jobId; + private final DataDescription dataDescription; private final long frequencyMs; private final long queryDelayMs; private 
final Client client; @@ -44,10 +47,11 @@ class ScheduledJob { private volatile Long lastEndTimeMs; private AtomicBoolean running = new AtomicBoolean(true); - ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, - Client client, Auditor auditor, Supplier currentTimeSupplier, + ScheduledJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, + DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { this.jobId = jobId; + this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; this.queryDelayMs = queryDelayMs; this.dataExtractorFactory = dataExtractorFactory; @@ -187,6 +191,7 @@ class ScheduledJob { private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException { PostDataAction.Request request = new PostDataAction.Request(jobId); + request.setDataDescription(dataDescription); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Streams.copy(inputStream, outputStream); request.setContent(new BytesArray(outputStream.toByteArray())); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java index ef402b7abd7..bd43cf97700 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.StartSchedulerAction; import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.JobStatus; import org.elasticsearch.xpack.ml.job.audit.Auditor; @@ -189,7 +190,7 @@ public class ScheduledJobRunner extends AbstractComponent { Duration frequency = getFrequencyOrDefault(scheduler, job); Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay()); DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(scheduler.getConfig(), job); - ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), + ScheduledJob scheduledJob = new ScheduledJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler); task.setHolder(holder); @@ -200,6 +201,16 @@ public class ScheduledJobRunner extends AbstractComponent { return new ScrollDataExtractorFactory(client, schedulerConfig, job); } + private static DataDescription buildDataDescription(Job job) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.JSON); + if (job.getDataDescription() != null) { + dataDescription.setTimeField(job.getDataDescription().getTimeField()); + } + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + return dataDescription.build(); + } + private void gatherInformation(String jobId, BiConsumer, DataCounts> handler, Consumer 
errorHandler) { BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder() .sortField(Bucket.TIMESTAMP.getPreferredName()) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java index 66147883ddb..8e6d919c192 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.scheduler; import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -28,9 +27,5 @@ public final class ScheduledJobValidator { throw new IllegalArgumentException( Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT)); } - DataDescription dataDescription = job.getDataDescription(); - if (dataDescription == null || dataDescription.getFormat() != DataDescription.DataFormat.ELASTICSEARCH) { - throw new IllegalArgumentException(Messages.getMessage(Messages.SCHEDULER_REQUIRES_JOB_WITH_DATAFORMAT_ELASTICSEARCH)); - } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java index f56e518bfe2..91f1a6a4bb3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java @@ -62,6 +62,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { public static final ParseField AGGREGATIONS = new ParseField("aggregations"); public static final ParseField AGGS = new ParseField("aggs"); public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); + public static final ParseField SOURCE = new ParseField("_source"); public static final ObjectParser PARSER = new ObjectParser<>("scheduler_config", Builder::new); @@ -86,6 +87,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { return parsedScriptFields; }, SCRIPT_FIELDS); PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE); + PARSER.declareBoolean(Builder::setSource, SOURCE); } private final String id; @@ -107,10 +109,11 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { private final AggregatorFactories.Builder aggregations; private final List scriptFields; private final Integer scrollSize; + private final boolean source; private SchedulerConfig(String id, String jobId, Long queryDelay, Long frequency, List indexes, List types, QueryBuilder query, AggregatorFactories.Builder aggregations, - List scriptFields, Integer scrollSize) { + List scriptFields, Integer scrollSize, boolean source) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -121,6 +124,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.aggregations = aggregations; this.scriptFields = scriptFields; this.scrollSize = scrollSize; + this.source = source; } public SchedulerConfig(StreamInput in) throws IOException { @@ -146,6 +150,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.scriptFields = null; } this.scrollSize = 
in.readOptionalVInt(); + this.source = in.readBoolean(); } public String getId() { @@ -171,7 +176,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { * @return The indexes to search, or null if not set. */ public List getIndexes() { - return this.indexes; + return indexes; } /** @@ -181,11 +186,15 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { * @return The types to search, or null if not set. */ public List getTypes() { - return this.types; + return types; } public Integer getScrollSize() { - return this.scrollSize; + return scrollSize; + } + + public boolean isSource() { + return source; } public QueryBuilder getQuery() { @@ -227,6 +236,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { out.writeBoolean(false); } out.writeOptionalVInt(scrollSize); + out.writeBoolean(source); } @Override @@ -258,6 +268,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { builder.endObject(); } builder.field(SCROLL_SIZE.getPreferredName(), scrollSize); + if (source) { + builder.field(SOURCE.getPreferredName(), source); + } return builder; } @@ -287,12 +300,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { && Objects.equals(this.query, that.query) && Objects.equals(this.scrollSize, that.scrollSize) && Objects.equals(this.aggregations, that.aggregations) - && Objects.equals(this.scriptFields, that.scriptFields); + && Objects.equals(this.scriptFields, that.scriptFields) + && Objects.equals(this.source, that.source); } @Override public int hashCode() { - return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields); + return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source); } public static class Builder { @@ -310,6 +324,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { private AggregatorFactories.Builder aggregations; private List scriptFields; private Integer scrollSize = DEFAULT_SCROLL_SIZE; + private boolean source = false; public Builder() { } @@ -331,6 +346,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.aggregations = config.aggregations; this.scriptFields = config.scriptFields; this.scrollSize = config.scrollSize; + this.source = config.source; } public void setId(String schedulerId) { @@ -390,6 +406,10 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.scrollSize = scrollSize; } + public void setSource(boolean enabled) { + this.source = enabled; + } + public SchedulerConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -402,7 +422,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) { throw invalidOptionValue(TYPES.getPreferredName(), types); } - return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize); + return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize, + source); } private static ElasticsearchException invalidOptionValue(String fieldName, Object value) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java 
b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java deleted file mode 100644 index 045571cb8cc..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.scheduler.extractor; - -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public final class SearchHitFieldExtractor { - - private SearchHitFieldExtractor() {} - - public static Object[] extractField(SearchHit hit, String field) { - SearchHitField keyValue = hit.field(field); - if (keyValue != null) { - List values = keyValue.values(); - return values.toArray(new Object[values.size()]); - } else { - return extractFieldFromSource(hit.getSource(), field); - } - } - - private static Object[] extractFieldFromSource(Map source, String field) { - if (source != null) { - Object values = source.get(field); - if (values != null) { - if (values instanceof Object[]) { - return (Object[]) values; - } else { - return new Object[]{values}; - } - } - } - return new Object[0]; - } - - public static Long extractTimeField(SearchHit hit, String timeField) { - Object[] fields = extractField(hit, timeField); - if (fields.length != 1) { - throw new RuntimeException("Time field [" + timeField + "] expected a single value; actual was: " + Arrays.toString(fields)); - } - if (fields[0] instanceof Long) { - return (Long) fields[0]; - } - throw new RuntimeException("Time field [" + timeField + "] expected a long value; actual was: " + fields[0]); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java new file mode 100644 index 00000000000..781e2d1aa08 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +abstract class ExtractedField { + + public enum ExtractionMethod { + SOURCE, DOC_VALUE, SCRIPT_FIELD + } + + protected final String name; + private final ExtractionMethod extractionMethod; + + protected ExtractedField(String name, ExtractionMethod extractionMethod) { + this.name = Objects.requireNonNull(name); + this.extractionMethod = Objects.requireNonNull(extractionMethod); + } + + public String getName() { + return name; + } + + public ExtractionMethod getExtractionMethod() { + return extractionMethod; + } + + public abstract Object[] value(SearchHit hit); + + public static ExtractedField newField(String name, ExtractionMethod extractionMethod) { + switch (extractionMethod) { + case DOC_VALUE: + case SCRIPT_FIELD: + return new FromFields(name, extractionMethod); + case SOURCE: + return new FromSource(name, extractionMethod); + default: + throw new IllegalArgumentException("Invalid extraction method [" + extractionMethod + "]"); + } + } + + private static class FromFields extends ExtractedField { + + public FromFields(String name, ExtractionMethod extractionMethod) { + super(name, extractionMethod); + } + + @Override + public Object[] value(SearchHit hit) { + SearchHitField keyValue = hit.field(name); + if (keyValue != null) { + List values = keyValue.values(); + return values.toArray(new Object[values.size()]); + } + return new Object[0]; + } + } + + private static class FromSource extends ExtractedField { + + private String[] namePath; + + public FromSource(String name, ExtractionMethod extractionMethod) { + super(name, extractionMethod); + namePath = name.split("\\."); + } + + @Override + public Object[] value(SearchHit hit) { + Map source = hit.getSource(); + int level = 0; + while (source != null && level < namePath.length - 1) { + source = getNextLevel(source, namePath[level]); + level++; + } + if (source != null) { + Object values = source.get(namePath[level]); + if (values != null) { + if (values instanceof List) { + @SuppressWarnings("unchecked") + List asList = (List) values; + return asList.toArray(new Object[asList.size()]); + } else { + return new Object[]{values}; + } + } + } + return new Object[0]; + } + + @SuppressWarnings("unchecked") + private static Map getNextLevel(Map source, String key) { + Object nextLevel = source.get(key); + if (nextLevel instanceof Map) { + return (Map) source.get(key); + } + return null; + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java new file mode 100644 index 00000000000..874be00c9cb --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.ml.job.Job; +import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +class ExtractedFields { + + private final ExtractedField timeField; + private final List allFields; + + public ExtractedFields(ExtractedField timeField, List allFields) { + if (!allFields.contains(timeField)) { + throw new IllegalArgumentException("timeField should also be contained in allFields"); + } + this.timeField = Objects.requireNonNull(timeField); + this.allFields = Collections.unmodifiableList(allFields); + } + + public List getAllFields() { + return allFields; + } + + public String[] getSourceFields() { + return filterFields(ExtractedField.ExtractionMethod.SOURCE); + } + + public String[] getDocValueFields() { + return filterFields(ExtractedField.ExtractionMethod.DOC_VALUE); + } + + private String[] filterFields(ExtractedField.ExtractionMethod method) { + List result = new ArrayList<>(); + for (ExtractedField field : allFields) { + if (field.getExtractionMethod() == method) { + result.add(field.getName()); + } + } + return result.toArray(new String[result.size()]); + } + + public String timeField() { + return timeField.getName(); + } + + public Long timeFieldValue(SearchHit hit) { + Object[] value = timeField.value(hit); + if (value.length != 1) { + throw new RuntimeException("Time field [" + timeField.getName() + "] expected a single value; actual was: " + + Arrays.toString(value)); + } + if (value[0] instanceof Long) { + return (Long) value[0]; + } + throw new RuntimeException("Time field [" + timeField.getName() + "] expected a long value; actual was: " + value[0]); + } + + public static ExtractedFields build(Job job, SchedulerConfig schedulerConfig) { + Set scriptFields = schedulerConfig.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet()); + String timeField = job.getDataDescription().getTimeField(); + ExtractedField timeExtractedField = ExtractedField.newField(timeField, scriptFields.contains(timeField) ? + ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE); + List remainingFields = job.allFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList()); + List allExtractedFields = new ArrayList<>(remainingFields.size()); + allExtractedFields.add(timeExtractedField); + for (String field : remainingFields) { + ExtractedField.ExtractionMethod method = scriptFields.contains(field) ? ExtractedField.ExtractionMethod.SCRIPT_FIELD : + schedulerConfig.isSource() ? 
ExtractedField.ExtractionMethod.SOURCE : ExtractedField.ExtractionMethod.DOC_VALUE; + allExtractedFields.add(ExtractedField.newField(field, method)); + } + return new ExtractedFields(timeExtractedField, allExtractedFields); + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java index d6af67da45b..736c3e06f4e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java @@ -19,13 +19,9 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.scheduler.extractor.SearchHitFieldExtractor; -import org.elasticsearch.xpack.ml.scheduler.extractor.SearchHitToJsonProcessor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -104,20 +100,22 @@ class ScrollDataExtractor implements DataExtractor { private SearchRequestBuilder buildSearchRequest() { SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) - .addSort(context.timeField, SortOrder.ASC) + .addSort(context.extractedFields.timeField(), SortOrder.ASC) .setIndices(context.indexes) .setTypes(context.types) .setSize(context.scrollSize) .setQuery(createQuery()); - if (context.aggregations != null) { - searchRequestBuilder.setSize(0); - for (AggregationBuilder aggregationBuilder : context.aggregations.getAggregatorFactories()) { - searchRequestBuilder.addAggregation(aggregationBuilder); - } - for (PipelineAggregationBuilder pipelineAggregationBuilder : context.aggregations.getPipelineAggregatorFactories()) { - searchRequestBuilder.addAggregation(pipelineAggregationBuilder); - } + + for (String docValueField : context.extractedFields.getDocValueFields()) { + searchRequestBuilder.addDocValueField(docValueField); } + String[] sourceFields = context.extractedFields.getSourceFields(); + if (sourceFields.length == 0) { + searchRequestBuilder.setFetchSource(false); + } else { + searchRequestBuilder.setFetchSource(sourceFields, null); + } + for (SearchSourceBuilder.ScriptField scriptField : context.scriptFields) { searchRequestBuilder.addScriptField(scriptField.fieldName(), scriptField.script()); } @@ -132,10 +130,10 @@ class ScrollDataExtractor implements DataExtractor { } ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.jobFields, outputStream)) { + try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.extractedFields, outputStream)) { for (SearchHit hit : searchResponse.getHits().hits()) { if (isCancelled) { - Long timestamp = SearchHitFieldExtractor.extractTimeField(hit, context.timeField); + Long timestamp = context.extractedFields.timeFieldValue(hit); if (timestamp != null) { if (timestampOnCancel == null) { 
timestampOnCancel = timestamp; @@ -170,7 +168,10 @@ class ScrollDataExtractor implements DataExtractor { private QueryBuilder createQuery() { QueryBuilder userQuery = context.query; - QueryBuilder timeQuery = new RangeQueryBuilder(context.timeField).gte(context.start).lt(context.end).format("epoch_millis"); + QueryBuilder timeQuery = new RangeQueryBuilder(context.extractedFields.timeField()) + .gte(context.start) + .lt(context.end) + .format("epoch_millis"); return new BoolQueryBuilder().filter(userQuery).filter(timeQuery); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java index ca040e8b854..03a60474261 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java @@ -5,39 +5,32 @@ */ package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; -import org.elasticsearch.common.Nullable; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.util.List; import java.util.Objects; -public class ScrollDataExtractorContext { +class ScrollDataExtractorContext { final String jobId; - final String[] jobFields; - final String timeField; + final ExtractedFields extractedFields; final String[] indexes; final String[] types; final QueryBuilder query; - @Nullable - final AggregatorFactories.Builder aggregations; final List scriptFields; final int scrollSize; final long start; final long end; - public ScrollDataExtractorContext(String jobId, List jobFields, String timeField, List indexes, List types, - QueryBuilder query, @Nullable AggregatorFactories.Builder aggregations, - List scriptFields, int scrollSize, long start, long end) { + public ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List indexes, List types, + QueryBuilder query, List scriptFields, int scrollSize, + long start, long end) { this.jobId = Objects.requireNonNull(jobId); - this.jobFields = jobFields.toArray(new String[jobFields.size()]); - this.timeField = Objects.requireNonNull(timeField); + this.extractedFields = Objects.requireNonNull(extractedFields); this.indexes = indexes.toArray(new String[indexes.size()]); this.types = types.toArray(new String[types.size()]); this.query = Objects.requireNonNull(query); - this.aggregations = aggregations; this.scriptFields = Objects.requireNonNull(scriptFields); this.scrollSize = scrollSize; this.start = start; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java index 6f8fcc73cfb..fea27652ac4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java @@ -18,24 +18,23 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { private final Client client; private final SchedulerConfig schedulerConfig; private final Job job; + private final ExtractedFields extractedFields; 
public ScrollDataExtractorFactory(Client client, SchedulerConfig schedulerConfig, Job job) { this.client = Objects.requireNonNull(client); this.schedulerConfig = Objects.requireNonNull(schedulerConfig); this.job = Objects.requireNonNull(job); + this.extractedFields = ExtractedFields.build(job, schedulerConfig); } @Override public DataExtractor newExtractor(long start, long end) { - String timeField = job.getDataDescription().getTimeField(); ScrollDataExtractorContext dataExtractorContext = new ScrollDataExtractorContext( job.getId(), - job.allFields(), - timeField, + extractedFields, schedulerConfig.getIndexes(), schedulerConfig.getTypes(), schedulerConfig.getQuery(), - schedulerConfig.getAggregations(), schedulerConfig.getScriptFields(), schedulerConfig.getScrollSize(), start, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessor.java similarity index 66% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessor.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessor.java index c7ceb41d76c..51003c62f75 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessor.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.scheduler.extractor; +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -12,21 +12,22 @@ import org.elasticsearch.search.SearchHit; import java.io.IOException; import java.io.OutputStream; +import java.util.Objects; -public class SearchHitToJsonProcessor implements Releasable { +class SearchHitToJsonProcessor implements Releasable { - private final String[] fields; + private final ExtractedFields fields; private final XContentBuilder jsonBuilder; - public SearchHitToJsonProcessor(String[] fields, OutputStream outputStream) throws IOException { - this.fields = fields; - jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); + public SearchHitToJsonProcessor(ExtractedFields fields, OutputStream outputStream) throws IOException { + this.fields = Objects.requireNonNull(fields); + this.jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); } public void process(SearchHit hit) throws IOException { jsonBuilder.startObject(); - for (String field : fields) { - writeKeyValue(field, SearchHitFieldExtractor.extractField(hit, field)); + for (ExtractedField field : fields.getAllFields()) { + writeKeyValue(field.getName(), field.value(hit)); } jsonBuilder.endObject(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java index 6642f2775ec..90c1206b098 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java @@ -5,17 
+5,16 @@ */ package org.elasticsearch.xpack.ml.transforms.date; -import java.time.ZoneId; -import java.time.format.DateTimeParseException; -import java.util.List; -import java.util.Locale; - import org.apache.logging.log4j.Logger; - import org.elasticsearch.xpack.ml.transforms.TransformException; import org.elasticsearch.xpack.ml.utils.time.DateTimeFormatterTimestampConverter; import org.elasticsearch.xpack.ml.utils.time.TimestampConverter; +import java.time.ZoneOffset; +import java.time.format.DateTimeParseException; +import java.util.List; +import java.util.Locale; + /** * A transform that attempts to parse a String timestamp * according to a timeFormat. It converts that @@ -25,12 +24,11 @@ public class DateFormatTransform extends DateTransform { private final String timeFormat; private final TimestampConverter dateToEpochConverter; - public DateFormatTransform(String timeFormat, ZoneId defaultTimezone, - List readIndexes, List writeIndexes, Logger logger) { + public DateFormatTransform(String timeFormat, List readIndexes, List writeIndexes, Logger logger) { super(readIndexes, writeIndexes, logger); this.timeFormat = timeFormat; - dateToEpochConverter = DateTimeFormatterTimestampConverter.ofPattern(timeFormat, defaultTimezone); + dateToEpochConverter = DateTimeFormatterTimestampConverter.ofPattern(timeFormat, ZoneOffset.systemDefault()); } @Override @@ -38,9 +36,7 @@ public class DateFormatTransform extends DateTransform { try { return dateToEpochConverter.toEpochMillis(field); } catch (DateTimeParseException pe) { - String message = String.format(Locale.ROOT, "Cannot parse date '%s' with format string '%s'", - field, timeFormat); - + String message = String.format(Locale.ROOT, "Cannot parse date '%s' with format string '%s'", field, timeFormat); throw new ParseTimestampException(message); } } diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties index 00305d0ff66..7cf953c4baa 100644 --- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties +++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties @@ -144,7 +144,6 @@ scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler c scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency scheduler.aggregations.requires.job.with.summary.count.field = A job configured with a scheduler with aggregations must have summary_count_field_name ''{0}'' -scheduler.requires.job.with.dataformat.elasticsearch = A job configured with a scheduler must have dataFormat ''ELASTICSEARCH'' job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java index 38c10cdbcf6..bdd8a51f668 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java @@ -97,7 +97,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); assertBusy(() -> { DataCounts dataCounts = 
getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs + numDocs2)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get() @@ -139,7 +139,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { t.start(); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }); @@ -148,7 +148,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { indexDocs("data", numDocs2, now + 5000, now + 6000); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1 + numDocs2)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }, 30, TimeUnit.SECONDS); @@ -182,8 +182,8 @@ public class ScheduledJobsIT extends ESIntegTestCase { private Job.Builder createJob() { DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); Detector.Builder d = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java index b653d2c3849..f8220a4e787 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.ml.MlPlugin; import org.junit.After; +import org.junit.Before; import java.io.BufferedReader; import java.io.IOException; @@ -25,35 +26,125 @@ import static org.hamcrest.Matchers.equalTo; public class ScheduledJobIT extends ESRestTestCase { - public void testStartJobScheduler_GivenLookbackOnly() throws Exception { - String jobId = "job-1"; - createAirlineDataIndex(); - createJob(jobId); - String schedulerId = "sched-1"; - createScheduler(schedulerId, jobId); - openJob(client(), jobId); + @Before + public void setUpData() throws Exception { + // Create index with source = enabled, doc_values = enabled, stored = false + String mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"properties\": {" + + " \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional + + " \"airline\": { \"type\":\"keyword\"}," + + " \"responsetime\": { \"type\":\"float\"}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings)); - Response startSchedulerRequest = client().performRequest("post", - MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); - 
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); - assertBusy(() -> { - try { - Response getJobResponse = client().performRequest("get", - MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); - assertThat(responseEntityToString(getJobResponse), containsString("\"input_record_count\":2")); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + + // Create index with source = enabled, doc_values = disabled (except time), stored = false + mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"properties\": {" + + " \"time stamp\": { \"type\":\"date\"}," + + " \"airline\": { \"type\":\"keyword\", \"doc_values\":false}," + + " \"responsetime\": { \"type\":\"float\", \"doc_values\":false}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(), new StringEntity(mappings)); + + client().performRequest("put", "airline-data-disabled-doc-values/response/1", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + client().performRequest("put", "airline-data-disabled-doc-values/response/2", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + + // Create index with source = disabled, doc_values = enabled (except time), stored = true + mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"_source\":{\"enabled\":false}," + + " \"properties\": {" + + " \"time stamp\": { \"type\":\"date\", \"store\":true}," + + " \"airline\": { \"type\":\"keyword\", \"store\":true}," + + " \"responsetime\": { \"type\":\"float\", \"store\":true}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(), new StringEntity(mappings)); + + client().performRequest("put", "airline-data-disabled-source/response/1", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + client().performRequest("put", "airline-data-disabled-source/response/2", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + + // Create index with nested documents + mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"properties\": {" + + " \"time\": { \"type\":\"date\"}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings)); + + client().performRequest("put", "nested-data/response/1", Collections.emptyMap(), + new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}")); + client().performRequest("put", "nested-data/response/2", Collections.emptyMap(), + new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}")); + + // Ensure all data is searchable + 
client().performRequest("post", "_refresh"); } - public void testStartJobScheduler_GivenRealtime() throws Exception { - String jobId = "job-2"; - createAirlineDataIndex(); + public void testLookbackOnly() throws Exception { + new LookbackOnlyTestHelper("lookback-1", "airline-data").setShouldSucceedProcessing(true).execute(); + } + + public void testLookbackOnlyWithSchedulerSourceEnabled() throws Exception { + new LookbackOnlyTestHelper("lookback-2", "airline-data").setEnableSchedulerSource(true).execute(); + } + + public void testLookbackOnlyWithDocValuesDisabledAndSchedulerSourceDisabled() throws Exception { + new LookbackOnlyTestHelper("lookback-3", "airline-data-disabled-doc-values").setShouldSucceedInput(false) + .setShouldSucceedProcessing(false).execute(); + } + + public void testLookbackOnlyWithDocValuesDisabledAndSchedulerSourceEnabled() throws Exception { + new LookbackOnlyTestHelper("lookback-4", "airline-data-disabled-doc-values").setEnableSchedulerSource(true).execute(); + } + + public void testLookbackOnlyWithSourceDisabled() throws Exception { + new LookbackOnlyTestHelper("lookback-5", "airline-data-disabled-source").execute(); + } + + public void testLookbackOnlyWithScriptFields() throws Exception { + new LookbackOnlyTestHelper("lookback-6", "airline-data-disabled-source").setAddScriptedFields(true).execute(); + } + + public void testLookbackOnlyWithNestedFieldsAndSchedulerSourceDisabled() throws Exception { + executeTestLookbackOnlyWithNestedFields("lookback-7", false); + } + + public void testLookbackOnlyWithNestedFieldsAndSchedulerSourceEnabled() throws Exception { + executeTestLookbackOnlyWithNestedFields("lookback-8", true); + } + + public void testRealtime() throws Exception { + String jobId = "job-realtime-1"; createJob(jobId); - String schedulerId = "sched-2"; - createScheduler(schedulerId, jobId); + String schedulerId = jobId + "-scheduler"; + createScheduler(schedulerId, jobId, "airline-data", false, false); openJob(client(), jobId); Response response = client().performRequest("post", @@ -65,7 +156,7 @@ public class ScheduledJobIT extends ESRestTestCase { Response getJobResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String responseAsString = responseEntityToString(getJobResponse); - assertThat(responseAsString, containsString("\"input_record_count\":2")); + assertThat(responseAsString, containsString("\"processed_record_count\":2")); } catch (Exception e1) { throw new RuntimeException(e1); } @@ -93,35 +184,98 @@ public class ScheduledJobIT extends ESRestTestCase { assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); } - private void createAirlineDataIndex() throws Exception { - String airlineDataMappings = "{" + " \"mappings\": {" + " \"response\": {" + " \"properties\": {" - + " \"time\": { \"type\":\"date\"}," + " \"airline\": { \"type\":\"keyword\"}," - + " \"responsetime\": { \"type\":\"float\"}" + " }" + " }" + " }" + "}"; - client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(airlineDataMappings)); + private class LookbackOnlyTestHelper { + private String jobId; + private String dataIndex; + private boolean addScriptedFields; + private boolean enableSchedulerSource; + private boolean shouldSucceedInput; + private boolean shouldSucceedProcessing; - client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); - 
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + public LookbackOnlyTestHelper(String jobId, String dataIndex) { + this.jobId = jobId; + this.dataIndex = dataIndex; + this.shouldSucceedInput = true; + this.shouldSucceedProcessing = true; + } - client().performRequest("post", "airline-data/_refresh"); + public LookbackOnlyTestHelper setAddScriptedFields(boolean value) { + addScriptedFields = value; + return this; + } + + public LookbackOnlyTestHelper setEnableSchedulerSource(boolean value) { + enableSchedulerSource = value; + return this; + } + + public LookbackOnlyTestHelper setShouldSucceedInput(boolean value) { + shouldSucceedInput = value; + return this; + } + + public LookbackOnlyTestHelper setShouldSucceedProcessing(boolean value) { + shouldSucceedProcessing = value; + return this; + } + + public void execute() throws Exception { + createJob(jobId); + String schedulerId = "scheduler-" + jobId; + createScheduler(schedulerId, jobId, dataIndex, enableSchedulerSource, addScriptedFields); + openJob(client(), jobId); + + startSchedulerAndWaitUntilStopped(schedulerId); + Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); + if (shouldSucceedInput) { + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); + } else { + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":0")); + } + if (shouldSucceedProcessing) { + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2")); + } else { + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":0")); + } + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + } + + private void startSchedulerAndWaitUntilStopped(String schedulerId) throws Exception { + Response startSchedulerRequest = client().performRequest("post", + MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); + assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); + assertBusy(() -> { + try { + Response schedulerStatsResponse = client().performRequest("get", + MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_stats"); + assertThat(responseEntityToString(schedulerStatsResponse), containsString("\"status\":\"STOPPED\"")); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } private Response createJob(String id) throws Exception { String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n" + " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" - + " },\n" + " \"data_description\" : {\n" + " \"format\":\"ELASTICSEARCH\",\n" - + " \"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + + " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n" + + " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + "}"; return client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + id, Collections.emptyMap(), new StringEntity(job)); } - private 
Response createScheduler(String schedulerId, String jobId) throws IOException { - String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"airline-data\"],\n" - + "\"types\":[\"response\"]\n" + "}"; + private Response createScheduler(String schedulerId, String jobId, String dataIndex, boolean source, boolean addScriptedFields) + throws IOException { + String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"" + dataIndex + "\"],\n" + + "\"types\":[\"response\"]" + (source ? ",\"_source\":true" : "") + (addScriptedFields ? + ",\"script_fields\":{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : "") + +"}"; return client().performRequest("put", MlPlugin.BASE_PATH + "schedulers/" + schedulerId, Collections.emptyMap(), new StringEntity(schedulerConfig)); } @@ -137,6 +291,24 @@ public class ScheduledJobIT extends ESRestTestCase { assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); } + private void executeTestLookbackOnlyWithNestedFields(String jobId, boolean source) throws Exception { + String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":3600,\"detectors\" :" + + "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}" + + "}"; + client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job)); + + String schedulerId = jobId + "-scheduler"; + createScheduler(schedulerId, jobId, "nested-data", source, false); + openJob(client(), jobId); + + startSchedulerAndWaitUntilStopped(schedulerId); + Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + @After public void clearMlState() throws Exception { new MlRestTestStateCleaner(client(), this).clearMlMetadata(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java index a2b7c7f6706..adbcd7a4b65 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java @@ -34,7 +34,6 @@ public class DataFormatTests extends ESTestCase { assertThat(DataFormat.JSON.ordinal(), equalTo(0)); assertThat(DataFormat.DELIMITED.ordinal(), equalTo(1)); assertThat(DataFormat.SINGLE_LINE.ordinal(), equalTo(2)); - assertThat(DataFormat.ELASTICSEARCH.ordinal(), equalTo(3)); } public void testwriteTo() throws Exception { @@ -58,13 +57,6 @@ public class DataFormatTests extends ESTestCase { assertThat(in.readVInt(), equalTo(2)); } } - - try (BytesStreamOutput out = new BytesStreamOutput()) { - DataFormat.ELASTICSEARCH.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(in.readVInt(), equalTo(3)); - } - } } public void testReadFrom() throws Exception { @@ -86,12 +78,6 @@ public class DataFormatTests extends ESTestCase { assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.SINGLE_LINE)); } } - try (BytesStreamOutput out = new BytesStreamOutput()) { - 
out.writeVInt(3); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.ELASTICSEARCH)); - } - } } public void testInvalidReadFrom() throws Exception { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java index 2f7defd0634..d89c4eec243 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java @@ -44,6 +44,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Iterator; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Supplier; @@ -162,7 +163,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); assertEquals(0, manager.numberOfOpenJobs()); - DataLoadParams params = new DataLoadParams(TimeRange.builder().build()); + DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()); manager.openJob("foo", false); manager.processData("foo", createInputStream(""), params, () -> false); assertEquals(1, manager.numberOfOpenJobs()); @@ -202,7 +203,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); Supplier cancellable = () -> false; - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true); + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true, Optional.empty()); InputStream inputStream = createInputStream(""); manager.openJob("foo", false); manager.processData("foo", inputStream, params, cancellable); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 4e756c500ca..a9a61c0c39b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.JobStatus; import org.elasticsearch.xpack.ml.job.JobTests; @@ -43,7 +42,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { int numJobs = randomIntBetween(0, 10); for (int i = 0; i < numJobs; i++) { Job job = JobTests.createRandomizedJob(); - if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.ELASTICSEARCH) { + if (randomBoolean()) { SchedulerConfig schedulerConfig = SchedulerConfigTests.createRandomizedSchedulerConfig(job.getId()); if (schedulerConfig.getAggregations() != null) { AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors()); @@ -238,9 +237,9 @@ public class 
MlMetadataTests extends AbstractSerializingTestCase { public void testPutScheduler_failBecauseJobIsNotCompatibleForScheduler() { Job.Builder job1 = createScheduledJob(); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.DELIMITED); - job1.setDataDescription(dataDescription); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build().getAnalysisConfig()); + analysisConfig.setLatency(3600L); + job1.setAnalysisConfig(analysisConfig); SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1.build(), false); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 2bfaba62b3b..f5c2c80119e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -39,7 +40,7 @@ import static org.mockito.Mockito.when; public class AutodetectCommunicatorTests extends ESTestCase { public void testWriteResetBucketsControlMessage() throws IOException { - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build()); + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), false, Optional.empty()); AutodetectProcess process = mockAutodetectProcessWithOutputStream(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params, () -> false); @@ -147,7 +148,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { () -> communicator.writeToJob(in, mock(DataLoadParams.class), () -> false)); communicator.inUse.set(null); - communicator.writeToJob(in, mock(DataLoadParams.class), () -> false); + communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()), () -> false); } public void testFlushInUse() throws IOException { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 3827287416f..30a69a41e5e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.Collections; +import java.util.Optional; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -105,7 +106,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { bos, Mockito.mock(InputStream.class), 
Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) { - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true); + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true, Optional.empty()); process.writeResetBucketsControlMessage(params); process.flushStream(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java index e68b5a22e2e..c71b28bd37d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java @@ -7,19 +7,21 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params; import org.elasticsearch.test.ESTestCase; +import java.util.Optional; + public class DataLoadParamsTests extends ESTestCase { public void testGetStart() { - assertEquals("", new DataLoadParams(TimeRange.builder().build()).getStart()); - assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build()).getStart()); + assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getStart()); + assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build(), false, Optional.empty()).getStart()); } public void testGetEnd() { - assertEquals("", new DataLoadParams(TimeRange.builder().build()).getEnd()); - assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build()).getEnd()); + assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getEnd()); + assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build(), false, Optional.empty()).getEnd()); } public void testIsResettingBuckets() { - assertFalse(new DataLoadParams(TimeRange.builder().build()).isResettingBuckets()); - assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build()).isResettingBuckets()); + assertFalse(new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).isResettingBuckets()); + assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build(), false, Optional.empty()).isResettingBuckets()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index cfdd8bbccef..968a9a240ac 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoMoreInteractions; import java.io.IOException; +import java.util.Optional; import java.util.stream.IntStream; import org.elasticsearch.test.ESTestCase; @@ -127,7 +128,8 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { public void testWriteResetBucketsMessage() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - writer.writeResetBucketsMessage(new 
DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build())); + writer.writeResetBucketsMessage( + new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), false, Optional.empty())); InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java index da3983e99d0..0fbba65c35b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java @@ -24,13 +24,6 @@ public class DataToProcessWriterFactoryTests extends ESTestCase { assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter); } - public void testCreate_GivenDataFormatIsElasticsearch() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataFormat.ELASTICSEARCH); - - assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter); - } - public void testCreate_GivenDataFormatIsCsv() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataFormat.DELIMITED); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java index 51f173ccc94..c3b7798346b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java @@ -146,12 +146,21 @@ public class ScheduledJobRunnerTests extends ESTestCase { verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); - verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); + verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STARTED)), any()); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); } + private static PostDataAction.Request createExpectedPostDataRequest(Job job) { + DataDescription.Builder expectedDataDescription = new DataDescription.Builder(); + expectedDataDescription.setTimeFormat("epoch_ms"); + expectedDataDescription.setFormat(DataDescription.DataFormat.JSON); + PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(job.getId()); + expectedPostDataRequest.setDataDescription(expectedDataDescription.build()); + return expectedPostDataRequest; + } + public void testStart_extractionProblem() throws Exception { Job.Builder jobBuilder = createScheduledJob(); SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build(); @@ -213,7 +222,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { task.stop(); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); } else { - 
verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); + verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME), any()); } @@ -233,9 +242,6 @@ public class ScheduledJobRunnerTests extends ESTestCase { Job.Builder builder = new Job.Builder("foo"); builder.setAnalysisConfig(acBuilder); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); return builder; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java index 5d0c87ce466..131eb9ec628 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.audit.Auditor; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractorFactory; @@ -40,6 +41,7 @@ public class ScheduledJobTests extends ESTestCase { private DataExtractorFactory dataExtractorFactory; private DataExtractor dataExtractor; private Client client; + private DataDescription.Builder dataDescription; private ActionFuture flushJobFuture; private long currentTime; @@ -52,6 +54,8 @@ public class ScheduledJobTests extends ESTestCase { dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); client = mock(Client.class); + dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.JSON); ActionFuture jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); currentTime = 0; @@ -60,7 +64,10 @@ public class ScheduledJobTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - when(client.execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("_job_id")))).thenReturn(jobDataFuture); + + PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id"); + expectedRequest.setDataDescription(dataDescription.build()); + when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); } @@ -176,7 +183,7 @@ public class ScheduledJobTests extends ESTestCase { private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; - return new 
ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, + return new ScheduledJob("_job_id", dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java index 87a38c2dd4a..20cb881a629 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Detector; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -39,9 +38,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { public void testVerify_GivenZeroLatency() { Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); ac.setLatency(0L); @@ -54,9 +50,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { public void testVerify_GivenNoLatency() { Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBatchSpan(1800L); ac.setBucketSpan(100L); @@ -69,9 +62,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { public void testVerify_GivenAggsAndCorrectSummaryCountField() throws IOException { Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); ac.setSummaryCountFieldName("doc_count"); @@ -86,9 +76,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { String errorMessage = Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT); Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); builder.setAnalysisConfig(ac); @@ -105,9 +92,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { String errorMessage = Messages.getMessage( Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT); Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - 
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); ac.setSummaryCountFieldName("wrong"); @@ -125,9 +109,7 @@ public class ScheduledJobValidatorTests extends ESTestCase { Job.Builder builder = new Job.Builder(id); builder.setCreateTime(new Date()); AnalysisConfig.Builder ac = createAnalysisConfig(); - DataDescription.Builder dc = new DataDescription.Builder(); builder.setAnalysisConfig(ac); - builder.setDataDescription(dc); return builder; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java index 615bbedb408..70eec105688 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java @@ -61,6 +61,9 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase SearchHitFieldExtractor.extractTimeField(searchHit, "time")); - } - - public void testExtractTimeFieldGivenSingleValueInFields() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L))); - searchHit.fields(fields); - - assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(3L)); - } - - public void testExtractTimeFieldGivenSingleValueInSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - searchHit.sourceRef(new BytesArray("{\"time\":1482418307000}")); - - assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(1482418307000L)); - } - - public void testExtractTimeFieldGivenArrayValue() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L, 5L))); - searchHit.fields(fields); - - expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time")); - } - - public void testExtractTimeFieldGivenSingleNonLongValue() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("time", new InternalSearchHitField("time", Arrays.asList(3))); - searchHit.fields(fields); - - expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time")); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java deleted file mode 100644 index f9788ee9f8a..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ml.scheduler.extractor; - -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; -import org.elasticsearch.search.internal.InternalSearchHit; -import org.elasticsearch.search.internal.InternalSearchHitField; -import org.elasticsearch.test.ESTestCase; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; - -public class SearchHitToJsonProcessorTests extends ESTestCase { - - public void testProcessGivenHitContainsNothing() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - - String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit); - - assertThat(json, equalTo("{}")); - } - - public void testProcessGivenHitContainsEmptySource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - searchHit.sourceRef(new BytesArray("{}")); - - String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit); - - assertThat(json, equalTo("{}")); - } - - public void testProcessGivenHitContainsSingleValueInFields() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3))); - searchHit.fields(fields); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":3}")); - } - - public void testProcessGivenHitContainsArrayValueInFields() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3, 9))); - searchHit.fields(fields); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":[3,9]}")); - } - - public void testProcessGivenHitContainsSingleValueInSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - String hitSource = "{\"field_1\":\"foo\"}"; - searchHit.sourceRef(new BytesArray(hitSource)); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":\"foo\"}")); - } - - public void testProcessGivenHitContainsArrayValueInSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - String hitSource = "{\"field_1\":[\"foo\",\"bar\"]}"; - searchHit.sourceRef(new BytesArray(hitSource)); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":[\"foo\",\"bar\"]}")); - } - - public void testProcessGivenHitContainsFieldsAndSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - String hitSource = "{\"field_1\":\"foo\"}"; - searchHit.sourceRef(new BytesArray(hitSource)); - Map fields = new HashMap<>(); - fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList("bar"))); - searchHit.fields(fields); - - String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":\"foo\",\"field_2\":\"bar\"}")); - } - - public void testProcessGivenMultipleHits() throws IOException { - InternalSearchHit searchHit1 = new InternalSearchHit(42); - Map fields = new 
HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3))); - searchHit1.fields(fields); - InternalSearchHit searchHit2 = new InternalSearchHit(42); - fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(5))); - searchHit2.fields(fields); - - String json = searchHitToString(new String[] {"field_1"}, searchHit1, searchHit2); - - assertThat(json, equalTo("{\"field_1\":3} {\"field_1\":5}")); - } - - private String searchHitToString(String[] fields, SearchHit... searchHits) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) { - for (int i = 0; i < searchHits.length; i++) { - hitProcessor.process(searchHits[i]); - } - } - return outputStream.toString(StandardCharsets.UTF_8.name()); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java new file mode 100644 index 00000000000..48e78c19422 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.search.internal.InternalSearchHitField; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class ExtractedFieldTests extends ESTestCase { + + public void testValueGivenDocValue() { + SearchHit hit = new SearchHitBuilder(42).addField("single", "bar").addField("array", Arrays.asList("a", "b")).build(); + + ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(single.value(hit), equalTo(new String[] { "bar" })); + + ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(array.value(hit), equalTo(new String[] { "a", "b" })); + + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(missing.value(hit), equalTo(new Object[0])); + } + + public void testValueGivenScriptField() { + SearchHit hit = new SearchHitBuilder(42).addField("single", "bar").addField("array", Arrays.asList("a", "b")).build(); + + ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + assertThat(single.value(hit), equalTo(new String[] { "bar" })); + + ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + assertThat(array.value(hit), equalTo(new String[] { "a", "b" })); + + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + assertThat(missing.value(hit), equalTo(new Object[0])); + } + + public 
void testValueGivenSource() { + SearchHit hit = new SearchHitBuilder(42).setSource("{\"single\":\"bar\",\"array\":[\"a\",\"b\"]}").build(); + + ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.SOURCE); + assertThat(single.value(hit), equalTo(new String[] { "bar" })); + + ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.SOURCE); + assertThat(array.value(hit), equalTo(new String[] { "a", "b" })); + + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE); + assertThat(missing.value(hit), equalTo(new Object[0])); + } + + public void testValueGivenNestedSource() { + SearchHit hit = new SearchHitBuilder(42).setSource("{\"level_1\":{\"level_2\":{\"foo\":\"bar\"}}}").build(); + + ExtractedField nested = ExtractedField.newField("level_1.level_2.foo", ExtractedField.ExtractionMethod.SOURCE); + assertThat(nested.value(hit), equalTo(new String[] { "bar" })); + } + + public void testValueGivenSourceAndHitWithNoSource() { + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE); + assertThat(missing.value(new SearchHitBuilder(3).build()), equalTo(new Object[0])); + } + + public void testValueGivenMismatchingMethod() { + SearchHit hit = new SearchHitBuilder(42).addField("a", 1).setSource("{\"b\":2}").build(); + + ExtractedField invalidA = ExtractedField.newField("a", ExtractedField.ExtractionMethod.SOURCE); + assertThat(invalidA.value(hit), equalTo(new Object[0])); + ExtractedField validA = ExtractedField.newField("a", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(validA.value(hit), equalTo(new Integer[] { 1 })); + + ExtractedField invalidB = ExtractedField.newField("b", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(invalidB.value(hit), equalTo(new Object[0])); + ExtractedField validB = ExtractedField.newField("b", ExtractedField.ExtractionMethod.SOURCE); + assertThat(validB.value(hit), equalTo(new Integer[] { 2 })); + } + + public void testValueGivenEmptyHit() { + SearchHit hit = new SearchHitBuilder(42).build(); + + ExtractedField docValue = ExtractedField.newField("a", ExtractedField.ExtractionMethod.SOURCE); + assertThat(docValue.value(hit), equalTo(new Object[0])); + + ExtractedField sourceField = ExtractedField.newField("b", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(sourceField.value(hit), equalTo(new Object[0])); + } + + static class SearchHitBuilder { + + private final InternalSearchHit hit; + private final Map fields; + + SearchHitBuilder(int docId) { + hit = new InternalSearchHit(docId); + fields = new HashMap<>(); + } + + SearchHitBuilder addField(String name, Object value) { + return addField(name, Arrays.asList(value)); + } + + SearchHitBuilder addField(String name, List values) { + fields.put(name, new InternalSearchHitField(name, values)); + return this; + } + + SearchHitBuilder setSource(String sourceJson) { + hit.sourceRef(new BytesArray(sourceJson)); + return this; + } + + SearchHit build() { + if (!fields.isEmpty()) { + hit.fields(fields); + } + return hit; + } + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java new file mode 100644 index 00000000000..98861c4b3e1 --- /dev/null +++ 
b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class ExtractedFieldsTests extends ESTestCase { + + private ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); + + public void testInvalidConstruction() { + expectThrows(IllegalArgumentException.class, () -> new ExtractedFields(timeField, Collections.emptyList())); + } + + public void testTimeFieldOnly() { + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + assertThat(extractedFields.getAllFields(), equalTo(Arrays.asList(timeField))); + assertThat(extractedFields.timeField(), equalTo("time")); + assertThat(extractedFields.getDocValueFields(), equalTo(new String[] { timeField.getName() })); + assertThat(extractedFields.getSourceFields().length, equalTo(0)); + } + + public void testAllTypesOfFields() { + ExtractedField docValue1 = ExtractedField.newField("doc1", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField docValue2 = ExtractedField.newField("doc2", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField scriptField1 = ExtractedField.newField("scripted1", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE); + ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE); + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, + docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2)); + + assertThat(extractedFields.getAllFields().size(), equalTo(7)); + assertThat(extractedFields.timeField(), equalTo("time")); + assertThat(extractedFields.getDocValueFields(), equalTo(new String[] {"time", "doc1", "doc2"})); + assertThat(extractedFields.getSourceFields(), equalTo(new String[] {"src1", "src2"})); + } + + public void testTimeFieldValue() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", 1000L).build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + assertThat(extractedFields.timeFieldValue(hit), equalTo(1000L)); + } + + public void testTimeFieldValueGivenEmptyArray() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", Collections.emptyList()).build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); + } + + public void testTimeFieldValueGivenValueHasTwoElements() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", Arrays.asList(1L, 2L)).build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + 
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); + } + + public void testTimeFieldValueGivenValueIsString() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", "a string").build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java index 4bfb2e4824c..f8451c8bb76 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHitField; @@ -48,12 +47,10 @@ public class ScrollDataExtractorTests extends ESTestCase { private List capturedContinueScrollIds; private List capturedClearScrollIds; private String jobId; - private List jobFields; - private String timeField; + private ExtractedFields extractedFields; private List types; private List indexes; private QueryBuilder query; - private AggregatorFactories.Builder aggregations; private List scriptFields; private int scrollSize; @@ -93,13 +90,13 @@ public class ScrollDataExtractorTests extends ESTestCase { capturedSearchRequests = new ArrayList<>(); capturedContinueScrollIds = new ArrayList<>(); capturedClearScrollIds = new ArrayList<>(); - timeField = "time"; jobId = "test-job"; - jobFields = Arrays.asList(timeField, "field_1"); + ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); + extractedFields = new ExtractedFields(timeField, + Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE))); indexes = Arrays.asList("index-1", "index-2"); types = Arrays.asList("type-1", "type-2"); query = QueryBuilders.matchAllQuery(); - aggregations = null; scriptFields = Collections.emptyList(); scrollSize = 1000; } @@ -253,8 +250,7 @@ public class ScrollDataExtractorTests extends ESTestCase { } private ScrollDataExtractorContext createContext(long start, long end) { - return new ScrollDataExtractorContext(jobId, jobFields, timeField, indexes, types, query, aggregations, scriptFields, scrollSize, - start, end); + return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end); } private SearchResponse createEmptySearchResponse() { @@ -270,7 +266,7 @@ public class ScrollDataExtractorTests extends ESTestCase { for (int i = 0; i < timestamps.size(); i++) { InternalSearchHit hit = new InternalSearchHit(randomInt()); Map fields = new HashMap<>(); - fields.put(timeField, new InternalSearchHitField("time", Arrays.asList(timestamps.get(i)))); + fields.put(extractedFields.timeField(), new InternalSearchHitField("time", 
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java
index 4bfb2e4824c..f8451c8bb76 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java
@@ -14,7 +14,6 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHitField;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.InternalSearchHit;
 import org.elasticsearch.search.internal.InternalSearchHitField;
@@ -48,12 +47,10 @@ public class ScrollDataExtractorTests extends ESTestCase {
     private List<String> capturedContinueScrollIds;
     private List<String> capturedClearScrollIds;
     private String jobId;
-    private List<String> jobFields;
-    private String timeField;
+    private ExtractedFields extractedFields;
     private List<String> types;
     private List<String> indexes;
     private QueryBuilder query;
-    private AggregatorFactories.Builder aggregations;
     private List<SearchSourceBuilder.ScriptField> scriptFields;
     private int scrollSize;
 
@@ -93,13 +90,13 @@ public class ScrollDataExtractorTests extends ESTestCase {
         capturedSearchRequests = new ArrayList<>();
         capturedContinueScrollIds = new ArrayList<>();
         capturedClearScrollIds = new ArrayList<>();
-        timeField = "time";
         jobId = "test-job";
-        jobFields = Arrays.asList(timeField, "field_1");
+        ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
+        extractedFields = new ExtractedFields(timeField,
+                Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE)));
         indexes = Arrays.asList("index-1", "index-2");
         types = Arrays.asList("type-1", "type-2");
         query = QueryBuilders.matchAllQuery();
-        aggregations = null;
         scriptFields = Collections.emptyList();
         scrollSize = 1000;
     }
@@ -253,8 +250,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
     }
 
     private ScrollDataExtractorContext createContext(long start, long end) {
-        return new ScrollDataExtractorContext(jobId, jobFields, timeField, indexes, types, query, aggregations, scriptFields, scrollSize,
-                start, end);
+        return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end);
     }
 
     private SearchResponse createEmptySearchResponse() {
@@ -270,7 +266,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         for (int i = 0; i < timestamps.size(); i++) {
             InternalSearchHit hit = new InternalSearchHit(randomInt());
             Map<String, SearchHitField> fields = new HashMap<>();
-            fields.put(timeField, new InternalSearchHitField("time", Arrays.asList(timestamps.get(i))));
+            fields.put(extractedFields.timeField(), new InternalSearchHitField("time", Arrays.asList(timestamps.get(i))));
             fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(field1Values.get(i))));
             fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList(field2Values.get(i))));
             hit.fields(fields);
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java
new file mode 100644
index 00000000000..d805129df32
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
+
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class SearchHitToJsonProcessorTests extends ESTestCase {
+
+    public void testProcessGivenSingleHit() throws IOException {
+        ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField));
+
+        SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(8)
+                .addField("time", 1000L)
+                .addField("single", "a")
+                .addField("array", Arrays.asList("b", "c"))
+                .build();
+
+        String json = searchHitToString(extractedFields, hit);
+
+        assertThat(json, equalTo("{\"time\":1000,\"single\":\"a\",\"array\":[\"b\",\"c\"]}"));
+    }
+
+    public void testProcessGivenMultipleHits() throws IOException {
+        ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
+        ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField));
+
+        SearchHit hit1 = new ExtractedFieldTests.SearchHitBuilder(8)
+                .addField("time", 1000L)
+                .addField("single", "a1")
+                .addField("array", Arrays.asList("b1", "c1"))
+                .build();
+
+        SearchHit hit2 = new ExtractedFieldTests.SearchHitBuilder(8)
+                .addField("time", 2000L)
+                .addField("single", "a2")
+                .addField("array", Arrays.asList("b2", "c2"))
+                .build();
+
+        String json = searchHitToString(extractedFields, hit1, hit2);
+
+        assertThat(json, equalTo("{\"time\":1000,\"single\":\"a1\",\"array\":[\"b1\",\"c1\"]} " +
+                "{\"time\":2000,\"single\":\"a2\",\"array\":[\"b2\",\"c2\"]}"));
+    }
+
+    private String searchHitToString(ExtractedFields fields, SearchHit... searchHits) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) {
+            for (int i = 0; i < searchHits.length; i++) {
+                hitProcessor.process(searchHits[i]);
+            }
+        }
+        return outputStream.toString(StandardCharsets.UTF_8.name());
+    }
+}
\ No newline at end of file
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java
index 2176d51470c..0678a5ff2d4 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java
@@ -5,19 +5,16 @@
  */
 package org.elasticsearch.xpack.ml.transforms.date;
 
-import static org.elasticsearch.xpack.ml.transforms.TransformTestUtils.createIndexArray;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex;
+import org.elasticsearch.xpack.ml.transforms.TransformException;
 
-import static org.mockito.Mockito.mock;
-
-import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.test.ESTestCase;
-
-import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex;
-import org.elasticsearch.xpack.ml.transforms.TransformException;
+import static org.elasticsearch.xpack.ml.transforms.TransformTestUtils.createIndexArray;
+import static org.mockito.Mockito.mock;
 
 public class DateFormatTransformTests extends ESTestCase {
 
@@ -26,7 +23,7 @@ public class DateFormatTransformTests extends ESTestCase {
         List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
 
         DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss.SSSXXX",
-                ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
+                readIndexes, writeIndexes, mock(Logger.class));
 
         String[] input = {"2014-01-01 13:42:56.500Z"};
         String[] scratch = {};
@@ -42,8 +39,7 @@ public class DateFormatTransformTests extends ESTestCase {
     public void testTransform_GivenInvalidFormat() throws TransformException {
 
         IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
-                () -> new DateFormatTransform("yyyy-MM HH:mm:ss", ZoneOffset.systemDefault(),
-                        Collections.emptyList(), Collections.emptyList(), mock(Logger.class)));
+                () -> new DateFormatTransform("yyyy-MM HH:mm:ss", Collections.emptyList(), Collections.emptyList(), mock(Logger.class)));
 
         assertEquals("Timestamp cannot be derived from pattern: yyyy-MM HH:mm:ss", e.getMessage());
 
@@ -54,8 +50,7 @@ public class DateFormatTransformTests extends ESTestCase {
         List<TransformIndex> readIndexes = createIndexArray(new TransformIndex(0, 0));
         List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
 
-        DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss",
-                ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
+        DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", readIndexes, writeIndexes, mock(Logger.class));
 
         String[] input = {"invalid"};
         String[] scratch = {};
@@ -73,8 +68,7 @@ public class DateFormatTransformTests extends ESTestCase {
         List<TransformIndex> readIndexes = createIndexArray(new TransformIndex(1, 0));
         List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
 
-        DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss",
-                ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
+        DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", readIndexes, writeIndexes, mock(Logger.class));
 
         String[] input = {};
         String[] scratch = {null};
@@ -90,15 +84,14 @@
         List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
 
         ESTestCase.expectThrows(IllegalArgumentException.class,
-                () -> new DateFormatTransform("e", ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)));
+                () -> new DateFormatTransform("e", readIndexes, writeIndexes, mock(Logger.class)));
     }
 
     public void testTransform_FromScratchArea() throws TransformException {
         List<TransformIndex> readIndexes = createIndexArray(new TransformIndex(1, 0));
         List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
 
-        DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ssXXX",
-                ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
+        DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ssXXX", readIndexes, writeIndexes, mock(Logger.class));
 
         String[] input = {};
         String[] scratch = {"2014-01-01 00:00:00Z"};
@@ -116,7 +109,7 @@ public class DateFormatTransformTests extends ESTestCase {
         List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
 
         DateFormatTransform transformer = new DateFormatTransform("'['yyyy-MM-dd HH:mm:ssX']'",
-                ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
+                readIndexes, writeIndexes, mock(Logger.class));
 
         String[] input = {"[2014-06-23 00:00:00Z]"};
         String[] scratch = {};
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml
index 40fd17db5ba..c9d58a2d5c8 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml
@@ -10,7 +10,7 @@ setup:
             "detectors" :[{"function":"count"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
+            "format":"JSON",
             "time_field":"time",
             "time_format":"epoch"
           }
@@ -27,9 +27,7 @@ setup:
             "detectors" :[{"function":"count"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
-            "time_field":"time",
-            "time_format":"epoch"
+            "time_field":"time"
           }
         }
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml
index 87755dcc048..224605d526e 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml
@@ -10,7 +10,7 @@ setup:
             "detectors" :[{"function":"count"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
+            "format":"JSON",
             "time_field":"time",
             "time_format":"epoch"
           }
@@ -27,9 +27,7 @@ setup:
             "detectors" :[{"function":"count"}]
          },
           "data_description" : {
-            "format":"ELASTICSEARCH",
-            "time_field":"time",
-            "time_format":"epoch"
+            "time_field":"time"
           }
         }
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml
index 6fd7b973575..e0475d53ab9 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml
@@ -195,7 +195,7 @@
             "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
+            "format":"JSON",
             "time_field":"time",
             "time_format":"yyyy-MM-dd HH:mm:ssX"
           }
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml
index 508764211bc..06b0a617149 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml
@@ -33,7 +33,7 @@ setup:
             "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
           },
           "data_description" : {
-            "format" : "ELASTICSEARCH",
+            "format" : "JSON",
             "time_field":"time",
             "time_format":"yyyy-MM-dd'T'HH:mm:ssX"
           }
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml
index 40f1fd0c1c3..3ab3d518364 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml
@@ -10,7 +10,7 @@ setup:
             "detectors" :[{"function":"count"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
+            "format":"JSON",
             "time_field":"time",
             "time_format":"epoch"
           }
@@ -26,9 +26,7 @@ setup:
             "detectors" :[{"function":"count"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
-            "time_field":"time",
-            "time_format":"epoch"
+            "time_field":"time"
           }
         }
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml
index 51dc41aa2e5..5aa57256d18 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml
@@ -21,7 +21,7 @@ setup:
             "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
           },
           "data_description" : {
-            "format":"ELASTICSEARCH",
+            "format":"JSON",
             "time_field":"time",
             "time_format":"epoch"
           }