diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index e72f05ca233..57e18ca485f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.manager.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -39,6 +40,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; +import java.util.Optional; public class PostDataAction extends Action { @@ -148,6 +150,7 @@ public class PostDataAction extends Action listener) { PostDataTask postDataTask = (PostDataTask) task; TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); - DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime()); + DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime(), + Optional.ofNullable(request.getDataDescription())); threadPool.executor(MlPlugin.THREAD_POOL_NAME).execute(() -> { try { DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java index 76eac37913f..025b6351e99 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/AnalysisConfig.java @@ -459,7 +459,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable { this.detectors = detectors; } - Builder(AnalysisConfig analysisConfig) { + public Builder(AnalysisConfig analysisConfig) { this.detectors = analysisConfig.detectors; this.bucketSpan = analysisConfig.bucketSpan; this.batchSpan = analysisConfig.batchSpan; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java index 72af1eadf9d..323c0274a14 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/DataDescription.java @@ -41,9 +41,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable { public enum DataFormat implements Writeable { JSON("json"), DELIMITED("delimited"), - SINGLE_LINE("single_line"), - // TODO norelease, this can now be removed - ELASTICSEARCH("elasticsearch"); + SINGLE_LINE("single_line"); /** * Delimited used to be called delineated. We keep supporting that for backwards diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index 0f1ce3d639d..a86f1ba364a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -196,8 +196,6 @@ public final class Messages { public static final String SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "scheduler.does.not.support.job.with.latency"; public static final String SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD = "scheduler.aggregations.requires.job.with.summary.count.field"; - public static final String SCHEDULER_REQUIRES_JOB_WITH_DATAFORMAT_ELASTICSEARCH = - "scheduler.requires.job.with.dataformat.elasticsearch"; public static final String SCHEDULER_CANNOT_START = "scheduler.cannot.start"; public static final String SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE = "scheduler.cannot.stop.in.current.state"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index c53058b220b..109485d3499 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.ModelSizeStats; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -42,10 +43,9 @@ public class AutodetectCommunicator implements Closeable { private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class); private static final int DEFAULT_TRY_TIMEOUT_SECS = 30; - private final String jobId; + private final Job job; private final StatusReporter statusReporter; private final AutodetectProcess autodetectProcess; - private final DataToProcessWriter autoDetectWriter; private final AutoDetectResultProcessor autoDetectResultProcessor; final AtomicReference inUse = new AtomicReference<>(); @@ -53,7 +53,7 @@ public class AutodetectCommunicator implements Closeable { public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) { - this.jobId = job.getId(); + this.job = job; this.autodetectProcess = process; this.statusReporter = statusReporter; this.autoDetectResultProcessor = autoDetectResultProcessor; @@ -61,29 +61,30 @@ public class AutodetectCommunicator implements Closeable { AnalysisConfig analysisConfig = job.getAnalysisConfig(); boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization(); autoDetectExecutor.execute(() -> - autoDetectResultProcessor.process(jobId, process.getProcessOutStream(), usePerPartitionNormalization) + autoDetectResultProcessor.process(job.getId(), process.getProcessOutStream(), usePerPartitionNormalization) ); autoDetectExecutor.execute(() -> - stateProcessor.process(jobId, process.getPersistStream()) + stateProcessor.process(job.getId(), process.getPersistStream()) ); - this.autoDetectWriter = createProcessWriter(job, process, statusReporter); - } - - private DataToProcessWriter createProcessWriter(Job job, AutodetectProcess process, StatusReporter statusReporter) { - return DataToProcessWriterFactory.create(true, process, job.getDataDescription(), job.getAnalysisConfig(), - new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER); } public void writeJobInputHeader() throws IOException { - autoDetectWriter.writeHeader(); + createProcessWriter(Optional.empty()).writeHeader(); + } + + private DataToProcessWriter createProcessWriter(Optional dataDescription) { + return DataToProcessWriterFactory.create(true, autodetectProcess, dataDescription.orElse(job.getDataDescription()), + job.getAnalysisConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER); } public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier cancelled) throws IOException { - return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> { + return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> { if (params.isResettingBuckets()) { autodetectProcess.writeResetBucketsControlMessage(params); } CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter); + + DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription()); DataCounts results = autoDetectWriter.write(countingStream, cancelled); autoDetectWriter.flush(); return results; @@ -92,7 +93,7 @@ public class AutodetectCommunicator implements Closeable { @Override public void close() throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, jobId), () -> { + checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> { statusReporter.close(); autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); @@ -101,7 +102,7 @@ public class AutodetectCommunicator implements Closeable { } public void writeUpdateConfigMessage(String config) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> { + checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> { autodetectProcess.writeUpdateConfigMessage(config); return null; }, false); @@ -112,15 +113,15 @@ public class AutodetectCommunicator implements Closeable { } void flushJob(InterimResultsParams params, int tryTimeoutSecs) throws IOException { - checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, jobId), () -> { + checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> { String flushId = autodetectProcess.flushJob(params); Duration timeout = Duration.ofSeconds(tryTimeoutSecs); - LOGGER.info("[{}] waiting for flush", jobId); + LOGGER.info("[{}] waiting for flush", job.getId()); boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout); - LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete); + LOGGER.info("[{}] isFlushComplete={}", job.getId(), isFlushComplete); if (!isFlushComplete) { - String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, jobId) + " " + autodetectProcess.readError(); + String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, job.getId()) + " " + autodetectProcess.readError(); LOGGER.error(msg); throw ExceptionsHelper.serverError(msg); } @@ -138,7 +139,7 @@ public class AutodetectCommunicator implements Closeable { private void checkProcessIsAlive() { if (!autodetectProcess.isProcessAlive()) { ParameterizedMessage message = - new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", jobId, autodetectProcess.readError()); + new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError()); LOGGER.error(message); throw ExceptionsHelper.serverError(message.getFormattedMessage()); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java index f64ca03e6ef..e2fc9f07f2f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParams.java @@ -5,19 +5,20 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.params; +import org.elasticsearch.xpack.ml.job.DataDescription; + import java.util.Objects; +import java.util.Optional; public class DataLoadParams { private final TimeRange resetTimeRange; private final boolean ignoreDowntime; + private final Optional dataDescription; - public DataLoadParams(TimeRange resetTimeRange) { - this(resetTimeRange, false); - } - - public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime) { + public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime, Optional dataDescription) { this.resetTimeRange = Objects.requireNonNull(resetTimeRange); this.ignoreDowntime = ignoreDowntime; + this.dataDescription = Objects.requireNonNull(dataDescription); } public boolean isResettingBuckets() { @@ -35,5 +36,9 @@ public class DataLoadParams { public boolean isIgnoreDowntime() { return ignoreDowntime; } + + public Optional getDataDescription() { + return dataDescription; + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index c30a51c2d39..ecb8875c798 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -5,9 +5,25 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.xpack.ml.job.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.DataDescription; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; +import org.elasticsearch.xpack.ml.job.status.StatusReporter; +import org.elasticsearch.xpack.ml.job.transform.TransformConfig; +import org.elasticsearch.xpack.ml.job.transform.TransformConfigs; +import org.elasticsearch.xpack.ml.transforms.DependencySorter; +import org.elasticsearch.xpack.ml.transforms.Transform; +import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex; +import org.elasticsearch.xpack.ml.transforms.Transform.TransformResult; +import org.elasticsearch.xpack.ml.transforms.TransformException; +import org.elasticsearch.xpack.ml.transforms.TransformFactory; +import org.elasticsearch.xpack.ml.transforms.date.DateFormatTransform; +import org.elasticsearch.xpack.ml.transforms.date.DateTransform; +import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform; + import java.io.IOException; -import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -23,26 +39,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.Supplier; -import org.apache.logging.log4j.Logger; - -import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; -import org.elasticsearch.xpack.ml.job.DataDescription.DataFormat; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; -import org.elasticsearch.xpack.ml.job.status.StatusReporter; -import org.elasticsearch.xpack.ml.job.transform.TransformConfig; -import org.elasticsearch.xpack.ml.job.transform.TransformConfigs; -import org.elasticsearch.xpack.ml.transforms.DependencySorter; -import org.elasticsearch.xpack.ml.transforms.Transform; -import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex; -import org.elasticsearch.xpack.ml.transforms.Transform.TransformResult; -import org.elasticsearch.xpack.ml.transforms.TransformException; -import org.elasticsearch.xpack.ml.transforms.TransformFactory; -import org.elasticsearch.xpack.ml.transforms.date.DateFormatTransform; -import org.elasticsearch.xpack.ml.transforms.date.DateTransform; -import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform; - public abstract class AbstractDataToProcessWriter implements DataToProcessWriter { protected static final int TIME_FIELD_OUT_INDEX = 0; @@ -175,9 +171,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter } protected void buildDateTransform(Map scratchAreaIndexes, Map outFieldIndexes) { - boolean isDateFormatString = dataDescription.isTransformTime() - && !dataDescription.isEpochMs(); - List readIndexes = new ArrayList<>(); Integer index = inFieldIndexes.get(dataDescription.getTimeField()); @@ -202,15 +195,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter writeIndexes.add(new TransformIndex(TransformFactory.OUTPUT_ARRAY_INDEX, outFieldIndexes.get(dataDescription.getTimeField()))); + boolean isDateFormatString = dataDescription.isTransformTime() && !dataDescription.isEpochMs(); if (isDateFormatString) { - // Elasticsearch assumes UTC for dates without timezone information. - ZoneId defaultTimezone = dataDescription.getFormat() == DataFormat.ELASTICSEARCH - ? ZoneOffset.UTC : ZoneOffset.systemDefault(); - dateTransform = new DateFormatTransform(dataDescription.getTimeFormat(), - defaultTimezone, readIndexes, writeIndexes, logger); + dateTransform = new DateFormatTransform(dataDescription.getTimeFormat(), readIndexes, writeIndexes, logger); } else { - dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(), - readIndexes, writeIndexes, logger); + dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(), readIndexes, writeIndexes, logger); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java index 1dd99f61672..808dd67277e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.apache.logging.log4j.Logger; - import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -36,7 +35,6 @@ public final class DataToProcessWriterFactory { TransformConfigs transforms, StatusReporter statusReporter, Logger logger) { switch (dataDescription.getFormat()) { case JSON: - case ELASTICSEARCH: return new JsonDataToProcessWriter(includeControlField, autodetectProcess, dataDescription, analysisConfig, transforms, statusReporter, logger); case DELIMITED: diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java index 07bfcd61874..712b4eed2ba 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJob.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.audit.Auditor; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor; @@ -22,6 +23,7 @@ import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractorFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +36,7 @@ class ScheduledJob { private final Auditor auditor; private final String jobId; + private final DataDescription dataDescription; private final long frequencyMs; private final long queryDelayMs; private final Client client; @@ -44,10 +47,11 @@ class ScheduledJob { private volatile Long lastEndTimeMs; private AtomicBoolean running = new AtomicBoolean(true); - ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, - Client client, Auditor auditor, Supplier currentTimeSupplier, + ScheduledJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, + DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { this.jobId = jobId; + this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; this.queryDelayMs = queryDelayMs; this.dataExtractorFactory = dataExtractorFactory; @@ -187,6 +191,7 @@ class ScheduledJob { private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException { PostDataAction.Request request = new PostDataAction.Request(jobId); + request.setDataDescription(dataDescription); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Streams.copy(inputStream, outputStream); request.setContent(new BytesArray(outputStream.toByteArray())); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java index ef402b7abd7..bd43cf97700 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.StartSchedulerAction; import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.JobStatus; import org.elasticsearch.xpack.ml.job.audit.Auditor; @@ -189,7 +190,7 @@ public class ScheduledJobRunner extends AbstractComponent { Duration frequency = getFrequencyOrDefault(scheduler, job); Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay()); DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(scheduler.getConfig(), job); - ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), + ScheduledJob scheduledJob = new ScheduledJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler); task.setHolder(holder); @@ -200,6 +201,16 @@ public class ScheduledJobRunner extends AbstractComponent { return new ScrollDataExtractorFactory(client, schedulerConfig, job); } + private static DataDescription buildDataDescription(Job job) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.JSON); + if (job.getDataDescription() != null) { + dataDescription.setTimeField(job.getDataDescription().getTimeField()); + } + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + return dataDescription.build(); + } + private void gatherInformation(String jobId, BiConsumer, DataCounts> handler, Consumer errorHandler) { BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder() .sortField(Bucket.TIMESTAMP.getPreferredName()) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java index 66147883ddb..8e6d919c192 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidator.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.scheduler; import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -28,9 +27,5 @@ public final class ScheduledJobValidator { throw new IllegalArgumentException( Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT)); } - DataDescription dataDescription = job.getDataDescription(); - if (dataDescription == null || dataDescription.getFormat() != DataDescription.DataFormat.ELASTICSEARCH) { - throw new IllegalArgumentException(Messages.getMessage(Messages.SCHEDULER_REQUIRES_JOB_WITH_DATAFORMAT_ELASTICSEARCH)); - } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java index f56e518bfe2..91f1a6a4bb3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfig.java @@ -62,6 +62,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { public static final ParseField AGGREGATIONS = new ParseField("aggregations"); public static final ParseField AGGS = new ParseField("aggs"); public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); + public static final ParseField SOURCE = new ParseField("_source"); public static final ObjectParser PARSER = new ObjectParser<>("scheduler_config", Builder::new); @@ -86,6 +87,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { return parsedScriptFields; }, SCRIPT_FIELDS); PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE); + PARSER.declareBoolean(Builder::setSource, SOURCE); } private final String id; @@ -107,10 +109,11 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { private final AggregatorFactories.Builder aggregations; private final List scriptFields; private final Integer scrollSize; + private final boolean source; private SchedulerConfig(String id, String jobId, Long queryDelay, Long frequency, List indexes, List types, QueryBuilder query, AggregatorFactories.Builder aggregations, - List scriptFields, Integer scrollSize) { + List scriptFields, Integer scrollSize, boolean source) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -121,6 +124,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.aggregations = aggregations; this.scriptFields = scriptFields; this.scrollSize = scrollSize; + this.source = source; } public SchedulerConfig(StreamInput in) throws IOException { @@ -146,6 +150,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.scriptFields = null; } this.scrollSize = in.readOptionalVInt(); + this.source = in.readBoolean(); } public String getId() { @@ -171,7 +176,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { * @return The indexes to search, or null if not set. */ public List getIndexes() { - return this.indexes; + return indexes; } /** @@ -181,11 +186,15 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { * @return The types to search, or null if not set. */ public List getTypes() { - return this.types; + return types; } public Integer getScrollSize() { - return this.scrollSize; + return scrollSize; + } + + public boolean isSource() { + return source; } public QueryBuilder getQuery() { @@ -227,6 +236,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { out.writeBoolean(false); } out.writeOptionalVInt(scrollSize); + out.writeBoolean(source); } @Override @@ -258,6 +268,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { builder.endObject(); } builder.field(SCROLL_SIZE.getPreferredName(), scrollSize); + if (source) { + builder.field(SOURCE.getPreferredName(), source); + } return builder; } @@ -287,12 +300,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { && Objects.equals(this.query, that.query) && Objects.equals(this.scrollSize, that.scrollSize) && Objects.equals(this.aggregations, that.aggregations) - && Objects.equals(this.scriptFields, that.scriptFields); + && Objects.equals(this.scriptFields, that.scriptFields) + && Objects.equals(this.source, that.source); } @Override public int hashCode() { - return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields); + return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source); } public static class Builder { @@ -310,6 +324,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { private AggregatorFactories.Builder aggregations; private List scriptFields; private Integer scrollSize = DEFAULT_SCROLL_SIZE; + private boolean source = false; public Builder() { } @@ -331,6 +346,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.aggregations = config.aggregations; this.scriptFields = config.scriptFields; this.scrollSize = config.scrollSize; + this.source = config.source; } public void setId(String schedulerId) { @@ -390,6 +406,10 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.scrollSize = scrollSize; } + public void setSource(boolean enabled) { + this.source = enabled; + } + public SchedulerConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -402,7 +422,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) { throw invalidOptionValue(TYPES.getPreferredName(), types); } - return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize); + return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize, + source); } private static ElasticsearchException invalidOptionValue(String fieldName, Object value) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java deleted file mode 100644 index 045571cb8cc..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitFieldExtractor.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.scheduler.extractor; - -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public final class SearchHitFieldExtractor { - - private SearchHitFieldExtractor() {} - - public static Object[] extractField(SearchHit hit, String field) { - SearchHitField keyValue = hit.field(field); - if (keyValue != null) { - List values = keyValue.values(); - return values.toArray(new Object[values.size()]); - } else { - return extractFieldFromSource(hit.getSource(), field); - } - } - - private static Object[] extractFieldFromSource(Map source, String field) { - if (source != null) { - Object values = source.get(field); - if (values != null) { - if (values instanceof Object[]) { - return (Object[]) values; - } else { - return new Object[]{values}; - } - } - } - return new Object[0]; - } - - public static Long extractTimeField(SearchHit hit, String timeField) { - Object[] fields = extractField(hit, timeField); - if (fields.length != 1) { - throw new RuntimeException("Time field [" + timeField + "] expected a single value; actual was: " + Arrays.toString(fields)); - } - if (fields[0] instanceof Long) { - return (Long) fields[0]; - } - throw new RuntimeException("Time field [" + timeField + "] expected a long value; actual was: " + fields[0]); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java new file mode 100644 index 00000000000..781e2d1aa08 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedField.java @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +abstract class ExtractedField { + + public enum ExtractionMethod { + SOURCE, DOC_VALUE, SCRIPT_FIELD + } + + protected final String name; + private final ExtractionMethod extractionMethod; + + protected ExtractedField(String name, ExtractionMethod extractionMethod) { + this.name = Objects.requireNonNull(name); + this.extractionMethod = Objects.requireNonNull(extractionMethod); + } + + public String getName() { + return name; + } + + public ExtractionMethod getExtractionMethod() { + return extractionMethod; + } + + public abstract Object[] value(SearchHit hit); + + public static ExtractedField newField(String name, ExtractionMethod extractionMethod) { + switch (extractionMethod) { + case DOC_VALUE: + case SCRIPT_FIELD: + return new FromFields(name, extractionMethod); + case SOURCE: + return new FromSource(name, extractionMethod); + default: + throw new IllegalArgumentException("Invalid extraction method [" + extractionMethod + "]"); + } + } + + private static class FromFields extends ExtractedField { + + public FromFields(String name, ExtractionMethod extractionMethod) { + super(name, extractionMethod); + } + + @Override + public Object[] value(SearchHit hit) { + SearchHitField keyValue = hit.field(name); + if (keyValue != null) { + List values = keyValue.values(); + return values.toArray(new Object[values.size()]); + } + return new Object[0]; + } + } + + private static class FromSource extends ExtractedField { + + private String[] namePath; + + public FromSource(String name, ExtractionMethod extractionMethod) { + super(name, extractionMethod); + namePath = name.split("\\."); + } + + @Override + public Object[] value(SearchHit hit) { + Map source = hit.getSource(); + int level = 0; + while (source != null && level < namePath.length - 1) { + source = getNextLevel(source, namePath[level]); + level++; + } + if (source != null) { + Object values = source.get(namePath[level]); + if (values != null) { + if (values instanceof List) { + @SuppressWarnings("unchecked") + List asList = (List) values; + return asList.toArray(new Object[asList.size()]); + } else { + return new Object[]{values}; + } + } + } + return new Object[0]; + } + + @SuppressWarnings("unchecked") + private static Map getNextLevel(Map source, String key) { + Object nextLevel = source.get(key); + if (nextLevel instanceof Map) { + return (Map) source.get(key); + } + return null; + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java new file mode 100644 index 00000000000..874be00c9cb --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFields.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.ml.job.Job; +import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +class ExtractedFields { + + private final ExtractedField timeField; + private final List allFields; + + public ExtractedFields(ExtractedField timeField, List allFields) { + if (!allFields.contains(timeField)) { + throw new IllegalArgumentException("timeField should also be contained in allFields"); + } + this.timeField = Objects.requireNonNull(timeField); + this.allFields = Collections.unmodifiableList(allFields); + } + + public List getAllFields() { + return allFields; + } + + public String[] getSourceFields() { + return filterFields(ExtractedField.ExtractionMethod.SOURCE); + } + + public String[] getDocValueFields() { + return filterFields(ExtractedField.ExtractionMethod.DOC_VALUE); + } + + private String[] filterFields(ExtractedField.ExtractionMethod method) { + List result = new ArrayList<>(); + for (ExtractedField field : allFields) { + if (field.getExtractionMethod() == method) { + result.add(field.getName()); + } + } + return result.toArray(new String[result.size()]); + } + + public String timeField() { + return timeField.getName(); + } + + public Long timeFieldValue(SearchHit hit) { + Object[] value = timeField.value(hit); + if (value.length != 1) { + throw new RuntimeException("Time field [" + timeField.getName() + "] expected a single value; actual was: " + + Arrays.toString(value)); + } + if (value[0] instanceof Long) { + return (Long) value[0]; + } + throw new RuntimeException("Time field [" + timeField.getName() + "] expected a long value; actual was: " + value[0]); + } + + public static ExtractedFields build(Job job, SchedulerConfig schedulerConfig) { + Set scriptFields = schedulerConfig.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet()); + String timeField = job.getDataDescription().getTimeField(); + ExtractedField timeExtractedField = ExtractedField.newField(timeField, scriptFields.contains(timeField) ? + ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE); + List remainingFields = job.allFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList()); + List allExtractedFields = new ArrayList<>(remainingFields.size()); + allExtractedFields.add(timeExtractedField); + for (String field : remainingFields) { + ExtractedField.ExtractionMethod method = scriptFields.contains(field) ? ExtractedField.ExtractionMethod.SCRIPT_FIELD : + schedulerConfig.isSource() ? ExtractedField.ExtractionMethod.SOURCE : ExtractedField.ExtractionMethod.DOC_VALUE; + allExtractedFields.add(ExtractedField.newField(field, method)); + } + return new ExtractedFields(timeExtractedField, allExtractedFields); + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java index d6af67da45b..736c3e06f4e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java @@ -19,13 +19,9 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.scheduler.extractor.SearchHitFieldExtractor; -import org.elasticsearch.xpack.ml.scheduler.extractor.SearchHitToJsonProcessor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -104,20 +100,22 @@ class ScrollDataExtractor implements DataExtractor { private SearchRequestBuilder buildSearchRequest() { SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) - .addSort(context.timeField, SortOrder.ASC) + .addSort(context.extractedFields.timeField(), SortOrder.ASC) .setIndices(context.indexes) .setTypes(context.types) .setSize(context.scrollSize) .setQuery(createQuery()); - if (context.aggregations != null) { - searchRequestBuilder.setSize(0); - for (AggregationBuilder aggregationBuilder : context.aggregations.getAggregatorFactories()) { - searchRequestBuilder.addAggregation(aggregationBuilder); - } - for (PipelineAggregationBuilder pipelineAggregationBuilder : context.aggregations.getPipelineAggregatorFactories()) { - searchRequestBuilder.addAggregation(pipelineAggregationBuilder); - } + + for (String docValueField : context.extractedFields.getDocValueFields()) { + searchRequestBuilder.addDocValueField(docValueField); } + String[] sourceFields = context.extractedFields.getSourceFields(); + if (sourceFields.length == 0) { + searchRequestBuilder.setFetchSource(false); + } else { + searchRequestBuilder.setFetchSource(sourceFields, null); + } + for (SearchSourceBuilder.ScriptField scriptField : context.scriptFields) { searchRequestBuilder.addScriptField(scriptField.fieldName(), scriptField.script()); } @@ -132,10 +130,10 @@ class ScrollDataExtractor implements DataExtractor { } ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.jobFields, outputStream)) { + try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.extractedFields, outputStream)) { for (SearchHit hit : searchResponse.getHits().hits()) { if (isCancelled) { - Long timestamp = SearchHitFieldExtractor.extractTimeField(hit, context.timeField); + Long timestamp = context.extractedFields.timeFieldValue(hit); if (timestamp != null) { if (timestampOnCancel == null) { timestampOnCancel = timestamp; @@ -170,7 +168,10 @@ class ScrollDataExtractor implements DataExtractor { private QueryBuilder createQuery() { QueryBuilder userQuery = context.query; - QueryBuilder timeQuery = new RangeQueryBuilder(context.timeField).gte(context.start).lt(context.end).format("epoch_millis"); + QueryBuilder timeQuery = new RangeQueryBuilder(context.extractedFields.timeField()) + .gte(context.start) + .lt(context.end) + .format("epoch_millis"); return new BoolQueryBuilder().filter(userQuery).filter(timeQuery); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java index ca040e8b854..03a60474261 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorContext.java @@ -5,39 +5,32 @@ */ package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; -import org.elasticsearch.common.Nullable; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.util.List; import java.util.Objects; -public class ScrollDataExtractorContext { +class ScrollDataExtractorContext { final String jobId; - final String[] jobFields; - final String timeField; + final ExtractedFields extractedFields; final String[] indexes; final String[] types; final QueryBuilder query; - @Nullable - final AggregatorFactories.Builder aggregations; final List scriptFields; final int scrollSize; final long start; final long end; - public ScrollDataExtractorContext(String jobId, List jobFields, String timeField, List indexes, List types, - QueryBuilder query, @Nullable AggregatorFactories.Builder aggregations, - List scriptFields, int scrollSize, long start, long end) { + public ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List indexes, List types, + QueryBuilder query, List scriptFields, int scrollSize, + long start, long end) { this.jobId = Objects.requireNonNull(jobId); - this.jobFields = jobFields.toArray(new String[jobFields.size()]); - this.timeField = Objects.requireNonNull(timeField); + this.extractedFields = Objects.requireNonNull(extractedFields); this.indexes = indexes.toArray(new String[indexes.size()]); this.types = types.toArray(new String[types.size()]); this.query = Objects.requireNonNull(query); - this.aggregations = aggregations; this.scriptFields = Objects.requireNonNull(scriptFields); this.scrollSize = scrollSize; this.start = start; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java index 6f8fcc73cfb..fea27652ac4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorFactory.java @@ -18,24 +18,23 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { private final Client client; private final SchedulerConfig schedulerConfig; private final Job job; + private final ExtractedFields extractedFields; public ScrollDataExtractorFactory(Client client, SchedulerConfig schedulerConfig, Job job) { this.client = Objects.requireNonNull(client); this.schedulerConfig = Objects.requireNonNull(schedulerConfig); this.job = Objects.requireNonNull(job); + this.extractedFields = ExtractedFields.build(job, schedulerConfig); } @Override public DataExtractor newExtractor(long start, long end) { - String timeField = job.getDataDescription().getTimeField(); ScrollDataExtractorContext dataExtractorContext = new ScrollDataExtractorContext( job.getId(), - job.allFields(), - timeField, + extractedFields, schedulerConfig.getIndexes(), schedulerConfig.getTypes(), schedulerConfig.getQuery(), - schedulerConfig.getAggregations(), schedulerConfig.getScriptFields(), schedulerConfig.getScrollSize(), start, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessor.java similarity index 66% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessor.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessor.java index c7ceb41d76c..51003c62f75 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessor.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.scheduler.extractor; +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -12,21 +12,22 @@ import org.elasticsearch.search.SearchHit; import java.io.IOException; import java.io.OutputStream; +import java.util.Objects; -public class SearchHitToJsonProcessor implements Releasable { +class SearchHitToJsonProcessor implements Releasable { - private final String[] fields; + private final ExtractedFields fields; private final XContentBuilder jsonBuilder; - public SearchHitToJsonProcessor(String[] fields, OutputStream outputStream) throws IOException { - this.fields = fields; - jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); + public SearchHitToJsonProcessor(ExtractedFields fields, OutputStream outputStream) throws IOException { + this.fields = Objects.requireNonNull(fields); + this.jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); } public void process(SearchHit hit) throws IOException { jsonBuilder.startObject(); - for (String field : fields) { - writeKeyValue(field, SearchHitFieldExtractor.extractField(hit, field)); + for (ExtractedField field : fields.getAllFields()) { + writeKeyValue(field.getName(), field.value(hit)); } jsonBuilder.endObject(); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java index 6642f2775ec..90c1206b098 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransform.java @@ -5,17 +5,16 @@ */ package org.elasticsearch.xpack.ml.transforms.date; -import java.time.ZoneId; -import java.time.format.DateTimeParseException; -import java.util.List; -import java.util.Locale; - import org.apache.logging.log4j.Logger; - import org.elasticsearch.xpack.ml.transforms.TransformException; import org.elasticsearch.xpack.ml.utils.time.DateTimeFormatterTimestampConverter; import org.elasticsearch.xpack.ml.utils.time.TimestampConverter; +import java.time.ZoneOffset; +import java.time.format.DateTimeParseException; +import java.util.List; +import java.util.Locale; + /** * A transform that attempts to parse a String timestamp * according to a timeFormat. It converts that @@ -25,12 +24,11 @@ public class DateFormatTransform extends DateTransform { private final String timeFormat; private final TimestampConverter dateToEpochConverter; - public DateFormatTransform(String timeFormat, ZoneId defaultTimezone, - List readIndexes, List writeIndexes, Logger logger) { + public DateFormatTransform(String timeFormat, List readIndexes, List writeIndexes, Logger logger) { super(readIndexes, writeIndexes, logger); this.timeFormat = timeFormat; - dateToEpochConverter = DateTimeFormatterTimestampConverter.ofPattern(timeFormat, defaultTimezone); + dateToEpochConverter = DateTimeFormatterTimestampConverter.ofPattern(timeFormat, ZoneOffset.systemDefault()); } @Override @@ -38,9 +36,7 @@ public class DateFormatTransform extends DateTransform { try { return dateToEpochConverter.toEpochMillis(field); } catch (DateTimeParseException pe) { - String message = String.format(Locale.ROOT, "Cannot parse date '%s' with format string '%s'", - field, timeFormat); - + String message = String.format(Locale.ROOT, "Cannot parse date '%s' with format string '%s'", field, timeFormat); throw new ParseTimestampException(message); } } diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties index 00305d0ff66..7cf953c4baa 100644 --- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties +++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties @@ -144,7 +144,6 @@ scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler c scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency scheduler.aggregations.requires.job.with.summary.count.field = A job configured with a scheduler with aggregations must have summary_count_field_name ''{0}'' -scheduler.requires.job.with.dataformat.elasticsearch = A job configured with a scheduler must have dataFormat ''ELASTICSEARCH'' job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java index 38c10cdbcf6..bdd8a51f668 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java @@ -97,7 +97,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs + numDocs2)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get() @@ -139,7 +139,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { t.start(); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }); @@ -148,7 +148,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { indexDocs("data", numDocs2, now + 5000, now + 6000); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1 + numDocs2)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }, 30, TimeUnit.SECONDS); @@ -182,8 +182,8 @@ public class ScheduledJobsIT extends ESIntegTestCase { private Job.Builder createJob() { DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); Detector.Builder d = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java index b653d2c3849..f8220a4e787 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.ml.MlPlugin; import org.junit.After; +import org.junit.Before; import java.io.BufferedReader; import java.io.IOException; @@ -25,35 +26,125 @@ import static org.hamcrest.Matchers.equalTo; public class ScheduledJobIT extends ESRestTestCase { - public void testStartJobScheduler_GivenLookbackOnly() throws Exception { - String jobId = "job-1"; - createAirlineDataIndex(); - createJob(jobId); - String schedulerId = "sched-1"; - createScheduler(schedulerId, jobId); - openJob(client(), jobId); + @Before + public void setUpData() throws Exception { + // Create index with source = enabled, doc_values = enabled, stored = false + String mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"properties\": {" + + " \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional + + " \"airline\": { \"type\":\"keyword\"}," + + " \"responsetime\": { \"type\":\"float\"}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings)); - Response startSchedulerRequest = client().performRequest("post", - MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); - assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); - assertBusy(() -> { - try { - Response getJobResponse = client().performRequest("get", - MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); - assertThat(responseEntityToString(getJobResponse), containsString("\"input_record_count\":2")); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + + // Create index with source = enabled, doc_values = disabled (except time), stored = false + mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"properties\": {" + + " \"time stamp\": { \"type\":\"date\"}," + + " \"airline\": { \"type\":\"keyword\", \"doc_values\":false}," + + " \"responsetime\": { \"type\":\"float\", \"doc_values\":false}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(), new StringEntity(mappings)); + + client().performRequest("put", "airline-data-disabled-doc-values/response/1", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + client().performRequest("put", "airline-data-disabled-doc-values/response/2", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + + // Create index with source = disabled, doc_values = enabled (except time), stored = true + mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"_source\":{\"enabled\":false}," + + " \"properties\": {" + + " \"time stamp\": { \"type\":\"date\", \"store\":true}," + + " \"airline\": { \"type\":\"keyword\", \"store\":true}," + + " \"responsetime\": { \"type\":\"float\", \"store\":true}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(), new StringEntity(mappings)); + + client().performRequest("put", "airline-data-disabled-source/response/1", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + client().performRequest("put", "airline-data-disabled-source/response/2", Collections.emptyMap(), + new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + + // Create index with nested documents + mappings = "{" + + " \"mappings\": {" + + " \"response\": {" + + " \"properties\": {" + + " \"time\": { \"type\":\"date\"}" + + " }" + + " }" + + " }" + + "}"; + client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings)); + + client().performRequest("put", "nested-data/response/1", Collections.emptyMap(), + new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}")); + client().performRequest("put", "nested-data/response/2", Collections.emptyMap(), + new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}")); + + // Ensure all data is searchable + client().performRequest("post", "_refresh"); } - public void testStartJobScheduler_GivenRealtime() throws Exception { - String jobId = "job-2"; - createAirlineDataIndex(); + public void testLookbackOnly() throws Exception { + new LookbackOnlyTestHelper("lookback-1", "airline-data").setShouldSucceedProcessing(true).execute(); + } + + public void testLookbackOnlyWithSchedulerSourceEnabled() throws Exception { + new LookbackOnlyTestHelper("lookback-2", "airline-data").setEnableSchedulerSource(true).execute(); + } + + public void testLookbackOnlyWithDocValuesDisabledAndSchedulerSourceDisabled() throws Exception { + new LookbackOnlyTestHelper("lookback-3", "airline-data-disabled-doc-values").setShouldSucceedInput(false) + .setShouldSucceedProcessing(false).execute(); + } + + public void testLookbackOnlyWithDocValuesDisabledAndSchedulerSourceEnabled() throws Exception { + new LookbackOnlyTestHelper("lookback-4", "airline-data-disabled-doc-values").setEnableSchedulerSource(true).execute(); + } + + public void testLookbackOnlyWithSourceDisabled() throws Exception { + new LookbackOnlyTestHelper("lookback-5", "airline-data-disabled-source").execute(); + } + + public void testLookbackOnlyWithScriptFields() throws Exception { + new LookbackOnlyTestHelper("lookback-6", "airline-data-disabled-source").setAddScriptedFields(true).execute(); + } + + public void testLookbackOnlyWithNestedFieldsAndSchedulerSourceDisabled() throws Exception { + executeTestLookbackOnlyWithNestedFields("lookback-7", false); + } + + public void testLookbackOnlyWithNestedFieldsAndSchedulerSourceEnabled() throws Exception { + executeTestLookbackOnlyWithNestedFields("lookback-8", true); + } + + public void testRealtime() throws Exception { + String jobId = "job-realtime-1"; createJob(jobId); - String schedulerId = "sched-2"; - createScheduler(schedulerId, jobId); + String schedulerId = jobId + "-scheduler"; + createScheduler(schedulerId, jobId, "airline-data", false, false); openJob(client(), jobId); Response response = client().performRequest("post", @@ -65,7 +156,7 @@ public class ScheduledJobIT extends ESRestTestCase { Response getJobResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String responseAsString = responseEntityToString(getJobResponse); - assertThat(responseAsString, containsString("\"input_record_count\":2")); + assertThat(responseAsString, containsString("\"processed_record_count\":2")); } catch (Exception e1) { throw new RuntimeException(e1); } @@ -93,35 +184,98 @@ public class ScheduledJobIT extends ESRestTestCase { assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); } - private void createAirlineDataIndex() throws Exception { - String airlineDataMappings = "{" + " \"mappings\": {" + " \"response\": {" + " \"properties\": {" - + " \"time\": { \"type\":\"date\"}," + " \"airline\": { \"type\":\"keyword\"}," - + " \"responsetime\": { \"type\":\"float\"}" + " }" + " }" + " }" + "}"; - client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(airlineDataMappings)); + private class LookbackOnlyTestHelper { + private String jobId; + private String dataIndex; + private boolean addScriptedFields; + private boolean enableSchedulerSource; + private boolean shouldSucceedInput; + private boolean shouldSucceedProcessing; - client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); - client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + public LookbackOnlyTestHelper(String jobId, String dataIndex) { + this.jobId = jobId; + this.dataIndex = dataIndex; + this.shouldSucceedInput = true; + this.shouldSucceedProcessing = true; + } - client().performRequest("post", "airline-data/_refresh"); + public LookbackOnlyTestHelper setAddScriptedFields(boolean value) { + addScriptedFields = value; + return this; + } + + public LookbackOnlyTestHelper setEnableSchedulerSource(boolean value) { + enableSchedulerSource = value; + return this; + } + + public LookbackOnlyTestHelper setShouldSucceedInput(boolean value) { + shouldSucceedInput = value; + return this; + } + + public LookbackOnlyTestHelper setShouldSucceedProcessing(boolean value) { + shouldSucceedProcessing = value; + return this; + } + + public void execute() throws Exception { + createJob(jobId); + String schedulerId = "scheduler-" + jobId; + createScheduler(schedulerId, jobId, dataIndex, enableSchedulerSource, addScriptedFields); + openJob(client(), jobId); + + startSchedulerAndWaitUntilStopped(schedulerId); + Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); + if (shouldSucceedInput) { + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); + } else { + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":0")); + } + if (shouldSucceedProcessing) { + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2")); + } else { + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":0")); + } + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + } + + private void startSchedulerAndWaitUntilStopped(String schedulerId) throws Exception { + Response startSchedulerRequest = client().performRequest("post", + MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); + assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); + assertBusy(() -> { + try { + Response schedulerStatsResponse = client().performRequest("get", + MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_stats"); + assertThat(responseEntityToString(schedulerStatsResponse), containsString("\"status\":\"STOPPED\"")); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } private Response createJob(String id) throws Exception { String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n" + " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" - + " },\n" + " \"data_description\" : {\n" + " \"format\":\"ELASTICSEARCH\",\n" - + " \"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + + " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n" + + " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + "}"; return client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + id, Collections.emptyMap(), new StringEntity(job)); } - private Response createScheduler(String schedulerId, String jobId) throws IOException { - String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"airline-data\"],\n" - + "\"types\":[\"response\"]\n" + "}"; + private Response createScheduler(String schedulerId, String jobId, String dataIndex, boolean source, boolean addScriptedFields) + throws IOException { + String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"" + dataIndex + "\"],\n" + + "\"types\":[\"response\"]" + (source ? ",\"_source\":true" : "") + (addScriptedFields ? + ",\"script_fields\":{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : "") + +"}"; return client().performRequest("put", MlPlugin.BASE_PATH + "schedulers/" + schedulerId, Collections.emptyMap(), new StringEntity(schedulerConfig)); } @@ -137,6 +291,24 @@ public class ScheduledJobIT extends ESRestTestCase { assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); } + private void executeTestLookbackOnlyWithNestedFields(String jobId, boolean source) throws Exception { + String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":3600,\"detectors\" :" + + "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}" + + "}"; + client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job)); + + String schedulerId = jobId + "-scheduler"; + createScheduler(schedulerId, jobId, "nested-data", source, false); + openJob(client(), jobId); + + startSchedulerAndWaitUntilStopped(schedulerId); + Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + @After public void clearMlState() throws Exception { new MlRestTestStateCleaner(client(), this).clearMlMetadata(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java index a2b7c7f6706..adbcd7a4b65 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/DataFormatTests.java @@ -34,7 +34,6 @@ public class DataFormatTests extends ESTestCase { assertThat(DataFormat.JSON.ordinal(), equalTo(0)); assertThat(DataFormat.DELIMITED.ordinal(), equalTo(1)); assertThat(DataFormat.SINGLE_LINE.ordinal(), equalTo(2)); - assertThat(DataFormat.ELASTICSEARCH.ordinal(), equalTo(3)); } public void testwriteTo() throws Exception { @@ -58,13 +57,6 @@ public class DataFormatTests extends ESTestCase { assertThat(in.readVInt(), equalTo(2)); } } - - try (BytesStreamOutput out = new BytesStreamOutput()) { - DataFormat.ELASTICSEARCH.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(in.readVInt(), equalTo(3)); - } - } } public void testReadFrom() throws Exception { @@ -86,12 +78,6 @@ public class DataFormatTests extends ESTestCase { assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.SINGLE_LINE)); } } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeVInt(3); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.ELASTICSEARCH)); - } - } } public void testInvalidReadFrom() throws Exception { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java index 2f7defd0634..d89c4eec243 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/manager/AutodetectProcessManagerTests.java @@ -44,6 +44,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Iterator; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Supplier; @@ -162,7 +163,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); assertEquals(0, manager.numberOfOpenJobs()); - DataLoadParams params = new DataLoadParams(TimeRange.builder().build()); + DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()); manager.openJob("foo", false); manager.processData("foo", createInputStream(""), params, () -> false); assertEquals(1, manager.numberOfOpenJobs()); @@ -202,7 +203,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator); Supplier cancellable = () -> false; - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true); + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true, Optional.empty()); InputStream inputStream = createInputStream(""); manager.openJob("foo", false); manager.processData("foo", inputStream, params, cancellable); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 4e756c500ca..a9a61c0c39b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.JobStatus; import org.elasticsearch.xpack.ml.job.JobTests; @@ -43,7 +42,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { int numJobs = randomIntBetween(0, 10); for (int i = 0; i < numJobs; i++) { Job job = JobTests.createRandomizedJob(); - if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.ELASTICSEARCH) { + if (randomBoolean()) { SchedulerConfig schedulerConfig = SchedulerConfigTests.createRandomizedSchedulerConfig(job.getId()); if (schedulerConfig.getAggregations() != null) { AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors()); @@ -238,9 +237,9 @@ public class MlMetadataTests extends AbstractSerializingTestCase { public void testPutScheduler_failBecauseJobIsNotCompatibleForScheduler() { Job.Builder job1 = createScheduledJob(); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.DELIMITED); - job1.setDataDescription(dataDescription); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build().getAnalysisConfig()); + analysisConfig.setLatency(3600L); + job1.setAnalysisConfig(analysisConfig); SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1.build(), false); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 2bfaba62b3b..f5c2c80119e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -39,7 +40,7 @@ import static org.mockito.Mockito.when; public class AutodetectCommunicatorTests extends ESTestCase { public void testWriteResetBucketsControlMessage() throws IOException { - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build()); + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), false, Optional.empty()); AutodetectProcess process = mockAutodetectProcessWithOutputStream(); try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) { communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params, () -> false); @@ -147,7 +148,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { () -> communicator.writeToJob(in, mock(DataLoadParams.class), () -> false)); communicator.inUse.set(null); - communicator.writeToJob(in, mock(DataLoadParams.class), () -> false); + communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()), () -> false); } public void testFlushInUse() throws IOException { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 3827287416f..30a69a41e5e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.Collections; +import java.util.Optional; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -105,7 +106,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) { - DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true); + DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true, Optional.empty()); process.writeResetBucketsControlMessage(params); process.flushStream(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java index e68b5a22e2e..c71b28bd37d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/DataLoadParamsTests.java @@ -7,19 +7,21 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params; import org.elasticsearch.test.ESTestCase; +import java.util.Optional; + public class DataLoadParamsTests extends ESTestCase { public void testGetStart() { - assertEquals("", new DataLoadParams(TimeRange.builder().build()).getStart()); - assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build()).getStart()); + assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getStart()); + assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build(), false, Optional.empty()).getStart()); } public void testGetEnd() { - assertEquals("", new DataLoadParams(TimeRange.builder().build()).getEnd()); - assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build()).getEnd()); + assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getEnd()); + assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build(), false, Optional.empty()).getEnd()); } public void testIsResettingBuckets() { - assertFalse(new DataLoadParams(TimeRange.builder().build()).isResettingBuckets()); - assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build()).isResettingBuckets()); + assertFalse(new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).isResettingBuckets()); + assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build(), false, Optional.empty()).isResettingBuckets()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index cfdd8bbccef..968a9a240ac 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoMoreInteractions; import java.io.IOException; +import java.util.Optional; import java.util.stream.IntStream; import org.elasticsearch.test.ESTestCase; @@ -127,7 +128,8 @@ public class ControlMsgToProcessWriterTests extends ESTestCase { public void testWriteResetBucketsMessage() throws IOException { ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2); - writer.writeResetBucketsMessage(new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build())); + writer.writeResetBucketsMessage( + new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), false, Optional.empty())); InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java index da3983e99d0..0fbba65c35b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java @@ -24,13 +24,6 @@ public class DataToProcessWriterFactoryTests extends ESTestCase { assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter); } - public void testCreate_GivenDataFormatIsElasticsearch() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataFormat.ELASTICSEARCH); - - assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter); - } - public void testCreate_GivenDataFormatIsCsv() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataFormat.DELIMITED); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java index 51f173ccc94..c3b7798346b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java @@ -146,12 +146,21 @@ public class ScheduledJobRunnerTests extends ESTestCase { verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); - verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); + verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STARTED)), any()); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); } + private static PostDataAction.Request createExpectedPostDataRequest(Job job) { + DataDescription.Builder expectedDataDescription = new DataDescription.Builder(); + expectedDataDescription.setTimeFormat("epoch_ms"); + expectedDataDescription.setFormat(DataDescription.DataFormat.JSON); + PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(job.getId()); + expectedPostDataRequest.setDataDescription(expectedDataDescription.build()); + return expectedPostDataRequest; + } + public void testStart_extractionProblem() throws Exception { Job.Builder jobBuilder = createScheduledJob(); SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build(); @@ -213,7 +222,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { task.stop(); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); } else { - verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); + verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME), any()); } @@ -233,9 +242,6 @@ public class ScheduledJobRunnerTests extends ESTestCase { Job.Builder builder = new Job.Builder("foo"); builder.setAnalysisConfig(acBuilder); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); return builder; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java index 5d0c87ce466..131eb9ec628 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.job.DataCounts; +import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.audit.Auditor; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor; import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractorFactory; @@ -40,6 +41,7 @@ public class ScheduledJobTests extends ESTestCase { private DataExtractorFactory dataExtractorFactory; private DataExtractor dataExtractor; private Client client; + private DataDescription.Builder dataDescription; private ActionFuture flushJobFuture; private long currentTime; @@ -52,6 +54,8 @@ public class ScheduledJobTests extends ESTestCase { dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); client = mock(Client.class); + dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.JSON); ActionFuture jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); currentTime = 0; @@ -60,7 +64,10 @@ public class ScheduledJobTests extends ESTestCase { InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - when(client.execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("_job_id")))).thenReturn(jobDataFuture); + + PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id"); + expectedRequest.setDataDescription(dataDescription.build()); + when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); } @@ -176,7 +183,7 @@ public class ScheduledJobTests extends ESTestCase { private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; - return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, + return new ScheduledJob("_job_id", dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java index 87a38c2dd4a..20cb881a629 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobValidatorTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.Detector; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -39,9 +38,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { public void testVerify_GivenZeroLatency() { Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); ac.setLatency(0L); @@ -54,9 +50,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { public void testVerify_GivenNoLatency() { Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBatchSpan(1800L); ac.setBucketSpan(100L); @@ -69,9 +62,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { public void testVerify_GivenAggsAndCorrectSummaryCountField() throws IOException { Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); ac.setSummaryCountFieldName("doc_count"); @@ -86,9 +76,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { String errorMessage = Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT); Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); builder.setAnalysisConfig(ac); @@ -105,9 +92,6 @@ public class ScheduledJobValidatorTests extends ESTestCase { String errorMessage = Messages.getMessage( Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT); Job.Builder builder = buildJobBuilder("foo"); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - builder.setDataDescription(dataDescription); AnalysisConfig.Builder ac = createAnalysisConfig(); ac.setBucketSpan(1800L); ac.setSummaryCountFieldName("wrong"); @@ -125,9 +109,7 @@ public class ScheduledJobValidatorTests extends ESTestCase { Job.Builder builder = new Job.Builder(id); builder.setCreateTime(new Date()); AnalysisConfig.Builder ac = createAnalysisConfig(); - DataDescription.Builder dc = new DataDescription.Builder(); builder.setAnalysisConfig(ac); - builder.setDataDescription(dc); return builder; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java index 615bbedb408..70eec105688 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/SchedulerConfigTests.java @@ -61,6 +61,9 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase SearchHitFieldExtractor.extractTimeField(searchHit, "time")); - } - - public void testExtractTimeFieldGivenSingleValueInFields() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L))); - searchHit.fields(fields); - - assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(3L)); - } - - public void testExtractTimeFieldGivenSingleValueInSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - searchHit.sourceRef(new BytesArray("{\"time\":1482418307000}")); - - assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(1482418307000L)); - } - - public void testExtractTimeFieldGivenArrayValue() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L, 5L))); - searchHit.fields(fields); - - expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time")); - } - - public void testExtractTimeFieldGivenSingleNonLongValue() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("time", new InternalSearchHitField("time", Arrays.asList(3))); - searchHit.fields(fields); - - expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time")); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java deleted file mode 100644 index f9788ee9f8a..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/SearchHitToJsonProcessorTests.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.scheduler.extractor; - -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; -import org.elasticsearch.search.internal.InternalSearchHit; -import org.elasticsearch.search.internal.InternalSearchHitField; -import org.elasticsearch.test.ESTestCase; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; - -public class SearchHitToJsonProcessorTests extends ESTestCase { - - public void testProcessGivenHitContainsNothing() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - - String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit); - - assertThat(json, equalTo("{}")); - } - - public void testProcessGivenHitContainsEmptySource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - searchHit.sourceRef(new BytesArray("{}")); - - String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit); - - assertThat(json, equalTo("{}")); - } - - public void testProcessGivenHitContainsSingleValueInFields() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3))); - searchHit.fields(fields); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":3}")); - } - - public void testProcessGivenHitContainsArrayValueInFields() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3, 9))); - searchHit.fields(fields); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":[3,9]}")); - } - - public void testProcessGivenHitContainsSingleValueInSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - String hitSource = "{\"field_1\":\"foo\"}"; - searchHit.sourceRef(new BytesArray(hitSource)); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":\"foo\"}")); - } - - public void testProcessGivenHitContainsArrayValueInSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - String hitSource = "{\"field_1\":[\"foo\",\"bar\"]}"; - searchHit.sourceRef(new BytesArray(hitSource)); - - String json = searchHitToString(new String[] {"field_1"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":[\"foo\",\"bar\"]}")); - } - - public void testProcessGivenHitContainsFieldsAndSource() throws IOException { - InternalSearchHit searchHit = new InternalSearchHit(42); - String hitSource = "{\"field_1\":\"foo\"}"; - searchHit.sourceRef(new BytesArray(hitSource)); - Map fields = new HashMap<>(); - fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList("bar"))); - searchHit.fields(fields); - - String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit); - - assertThat(json, equalTo("{\"field_1\":\"foo\",\"field_2\":\"bar\"}")); - } - - public void testProcessGivenMultipleHits() throws IOException { - InternalSearchHit searchHit1 = new InternalSearchHit(42); - Map fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3))); - searchHit1.fields(fields); - InternalSearchHit searchHit2 = new InternalSearchHit(42); - fields = new HashMap<>(); - fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(5))); - searchHit2.fields(fields); - - String json = searchHitToString(new String[] {"field_1"}, searchHit1, searchHit2); - - assertThat(json, equalTo("{\"field_1\":3} {\"field_1\":5}")); - } - - private String searchHitToString(String[] fields, SearchHit... searchHits) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) { - for (int i = 0; i < searchHits.length; i++) { - hitProcessor.process(searchHits[i]); - } - } - return outputStream.toString(StandardCharsets.UTF_8.name()); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java new file mode 100644 index 00000000000..48e78c19422 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldTests.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.search.internal.InternalSearchHitField; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class ExtractedFieldTests extends ESTestCase { + + public void testValueGivenDocValue() { + SearchHit hit = new SearchHitBuilder(42).addField("single", "bar").addField("array", Arrays.asList("a", "b")).build(); + + ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(single.value(hit), equalTo(new String[] { "bar" })); + + ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(array.value(hit), equalTo(new String[] { "a", "b" })); + + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(missing.value(hit), equalTo(new Object[0])); + } + + public void testValueGivenScriptField() { + SearchHit hit = new SearchHitBuilder(42).addField("single", "bar").addField("array", Arrays.asList("a", "b")).build(); + + ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + assertThat(single.value(hit), equalTo(new String[] { "bar" })); + + ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + assertThat(array.value(hit), equalTo(new String[] { "a", "b" })); + + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + assertThat(missing.value(hit), equalTo(new Object[0])); + } + + public void testValueGivenSource() { + SearchHit hit = new SearchHitBuilder(42).setSource("{\"single\":\"bar\",\"array\":[\"a\",\"b\"]}").build(); + + ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.SOURCE); + assertThat(single.value(hit), equalTo(new String[] { "bar" })); + + ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.SOURCE); + assertThat(array.value(hit), equalTo(new String[] { "a", "b" })); + + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE); + assertThat(missing.value(hit), equalTo(new Object[0])); + } + + public void testValueGivenNestedSource() { + SearchHit hit = new SearchHitBuilder(42).setSource("{\"level_1\":{\"level_2\":{\"foo\":\"bar\"}}}").build(); + + ExtractedField nested = ExtractedField.newField("level_1.level_2.foo", ExtractedField.ExtractionMethod.SOURCE); + assertThat(nested.value(hit), equalTo(new String[] { "bar" })); + } + + public void testValueGivenSourceAndHitWithNoSource() { + ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE); + assertThat(missing.value(new SearchHitBuilder(3).build()), equalTo(new Object[0])); + } + + public void testValueGivenMismatchingMethod() { + SearchHit hit = new SearchHitBuilder(42).addField("a", 1).setSource("{\"b\":2}").build(); + + ExtractedField invalidA = ExtractedField.newField("a", ExtractedField.ExtractionMethod.SOURCE); + assertThat(invalidA.value(hit), equalTo(new Object[0])); + ExtractedField validA = ExtractedField.newField("a", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(validA.value(hit), equalTo(new Integer[] { 1 })); + + ExtractedField invalidB = ExtractedField.newField("b", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(invalidB.value(hit), equalTo(new Object[0])); + ExtractedField validB = ExtractedField.newField("b", ExtractedField.ExtractionMethod.SOURCE); + assertThat(validB.value(hit), equalTo(new Integer[] { 2 })); + } + + public void testValueGivenEmptyHit() { + SearchHit hit = new SearchHitBuilder(42).build(); + + ExtractedField docValue = ExtractedField.newField("a", ExtractedField.ExtractionMethod.SOURCE); + assertThat(docValue.value(hit), equalTo(new Object[0])); + + ExtractedField sourceField = ExtractedField.newField("b", ExtractedField.ExtractionMethod.DOC_VALUE); + assertThat(sourceField.value(hit), equalTo(new Object[0])); + } + + static class SearchHitBuilder { + + private final InternalSearchHit hit; + private final Map fields; + + SearchHitBuilder(int docId) { + hit = new InternalSearchHit(docId); + fields = new HashMap<>(); + } + + SearchHitBuilder addField(String name, Object value) { + return addField(name, Arrays.asList(value)); + } + + SearchHitBuilder addField(String name, List values) { + fields.put(name, new InternalSearchHitField(name, values)); + return this; + } + + SearchHitBuilder setSource(String sourceJson) { + hit.sourceRef(new BytesArray(sourceJson)); + return this; + } + + SearchHit build() { + if (!fields.isEmpty()) { + hit.fields(fields); + } + return hit; + } + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java new file mode 100644 index 00000000000..98861c4b3e1 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ExtractedFieldsTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class ExtractedFieldsTests extends ESTestCase { + + private ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); + + public void testInvalidConstruction() { + expectThrows(IllegalArgumentException.class, () -> new ExtractedFields(timeField, Collections.emptyList())); + } + + public void testTimeFieldOnly() { + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + assertThat(extractedFields.getAllFields(), equalTo(Arrays.asList(timeField))); + assertThat(extractedFields.timeField(), equalTo("time")); + assertThat(extractedFields.getDocValueFields(), equalTo(new String[] { timeField.getName() })); + assertThat(extractedFields.getSourceFields().length, equalTo(0)); + } + + public void testAllTypesOfFields() { + ExtractedField docValue1 = ExtractedField.newField("doc1", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField docValue2 = ExtractedField.newField("doc2", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField scriptField1 = ExtractedField.newField("scripted1", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE); + ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE); + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, + docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2)); + + assertThat(extractedFields.getAllFields().size(), equalTo(7)); + assertThat(extractedFields.timeField(), equalTo("time")); + assertThat(extractedFields.getDocValueFields(), equalTo(new String[] {"time", "doc1", "doc2"})); + assertThat(extractedFields.getSourceFields(), equalTo(new String[] {"src1", "src2"})); + } + + public void testTimeFieldValue() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", 1000L).build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + assertThat(extractedFields.timeFieldValue(hit), equalTo(1000L)); + } + + public void testTimeFieldValueGivenEmptyArray() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", Collections.emptyList()).build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); + } + + public void testTimeFieldValueGivenValueHasTwoElements() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", Arrays.asList(1L, 2L)).build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); + } + + public void testTimeFieldValueGivenValueIsString() { + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", "a string").build(); + + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + + expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java index 4bfb2e4824c..f8451c8bb76 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractorTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHitField; @@ -48,12 +47,10 @@ public class ScrollDataExtractorTests extends ESTestCase { private List capturedContinueScrollIds; private List capturedClearScrollIds; private String jobId; - private List jobFields; - private String timeField; + private ExtractedFields extractedFields; private List types; private List indexes; private QueryBuilder query; - private AggregatorFactories.Builder aggregations; private List scriptFields; private int scrollSize; @@ -93,13 +90,13 @@ public class ScrollDataExtractorTests extends ESTestCase { capturedSearchRequests = new ArrayList<>(); capturedContinueScrollIds = new ArrayList<>(); capturedClearScrollIds = new ArrayList<>(); - timeField = "time"; jobId = "test-job"; - jobFields = Arrays.asList(timeField, "field_1"); + ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); + extractedFields = new ExtractedFields(timeField, + Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE))); indexes = Arrays.asList("index-1", "index-2"); types = Arrays.asList("type-1", "type-2"); query = QueryBuilders.matchAllQuery(); - aggregations = null; scriptFields = Collections.emptyList(); scrollSize = 1000; } @@ -253,8 +250,7 @@ public class ScrollDataExtractorTests extends ESTestCase { } private ScrollDataExtractorContext createContext(long start, long end) { - return new ScrollDataExtractorContext(jobId, jobFields, timeField, indexes, types, query, aggregations, scriptFields, scrollSize, - start, end); + return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end); } private SearchResponse createEmptySearchResponse() { @@ -270,7 +266,7 @@ public class ScrollDataExtractorTests extends ESTestCase { for (int i = 0; i < timestamps.size(); i++) { InternalSearchHit hit = new InternalSearchHit(randomInt()); Map fields = new HashMap<>(); - fields.put(timeField, new InternalSearchHitField("time", Arrays.asList(timestamps.get(i)))); + fields.put(extractedFields.timeField(), new InternalSearchHitField("time", Arrays.asList(timestamps.get(i)))); fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(field1Values.get(i)))); fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList(field2Values.get(i)))); hit.fields(fields); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java new file mode 100644 index 00000000000..d805129df32 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/SearchHitToJsonProcessorTests.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.scheduler.extractor.scroll; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.test.ESTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.hamcrest.Matchers.equalTo; + +public class SearchHitToJsonProcessorTests extends ESTestCase { + + public void testProcessGivenSingleHit() throws IOException { + ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField)); + + SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(8) + .addField("time", 1000L) + .addField("single", "a") + .addField("array", Arrays.asList("b", "c")) + .build(); + + String json = searchHitToString(extractedFields, hit); + + assertThat(json, equalTo("{\"time\":1000,\"single\":\"a\",\"array\":[\"b\",\"c\"]}")); + } + + public void testProcessGivenMultipleHits() throws IOException { + ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField)); + + SearchHit hit1 = new ExtractedFieldTests.SearchHitBuilder(8) + .addField("time", 1000L) + .addField("single", "a1") + .addField("array", Arrays.asList("b1", "c1")) + .build(); + + SearchHit hit2 = new ExtractedFieldTests.SearchHitBuilder(8) + .addField("time", 2000L) + .addField("single", "a2") + .addField("array", Arrays.asList("b2", "c2")) + .build(); + + String json = searchHitToString(extractedFields, hit1, hit2); + + assertThat(json, equalTo("{\"time\":1000,\"single\":\"a1\",\"array\":[\"b1\",\"c1\"]} " + + "{\"time\":2000,\"single\":\"a2\",\"array\":[\"b2\",\"c2\"]}")); + } + + private String searchHitToString(ExtractedFields fields, SearchHit... searchHits) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) { + for (int i = 0; i < searchHits.length; i++) { + hitProcessor.process(searchHits[i]); + } + } + return outputStream.toString(StandardCharsets.UTF_8.name()); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java index 2176d51470c..0678a5ff2d4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/transforms/date/DateFormatTransformTests.java @@ -5,19 +5,16 @@ */ package org.elasticsearch.xpack.ml.transforms.date; -import static org.elasticsearch.xpack.ml.transforms.TransformTestUtils.createIndexArray; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex; +import org.elasticsearch.xpack.ml.transforms.TransformException; -import static org.mockito.Mockito.mock; - -import java.time.ZoneOffset; import java.util.Collections; import java.util.List; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.test.ESTestCase; - -import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex; -import org.elasticsearch.xpack.ml.transforms.TransformException; +import static org.elasticsearch.xpack.ml.transforms.TransformTestUtils.createIndexArray; +import static org.mockito.Mockito.mock; public class DateFormatTransformTests extends ESTestCase { @@ -26,7 +23,7 @@ public class DateFormatTransformTests extends ESTestCase { List writeIndexes = createIndexArray(new TransformIndex(2, 0)); DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss.SSSXXX", - ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)); + readIndexes, writeIndexes, mock(Logger.class)); String[] input = {"2014-01-01 13:42:56.500Z"}; String[] scratch = {}; @@ -42,8 +39,7 @@ public class DateFormatTransformTests extends ESTestCase { public void testTransform_GivenInvalidFormat() throws TransformException { IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, - () -> new DateFormatTransform("yyyy-MM HH:mm:ss", ZoneOffset.systemDefault(), - Collections.emptyList(), Collections.emptyList(), mock(Logger.class))); + () -> new DateFormatTransform("yyyy-MM HH:mm:ss", Collections.emptyList(), Collections.emptyList(), mock(Logger.class))); assertEquals("Timestamp cannot be derived from pattern: yyyy-MM HH:mm:ss", e.getMessage()); @@ -54,8 +50,7 @@ public class DateFormatTransformTests extends ESTestCase { List readIndexes = createIndexArray(new TransformIndex(0, 0)); List writeIndexes = createIndexArray(new TransformIndex(2, 0)); - DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", - ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)); + DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", readIndexes, writeIndexes, mock(Logger.class)); String[] input = {"invalid"}; String[] scratch = {}; @@ -73,8 +68,7 @@ public class DateFormatTransformTests extends ESTestCase { List readIndexes = createIndexArray(new TransformIndex(1, 0)); List writeIndexes = createIndexArray(new TransformIndex(2, 0)); - DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", - ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)); + DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", readIndexes, writeIndexes, mock(Logger.class)); String[] input = {}; String[] scratch = {null}; @@ -90,15 +84,14 @@ public class DateFormatTransformTests extends ESTestCase { List writeIndexes = createIndexArray(new TransformIndex(2, 0)); ESTestCase.expectThrows(IllegalArgumentException.class, - () -> new DateFormatTransform("e", ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class))); + () -> new DateFormatTransform("e", readIndexes, writeIndexes, mock(Logger.class))); } public void testTransform_FromScratchArea() throws TransformException { List readIndexes = createIndexArray(new TransformIndex(1, 0)); List writeIndexes = createIndexArray(new TransformIndex(2, 0)); - DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ssXXX", - ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)); + DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ssXXX", readIndexes, writeIndexes, mock(Logger.class)); String[] input = {}; String[] scratch = {"2014-01-01 00:00:00Z"}; @@ -116,7 +109,7 @@ public class DateFormatTransformTests extends ESTestCase { List writeIndexes = createIndexArray(new TransformIndex(2, 0)); DateFormatTransform transformer = new DateFormatTransform("'['yyyy-MM-dd HH:mm:ssX']'", - ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)); + readIndexes, writeIndexes, mock(Logger.class)); String[] input = {"[2014-06-23 00:00:00Z]"}; String[] scratch = {}; diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml index 40fd17db5ba..c9d58a2d5c8 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers.yaml @@ -10,7 +10,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"ELASTICSEARCH", + "format":"JSON", "time_field":"time", "time_format":"epoch" } @@ -27,9 +27,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"ELASTICSEARCH", - "time_field":"time", - "time_format":"epoch" + "time_field":"time" } } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml index 87755dcc048..224605d526e 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/get_schedulers_stats.yaml @@ -10,7 +10,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"ELASTICSEARCH", + "format":"JSON", "time_field":"time", "time_format":"epoch" } @@ -27,9 +27,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"ELASTICSEARCH", - "time_field":"time", - "time_format":"epoch" + "time_field":"time" } } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml index 6fd7b973575..e0475d53ab9 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml @@ -195,7 +195,7 @@ "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"ELASTICSEARCH", + "format":"JSON", "time_field":"time", "time_format":"yyyy-MM-dd HH:mm:ssX" } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml index 508764211bc..06b0a617149 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml @@ -33,7 +33,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format" : "ELASTICSEARCH", + "format" : "JSON", "time_field":"time", "time_format":"yyyy-MM-dd'T'HH:mm:ssX" } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml index 40f1fd0c1c3..3ab3d518364 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml @@ -10,7 +10,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"ELASTICSEARCH", + "format":"JSON", "time_field":"time", "time_format":"epoch" } @@ -26,9 +26,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"ELASTICSEARCH", - "time_field":"time", - "time_format":"epoch" + "time_field":"time" } } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml index 51dc41aa2e5..5aa57256d18 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/start_stop_scheduler.yaml @@ -21,7 +21,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"ELASTICSEARCH", + "format":"JSON", "time_field":"time", "time_format":"epoch" }