Improve field extraction in scheduler (elastic/elasticsearch#748)

This commit performs the following improvements:

- the time field is always requested as doc_value. This makes
specifying a time format for scheduled jobs unnecessary.
- adds DataDescription as a param to the PostDataAction. When set,
it overrides the job's DataDescription. This allows the scheduler to
override the job's DataDescription since it knows the data format (JSON)
and the time format (epoch_ms). This is not exposed in the REST API to
discourage users from using it.
- by default, data extractor search now requests doc_values for analysis fields. This is
expected to result in increased performance.
- a `_source` field is added to the scheduler config. This needs to be
set to true when one or more of the analysis fields do not have
doc_values.
- the ELASTICSEARCH data format is removed as is now redundant.
- fixes the usage of `script_fields`. Previously, setting
`script_fields` would result to none of the source to be returned. Thus,
is the analysis fields were a mixture of script and non-script fields it
would not work.
- ensures nested fields are handled properly

Closes elastic/elasticsearch#679, Closes elastic/elasticsearch#267 

Original commit: elastic/x-pack-elasticsearch@fed35ed354
This commit is contained in:
Dimitris Athanasiou 2017-01-18 18:46:43 +00:00 committed by GitHub
parent 4c0d2a492d
commit c33f26976d
48 changed files with 931 additions and 528 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.job.DataCounts;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.manager.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -39,6 +40,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
public class PostDataAction extends Action<PostDataAction.Request, PostDataAction.Response, PostDataAction.RequestBuilder> {
@ -148,6 +150,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
private boolean ignoreDowntime = false;
private String resetStart = "";
private String resetEnd = "";
private DataDescription dataDescription;
private BytesReference content;
Request() {
@ -186,6 +189,14 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
this.resetEnd = resetEnd;
}
public DataDescription getDataDescription() {
return dataDescription;
}
public void setDataDescription(DataDescription dataDescription) {
this.dataDescription = dataDescription;
}
public BytesReference getContent() { return content; }
public void setContent(BytesReference content) {
@ -209,6 +220,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
ignoreDowntime = in.readBoolean();
resetStart = in.readOptionalString();
resetEnd = in.readOptionalString();
dataDescription = in.readOptionalWriteable(DataDescription::new);
content = in.readBytesReference();
}
@ -219,13 +231,14 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
out.writeBoolean(ignoreDowntime);
out.writeOptionalString(resetStart);
out.writeOptionalString(resetEnd);
out.writeOptionalWriteable(dataDescription);
out.writeBytesReference(content);
}
@Override
public int hashCode() {
// content stream not included
return Objects.hash(jobId, ignoreDowntime, resetStart, resetEnd);
return Objects.hash(jobId, ignoreDowntime, resetStart, resetEnd, dataDescription);
}
@Override
@ -242,7 +255,8 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
return Objects.equals(jobId, other.jobId) &&
Objects.equals(ignoreDowntime, other.ignoreDowntime) &&
Objects.equals(resetStart, other.resetStart) &&
Objects.equals(resetEnd, other.resetEnd);
Objects.equals(resetEnd, other.resetEnd) &&
Objects.equals(dataDescription, other.dataDescription);
}
}
@ -263,7 +277,8 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
PostDataTask postDataTask = (PostDataTask) task;
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime());
DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime(),
Optional.ofNullable(request.getDataDescription()));
threadPool.executor(MlPlugin.THREAD_POOL_NAME).execute(() -> {
try {
DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params,

View File

@ -459,7 +459,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
this.detectors = detectors;
}
Builder(AnalysisConfig analysisConfig) {
public Builder(AnalysisConfig analysisConfig) {
this.detectors = analysisConfig.detectors;
this.bucketSpan = analysisConfig.bucketSpan;
this.batchSpan = analysisConfig.batchSpan;

View File

@ -41,9 +41,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
public enum DataFormat implements Writeable {
JSON("json"),
DELIMITED("delimited"),
SINGLE_LINE("single_line"),
// TODO norelease, this can now be removed
ELASTICSEARCH("elasticsearch");
SINGLE_LINE("single_line");
/**
* Delimited used to be called delineated. We keep supporting that for backwards

View File

@ -196,8 +196,6 @@ public final class Messages {
public static final String SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "scheduler.does.not.support.job.with.latency";
public static final String SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD =
"scheduler.aggregations.requires.job.with.summary.count.field";
public static final String SCHEDULER_REQUIRES_JOB_WITH_DATAFORMAT_ELASTICSEARCH =
"scheduler.requires.job.with.dataformat.elasticsearch";
public static final String SCHEDULER_CANNOT_START = "scheduler.cannot.start";
public static final String SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE = "scheduler.cannot.stop.in.current.state";

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataCounts;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -42,10 +43,9 @@ public class AutodetectCommunicator implements Closeable {
private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class);
private static final int DEFAULT_TRY_TIMEOUT_SECS = 30;
private final String jobId;
private final Job job;
private final StatusReporter statusReporter;
private final AutodetectProcess autodetectProcess;
private final DataToProcessWriter autoDetectWriter;
private final AutoDetectResultProcessor autoDetectResultProcessor;
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
@ -53,7 +53,7 @@ public class AutodetectCommunicator implements Closeable {
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process,
StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor,
StateProcessor stateProcessor) {
this.jobId = job.getId();
this.job = job;
this.autodetectProcess = process;
this.statusReporter = statusReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor;
@ -61,29 +61,30 @@ public class AutodetectCommunicator implements Closeable {
AnalysisConfig analysisConfig = job.getAnalysisConfig();
boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization();
autoDetectExecutor.execute(() ->
autoDetectResultProcessor.process(jobId, process.getProcessOutStream(), usePerPartitionNormalization)
autoDetectResultProcessor.process(job.getId(), process.getProcessOutStream(), usePerPartitionNormalization)
);
autoDetectExecutor.execute(() ->
stateProcessor.process(jobId, process.getPersistStream())
stateProcessor.process(job.getId(), process.getPersistStream())
);
this.autoDetectWriter = createProcessWriter(job, process, statusReporter);
}
private DataToProcessWriter createProcessWriter(Job job, AutodetectProcess process, StatusReporter statusReporter) {
return DataToProcessWriterFactory.create(true, process, job.getDataDescription(), job.getAnalysisConfig(),
new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER);
}
public void writeJobInputHeader() throws IOException {
autoDetectWriter.writeHeader();
createProcessWriter(Optional.empty()).writeHeader();
}
private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
return DataToProcessWriterFactory.create(true, autodetectProcess, dataDescription.orElse(job.getDataDescription()),
job.getAnalysisConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER);
}
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier<Boolean> cancelled) throws IOException {
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> {
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> {
if (params.isResettingBuckets()) {
autodetectProcess.writeResetBucketsControlMessage(params);
}
CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter);
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
DataCounts results = autoDetectWriter.write(countingStream, cancelled);
autoDetectWriter.flush();
return results;
@ -92,7 +93,7 @@ public class AutodetectCommunicator implements Closeable {
@Override
public void close() throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, jobId), () -> {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> {
statusReporter.close();
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
@ -101,7 +102,7 @@ public class AutodetectCommunicator implements Closeable {
}
public void writeUpdateConfigMessage(String config) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateConfigMessage(config);
return null;
}, false);
@ -112,15 +113,15 @@ public class AutodetectCommunicator implements Closeable {
}
void flushJob(InterimResultsParams params, int tryTimeoutSecs) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, jobId), () -> {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, job.getId()), () -> {
String flushId = autodetectProcess.flushJob(params);
Duration timeout = Duration.ofSeconds(tryTimeoutSecs);
LOGGER.info("[{}] waiting for flush", jobId);
LOGGER.info("[{}] waiting for flush", job.getId());
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout);
LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete);
LOGGER.info("[{}] isFlushComplete={}", job.getId(), isFlushComplete);
if (!isFlushComplete) {
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, jobId) + " " + autodetectProcess.readError();
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, job.getId()) + " " + autodetectProcess.readError();
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg);
}
@ -138,7 +139,7 @@ public class AutodetectCommunicator implements Closeable {
private void checkProcessIsAlive() {
if (!autodetectProcess.isProcessAlive()) {
ParameterizedMessage message =
new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", jobId, autodetectProcess.readError());
new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
LOGGER.error(message);
throw ExceptionsHelper.serverError(message.getFormattedMessage());
}

View File

@ -5,19 +5,20 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.xpack.ml.job.DataDescription;
import java.util.Objects;
import java.util.Optional;
public class DataLoadParams {
private final TimeRange resetTimeRange;
private final boolean ignoreDowntime;
private final Optional<DataDescription> dataDescription;
public DataLoadParams(TimeRange resetTimeRange) {
this(resetTimeRange, false);
}
public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime) {
public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime, Optional<DataDescription> dataDescription) {
this.resetTimeRange = Objects.requireNonNull(resetTimeRange);
this.ignoreDowntime = ignoreDowntime;
this.dataDescription = Objects.requireNonNull(dataDescription);
}
public boolean isResettingBuckets() {
@ -35,5 +36,9 @@ public class DataLoadParams {
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public Optional<DataDescription> getDataDescription() {
return dataDescription;
}
}

View File

@ -5,9 +5,25 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.status.StatusReporter;
import org.elasticsearch.xpack.ml.job.transform.TransformConfig;
import org.elasticsearch.xpack.ml.job.transform.TransformConfigs;
import org.elasticsearch.xpack.ml.transforms.DependencySorter;
import org.elasticsearch.xpack.ml.transforms.Transform;
import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex;
import org.elasticsearch.xpack.ml.transforms.Transform.TransformResult;
import org.elasticsearch.xpack.ml.transforms.TransformException;
import org.elasticsearch.xpack.ml.transforms.TransformFactory;
import org.elasticsearch.xpack.ml.transforms.date.DateFormatTransform;
import org.elasticsearch.xpack.ml.transforms.date.DateTransform;
import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform;
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -23,26 +39,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.DataDescription.DataFormat;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.status.StatusReporter;
import org.elasticsearch.xpack.ml.job.transform.TransformConfig;
import org.elasticsearch.xpack.ml.job.transform.TransformConfigs;
import org.elasticsearch.xpack.ml.transforms.DependencySorter;
import org.elasticsearch.xpack.ml.transforms.Transform;
import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex;
import org.elasticsearch.xpack.ml.transforms.Transform.TransformResult;
import org.elasticsearch.xpack.ml.transforms.TransformException;
import org.elasticsearch.xpack.ml.transforms.TransformFactory;
import org.elasticsearch.xpack.ml.transforms.date.DateFormatTransform;
import org.elasticsearch.xpack.ml.transforms.date.DateTransform;
import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform;
public abstract class AbstractDataToProcessWriter implements DataToProcessWriter {
protected static final int TIME_FIELD_OUT_INDEX = 0;
@ -175,9 +171,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
}
protected void buildDateTransform(Map<String, Integer> scratchAreaIndexes, Map<String, Integer> outFieldIndexes) {
boolean isDateFormatString = dataDescription.isTransformTime()
&& !dataDescription.isEpochMs();
List<TransformIndex> readIndexes = new ArrayList<>();
Integer index = inFieldIndexes.get(dataDescription.getTimeField());
@ -202,15 +195,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
writeIndexes.add(new TransformIndex(TransformFactory.OUTPUT_ARRAY_INDEX,
outFieldIndexes.get(dataDescription.getTimeField())));
boolean isDateFormatString = dataDescription.isTransformTime() && !dataDescription.isEpochMs();
if (isDateFormatString) {
// Elasticsearch assumes UTC for dates without timezone information.
ZoneId defaultTimezone = dataDescription.getFormat() == DataFormat.ELASTICSEARCH
? ZoneOffset.UTC : ZoneOffset.systemDefault();
dateTransform = new DateFormatTransform(dataDescription.getTimeFormat(),
defaultTimezone, readIndexes, writeIndexes, logger);
dateTransform = new DateFormatTransform(dataDescription.getTimeFormat(), readIndexes, writeIndexes, logger);
} else {
dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(),
readIndexes, writeIndexes, logger);
dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(), readIndexes, writeIndexes, logger);
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
@ -36,7 +35,6 @@ public final class DataToProcessWriterFactory {
TransformConfigs transforms, StatusReporter statusReporter, Logger logger) {
switch (dataDescription.getFormat()) {
case JSON:
case ELASTICSEARCH:
return new JsonDataToProcessWriter(includeControlField, autodetectProcess, dataDescription, analysisConfig,
transforms, statusReporter, logger);
case DELIMITED:

View File

@ -14,6 +14,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.job.DataCounts;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.audit.Auditor;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor;
@ -22,6 +23,7 @@ import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractorFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -34,6 +36,7 @@ class ScheduledJob {
private final Auditor auditor;
private final String jobId;
private final DataDescription dataDescription;
private final long frequencyMs;
private final long queryDelayMs;
private final Client client;
@ -44,10 +47,11 @@ class ScheduledJob {
private volatile Long lastEndTimeMs;
private AtomicBoolean running = new AtomicBoolean(true);
ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory,
Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
ScheduledJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs,
DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
this.jobId = jobId;
this.dataDescription = Objects.requireNonNull(dataDescription);
this.frequencyMs = frequencyMs;
this.queryDelayMs = queryDelayMs;
this.dataExtractorFactory = dataExtractorFactory;
@ -187,6 +191,7 @@ class ScheduledJob {
private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException {
PostDataAction.Request request = new PostDataAction.Request(jobId);
request.setDataDescription(dataDescription);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Streams.copy(inputStream, outputStream);
request.setContent(new BytesArray(outputStream.toByteArray()));

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.StartSchedulerAction;
import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction;
import org.elasticsearch.xpack.ml.job.DataCounts;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.JobStatus;
import org.elasticsearch.xpack.ml.job.audit.Auditor;
@ -189,7 +190,7 @@ public class ScheduledJobRunner extends AbstractComponent {
Duration frequency = getFrequencyOrDefault(scheduler, job);
Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay());
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(scheduler.getConfig(), job);
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler);
task.setHolder(holder);
@ -200,6 +201,16 @@ public class ScheduledJobRunner extends AbstractComponent {
return new ScrollDataExtractorFactory(client, schedulerConfig, job);
}
private static DataDescription buildDataDescription(Job job) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
if (job.getDataDescription() != null) {
dataDescription.setTimeField(job.getDataDescription().getTimeField());
}
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
return dataDescription.build();
}
private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) {
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.scheduler;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -28,9 +27,5 @@ public final class ScheduledJobValidator {
throw new IllegalArgumentException(
Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT));
}
DataDescription dataDescription = job.getDataDescription();
if (dataDescription == null || dataDescription.getFormat() != DataDescription.DataFormat.ELASTICSEARCH) {
throw new IllegalArgumentException(Messages.getMessage(Messages.SCHEDULER_REQUIRES_JOB_WITH_DATAFORMAT_ELASTICSEARCH));
}
}
}

View File

@ -62,6 +62,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField SOURCE = new ParseField("_source");
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("scheduler_config", Builder::new);
@ -86,6 +87,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return parsedScriptFields;
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareBoolean(Builder::setSource, SOURCE);
}
private final String id;
@ -107,10 +109,11 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private final AggregatorFactories.Builder aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final boolean source;
private SchedulerConfig(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations,
List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize) {
List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize, boolean source) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -121,6 +124,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.aggregations = aggregations;
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.source = source;
}
public SchedulerConfig(StreamInput in) throws IOException {
@ -146,6 +150,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.scriptFields = null;
}
this.scrollSize = in.readOptionalVInt();
this.source = in.readBoolean();
}
public String getId() {
@ -171,7 +176,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
* @return The indexes to search, or <code>null</code> if not set.
*/
public List<String> getIndexes() {
return this.indexes;
return indexes;
}
/**
@ -181,11 +186,15 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
* @return The types to search, or <code>null</code> if not set.
*/
public List<String> getTypes() {
return this.types;
return types;
}
public Integer getScrollSize() {
return this.scrollSize;
return scrollSize;
}
public boolean isSource() {
return source;
}
public QueryBuilder getQuery() {
@ -227,6 +236,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
out.writeBoolean(false);
}
out.writeOptionalVInt(scrollSize);
out.writeBoolean(source);
}
@Override
@ -258,6 +268,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
builder.endObject();
}
builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
if (source) {
builder.field(SOURCE.getPreferredName(), source);
}
return builder;
}
@ -287,12 +300,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.query, that.query)
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields);
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.source, that.source);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields);
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source);
}
public static class Builder {
@ -310,6 +324,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
private boolean source = false;
public Builder() {
}
@ -331,6 +346,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.aggregations = config.aggregations;
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.source = config.source;
}
public void setId(String schedulerId) {
@ -390,6 +406,10 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.scrollSize = scrollSize;
}
public void setSource(boolean enabled) {
this.source = enabled;
}
public SchedulerConfig build() {
ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -402,7 +422,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
throw invalidOptionValue(TYPES.getPreferredName(), types);
}
return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize);
return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
source);
}
private static ElasticsearchException invalidOptionValue(String fieldName, Object value) {

View File

@ -1,53 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public final class SearchHitFieldExtractor {
private SearchHitFieldExtractor() {}
public static Object[] extractField(SearchHit hit, String field) {
SearchHitField keyValue = hit.field(field);
if (keyValue != null) {
List<Object> values = keyValue.values();
return values.toArray(new Object[values.size()]);
} else {
return extractFieldFromSource(hit.getSource(), field);
}
}
private static Object[] extractFieldFromSource(Map<String, Object> source, String field) {
if (source != null) {
Object values = source.get(field);
if (values != null) {
if (values instanceof Object[]) {
return (Object[]) values;
} else {
return new Object[]{values};
}
}
}
return new Object[0];
}
public static Long extractTimeField(SearchHit hit, String timeField) {
Object[] fields = extractField(hit, timeField);
if (fields.length != 1) {
throw new RuntimeException("Time field [" + timeField + "] expected a single value; actual was: " + Arrays.toString(fields));
}
if (fields[0] instanceof Long) {
return (Long) fields[0];
}
throw new RuntimeException("Time field [" + timeField + "] expected a long value; actual was: " + fields[0]);
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.util.List;
import java.util.Map;
import java.util.Objects;
abstract class ExtractedField {
public enum ExtractionMethod {
SOURCE, DOC_VALUE, SCRIPT_FIELD
}
protected final String name;
private final ExtractionMethod extractionMethod;
protected ExtractedField(String name, ExtractionMethod extractionMethod) {
this.name = Objects.requireNonNull(name);
this.extractionMethod = Objects.requireNonNull(extractionMethod);
}
public String getName() {
return name;
}
public ExtractionMethod getExtractionMethod() {
return extractionMethod;
}
public abstract Object[] value(SearchHit hit);
public static ExtractedField newField(String name, ExtractionMethod extractionMethod) {
switch (extractionMethod) {
case DOC_VALUE:
case SCRIPT_FIELD:
return new FromFields(name, extractionMethod);
case SOURCE:
return new FromSource(name, extractionMethod);
default:
throw new IllegalArgumentException("Invalid extraction method [" + extractionMethod + "]");
}
}
private static class FromFields extends ExtractedField {
public FromFields(String name, ExtractionMethod extractionMethod) {
super(name, extractionMethod);
}
@Override
public Object[] value(SearchHit hit) {
SearchHitField keyValue = hit.field(name);
if (keyValue != null) {
List<Object> values = keyValue.values();
return values.toArray(new Object[values.size()]);
}
return new Object[0];
}
}
private static class FromSource extends ExtractedField {
private String[] namePath;
public FromSource(String name, ExtractionMethod extractionMethod) {
super(name, extractionMethod);
namePath = name.split("\\.");
}
@Override
public Object[] value(SearchHit hit) {
Map<String, Object> source = hit.getSource();
int level = 0;
while (source != null && level < namePath.length - 1) {
source = getNextLevel(source, namePath[level]);
level++;
}
if (source != null) {
Object values = source.get(namePath[level]);
if (values != null) {
if (values instanceof List<?>) {
@SuppressWarnings("unchecked")
List<Object> asList = (List<Object>) values;
return asList.toArray(new Object[asList.size()]);
} else {
return new Object[]{values};
}
}
}
return new Object[0];
}
@SuppressWarnings("unchecked")
private static Map<String, Object> getNextLevel(Map<String, Object> source, String key) {
Object nextLevel = source.get(key);
if (nextLevel instanceof Map<?, ?>) {
return (Map<String, Object>) source.get(key);
}
return null;
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
class ExtractedFields {
private final ExtractedField timeField;
private final List<ExtractedField> allFields;
public ExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
if (!allFields.contains(timeField)) {
throw new IllegalArgumentException("timeField should also be contained in allFields");
}
this.timeField = Objects.requireNonNull(timeField);
this.allFields = Collections.unmodifiableList(allFields);
}
public List<ExtractedField> getAllFields() {
return allFields;
}
public String[] getSourceFields() {
return filterFields(ExtractedField.ExtractionMethod.SOURCE);
}
public String[] getDocValueFields() {
return filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
}
private String[] filterFields(ExtractedField.ExtractionMethod method) {
List<String> result = new ArrayList<>();
for (ExtractedField field : allFields) {
if (field.getExtractionMethod() == method) {
result.add(field.getName());
}
}
return result.toArray(new String[result.size()]);
}
public String timeField() {
return timeField.getName();
}
public Long timeFieldValue(SearchHit hit) {
Object[] value = timeField.value(hit);
if (value.length != 1) {
throw new RuntimeException("Time field [" + timeField.getName() + "] expected a single value; actual was: "
+ Arrays.toString(value));
}
if (value[0] instanceof Long) {
return (Long) value[0];
}
throw new RuntimeException("Time field [" + timeField.getName() + "] expected a long value; actual was: " + value[0]);
}
public static ExtractedFields build(Job job, SchedulerConfig schedulerConfig) {
Set<String> scriptFields = schedulerConfig.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
String timeField = job.getDataDescription().getTimeField();
ExtractedField timeExtractedField = ExtractedField.newField(timeField, scriptFields.contains(timeField) ?
ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
List<String> remainingFields = job.allFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size());
allExtractedFields.add(timeExtractedField);
for (String field : remainingFields) {
ExtractedField.ExtractionMethod method = scriptFields.contains(field) ? ExtractedField.ExtractionMethod.SCRIPT_FIELD :
schedulerConfig.isSource() ? ExtractedField.ExtractionMethod.SOURCE : ExtractedField.ExtractionMethod.DOC_VALUE;
allExtractedFields.add(ExtractedField.newField(field, method));
}
return new ExtractedFields(timeExtractedField, allExtractedFields);
}
}

View File

@ -19,13 +19,9 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.scheduler.extractor.SearchHitFieldExtractor;
import org.elasticsearch.xpack.ml.scheduler.extractor.SearchHitToJsonProcessor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -104,20 +100,22 @@ class ScrollDataExtractor implements DataExtractor {
private SearchRequestBuilder buildSearchRequest() {
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.setScroll(SCROLL_TIMEOUT)
.addSort(context.timeField, SortOrder.ASC)
.addSort(context.extractedFields.timeField(), SortOrder.ASC)
.setIndices(context.indexes)
.setTypes(context.types)
.setSize(context.scrollSize)
.setQuery(createQuery());
if (context.aggregations != null) {
searchRequestBuilder.setSize(0);
for (AggregationBuilder aggregationBuilder : context.aggregations.getAggregatorFactories()) {
searchRequestBuilder.addAggregation(aggregationBuilder);
}
for (PipelineAggregationBuilder pipelineAggregationBuilder : context.aggregations.getPipelineAggregatorFactories()) {
searchRequestBuilder.addAggregation(pipelineAggregationBuilder);
}
for (String docValueField : context.extractedFields.getDocValueFields()) {
searchRequestBuilder.addDocValueField(docValueField);
}
String[] sourceFields = context.extractedFields.getSourceFields();
if (sourceFields.length == 0) {
searchRequestBuilder.setFetchSource(false);
} else {
searchRequestBuilder.setFetchSource(sourceFields, null);
}
for (SearchSourceBuilder.ScriptField scriptField : context.scriptFields) {
searchRequestBuilder.addScriptField(scriptField.fieldName(), scriptField.script());
}
@ -132,10 +130,10 @@ class ScrollDataExtractor implements DataExtractor {
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.jobFields, outputStream)) {
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.extractedFields, outputStream)) {
for (SearchHit hit : searchResponse.getHits().hits()) {
if (isCancelled) {
Long timestamp = SearchHitFieldExtractor.extractTimeField(hit, context.timeField);
Long timestamp = context.extractedFields.timeFieldValue(hit);
if (timestamp != null) {
if (timestampOnCancel == null) {
timestampOnCancel = timestamp;
@ -170,7 +168,10 @@ class ScrollDataExtractor implements DataExtractor {
private QueryBuilder createQuery() {
QueryBuilder userQuery = context.query;
QueryBuilder timeQuery = new RangeQueryBuilder(context.timeField).gte(context.start).lt(context.end).format("epoch_millis");
QueryBuilder timeQuery = new RangeQueryBuilder(context.extractedFields.timeField())
.gte(context.start)
.lt(context.end)
.format("epoch_millis");
return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
}

View File

@ -5,39 +5,32 @@
*/
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.List;
import java.util.Objects;
public class ScrollDataExtractorContext {
class ScrollDataExtractorContext {
final String jobId;
final String[] jobFields;
final String timeField;
final ExtractedFields extractedFields;
final String[] indexes;
final String[] types;
final QueryBuilder query;
@Nullable
final AggregatorFactories.Builder aggregations;
final List<SearchSourceBuilder.ScriptField> scriptFields;
final int scrollSize;
final long start;
final long end;
public ScrollDataExtractorContext(String jobId, List<String> jobFields, String timeField, List<String> indexes, List<String> types,
QueryBuilder query, @Nullable AggregatorFactories.Builder aggregations,
List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize, long start, long end) {
public ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List<String> indexes, List<String> types,
QueryBuilder query, List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize,
long start, long end) {
this.jobId = Objects.requireNonNull(jobId);
this.jobFields = jobFields.toArray(new String[jobFields.size()]);
this.timeField = Objects.requireNonNull(timeField);
this.extractedFields = Objects.requireNonNull(extractedFields);
this.indexes = indexes.toArray(new String[indexes.size()]);
this.types = types.toArray(new String[types.size()]);
this.query = Objects.requireNonNull(query);
this.aggregations = aggregations;
this.scriptFields = Objects.requireNonNull(scriptFields);
this.scrollSize = scrollSize;
this.start = start;

View File

@ -18,24 +18,23 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final SchedulerConfig schedulerConfig;
private final Job job;
private final ExtractedFields extractedFields;
public ScrollDataExtractorFactory(Client client, SchedulerConfig schedulerConfig, Job job) {
this.client = Objects.requireNonNull(client);
this.schedulerConfig = Objects.requireNonNull(schedulerConfig);
this.job = Objects.requireNonNull(job);
this.extractedFields = ExtractedFields.build(job, schedulerConfig);
}
@Override
public DataExtractor newExtractor(long start, long end) {
String timeField = job.getDataDescription().getTimeField();
ScrollDataExtractorContext dataExtractorContext = new ScrollDataExtractorContext(
job.getId(),
job.allFields(),
timeField,
extractedFields,
schedulerConfig.getIndexes(),
schedulerConfig.getTypes(),
schedulerConfig.getQuery(),
schedulerConfig.getAggregations(),
schedulerConfig.getScriptFields(),
schedulerConfig.getScrollSize(),
start,

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor;
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -12,21 +12,22 @@ import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
public class SearchHitToJsonProcessor implements Releasable {
class SearchHitToJsonProcessor implements Releasable {
private final String[] fields;
private final ExtractedFields fields;
private final XContentBuilder jsonBuilder;
public SearchHitToJsonProcessor(String[] fields, OutputStream outputStream) throws IOException {
this.fields = fields;
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
public SearchHitToJsonProcessor(ExtractedFields fields, OutputStream outputStream) throws IOException {
this.fields = Objects.requireNonNull(fields);
this.jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
}
public void process(SearchHit hit) throws IOException {
jsonBuilder.startObject();
for (String field : fields) {
writeKeyValue(field, SearchHitFieldExtractor.extractField(hit, field));
for (ExtractedField field : fields.getAllFields()) {
writeKeyValue(field.getName(), field.value(hit));
}
jsonBuilder.endObject();
}

View File

@ -5,17 +5,16 @@
*/
package org.elasticsearch.xpack.ml.transforms.date;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.ml.transforms.TransformException;
import org.elasticsearch.xpack.ml.utils.time.DateTimeFormatterTimestampConverter;
import org.elasticsearch.xpack.ml.utils.time.TimestampConverter;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;
/**
* A transform that attempts to parse a String timestamp
* according to a timeFormat. It converts that
@ -25,12 +24,11 @@ public class DateFormatTransform extends DateTransform {
private final String timeFormat;
private final TimestampConverter dateToEpochConverter;
public DateFormatTransform(String timeFormat, ZoneId defaultTimezone,
List<TransformIndex> readIndexes, List<TransformIndex> writeIndexes, Logger logger) {
public DateFormatTransform(String timeFormat, List<TransformIndex> readIndexes, List<TransformIndex> writeIndexes, Logger logger) {
super(readIndexes, writeIndexes, logger);
this.timeFormat = timeFormat;
dateToEpochConverter = DateTimeFormatterTimestampConverter.ofPattern(timeFormat, defaultTimezone);
dateToEpochConverter = DateTimeFormatterTimestampConverter.ofPattern(timeFormat, ZoneOffset.systemDefault());
}
@Override
@ -38,9 +36,7 @@ public class DateFormatTransform extends DateTransform {
try {
return dateToEpochConverter.toEpochMillis(field);
} catch (DateTimeParseException pe) {
String message = String.format(Locale.ROOT, "Cannot parse date '%s' with format string '%s'",
field, timeFormat);
String message = String.format(Locale.ROOT, "Cannot parse date '%s' with format string '%s'", field, timeFormat);
throw new ParseTimestampException(message);
}
}

View File

@ -144,7 +144,6 @@ scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler c
scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency
scheduler.aggregations.requires.job.with.summary.count.field = A job configured with a scheduler with aggregations must have summary_count_field_name ''{0}''
scheduler.requires.job.with.dataformat.elasticsearch = A job configured with a scheduler must have dataFormat ''ELASTICSEARCH''
job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job
job.data.concurrent.use.flush = Cannot flush job {0} while another connection {2}is {1} the job

View File

@ -97,7 +97,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs + numDocs2));
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get()
@ -139,7 +139,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
t.start();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
@ -148,7 +148,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
indexDocs("data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
}, 30, TimeUnit.SECONDS);
@ -182,8 +182,8 @@ public class ScheduledJobsIT extends ESIntegTestCase {
private Job.Builder createJob() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss");
Detector.Builder d = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));

View File

@ -12,6 +12,7 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.junit.After;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.IOException;
@ -25,35 +26,125 @@ import static org.hamcrest.Matchers.equalTo;
public class ScheduledJobIT extends ESRestTestCase {
public void testStartJobScheduler_GivenLookbackOnly() throws Exception {
String jobId = "job-1";
createAirlineDataIndex();
createJob(jobId);
String schedulerId = "sched-1";
createScheduler(schedulerId, jobId);
openJob(client(), jobId);
@Before
public void setUpData() throws Exception {
// Create index with source = enabled, doc_values = enabled, stored = false
String mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional
+ " \"airline\": { \"type\":\"keyword\"},"
+ " \"responsetime\": { \"type\":\"float\"}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings));
Response startSchedulerRequest = client().performRequest("post",
MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\""));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get",
MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertThat(responseEntityToString(getJobResponse), containsString("\"input_record_count\":2"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
// Create index with source = enabled, doc_values = disabled (except time), stored = false
mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time stamp\": { \"type\":\"date\"},"
+ " \"airline\": { \"type\":\"keyword\", \"doc_values\":false},"
+ " \"responsetime\": { \"type\":\"float\", \"doc_values\":false}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-disabled-doc-values/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
client().performRequest("put", "airline-data-disabled-doc-values/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
// Create index with source = disabled, doc_values = enabled (except time), stored = true
mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"_source\":{\"enabled\":false},"
+ " \"properties\": {"
+ " \"time stamp\": { \"type\":\"date\", \"store\":true},"
+ " \"airline\": { \"type\":\"keyword\", \"store\":true},"
+ " \"responsetime\": { \"type\":\"float\", \"store\":true}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-disabled-source/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
client().performRequest("put", "airline-data-disabled-source/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
// Create index with nested documents
mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time\": { \"type\":\"date\"}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "nested-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}"));
client().performRequest("put", "nested-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}"));
// Ensure all data is searchable
client().performRequest("post", "_refresh");
}
public void testStartJobScheduler_GivenRealtime() throws Exception {
String jobId = "job-2";
createAirlineDataIndex();
public void testLookbackOnly() throws Exception {
new LookbackOnlyTestHelper("lookback-1", "airline-data").setShouldSucceedProcessing(true).execute();
}
public void testLookbackOnlyWithSchedulerSourceEnabled() throws Exception {
new LookbackOnlyTestHelper("lookback-2", "airline-data").setEnableSchedulerSource(true).execute();
}
public void testLookbackOnlyWithDocValuesDisabledAndSchedulerSourceDisabled() throws Exception {
new LookbackOnlyTestHelper("lookback-3", "airline-data-disabled-doc-values").setShouldSucceedInput(false)
.setShouldSucceedProcessing(false).execute();
}
public void testLookbackOnlyWithDocValuesDisabledAndSchedulerSourceEnabled() throws Exception {
new LookbackOnlyTestHelper("lookback-4", "airline-data-disabled-doc-values").setEnableSchedulerSource(true).execute();
}
public void testLookbackOnlyWithSourceDisabled() throws Exception {
new LookbackOnlyTestHelper("lookback-5", "airline-data-disabled-source").execute();
}
public void testLookbackOnlyWithScriptFields() throws Exception {
new LookbackOnlyTestHelper("lookback-6", "airline-data-disabled-source").setAddScriptedFields(true).execute();
}
public void testLookbackOnlyWithNestedFieldsAndSchedulerSourceDisabled() throws Exception {
executeTestLookbackOnlyWithNestedFields("lookback-7", false);
}
public void testLookbackOnlyWithNestedFieldsAndSchedulerSourceEnabled() throws Exception {
executeTestLookbackOnlyWithNestedFields("lookback-8", true);
}
public void testRealtime() throws Exception {
String jobId = "job-realtime-1";
createJob(jobId);
String schedulerId = "sched-2";
createScheduler(schedulerId, jobId);
String schedulerId = jobId + "-scheduler";
createScheduler(schedulerId, jobId, "airline-data", false, false);
openJob(client(), jobId);
Response response = client().performRequest("post",
@ -65,7 +156,7 @@ public class ScheduledJobIT extends ESRestTestCase {
Response getJobResponse = client().performRequest("get",
MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"input_record_count\":2"));
assertThat(responseAsString, containsString("\"processed_record_count\":2"));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
@ -93,35 +184,98 @@ public class ScheduledJobIT extends ESRestTestCase {
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
}
private void createAirlineDataIndex() throws Exception {
String airlineDataMappings = "{" + " \"mappings\": {" + " \"response\": {" + " \"properties\": {"
+ " \"time\": { \"type\":\"date\"}," + " \"airline\": { \"type\":\"keyword\"},"
+ " \"responsetime\": { \"type\":\"float\"}" + " }" + " }" + " }" + "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(airlineDataMappings));
private class LookbackOnlyTestHelper {
private String jobId;
private String dataIndex;
private boolean addScriptedFields;
private boolean enableSchedulerSource;
private boolean shouldSucceedInput;
private boolean shouldSucceedProcessing;
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
public LookbackOnlyTestHelper(String jobId, String dataIndex) {
this.jobId = jobId;
this.dataIndex = dataIndex;
this.shouldSucceedInput = true;
this.shouldSucceedProcessing = true;
}
client().performRequest("post", "airline-data/_refresh");
public LookbackOnlyTestHelper setAddScriptedFields(boolean value) {
addScriptedFields = value;
return this;
}
public LookbackOnlyTestHelper setEnableSchedulerSource(boolean value) {
enableSchedulerSource = value;
return this;
}
public LookbackOnlyTestHelper setShouldSucceedInput(boolean value) {
shouldSucceedInput = value;
return this;
}
public LookbackOnlyTestHelper setShouldSucceedProcessing(boolean value) {
shouldSucceedProcessing = value;
return this;
}
public void execute() throws Exception {
createJob(jobId);
String schedulerId = "scheduler-" + jobId;
createScheduler(schedulerId, jobId, dataIndex, enableSchedulerSource, addScriptedFields);
openJob(client(), jobId);
startSchedulerAndWaitUntilStopped(schedulerId);
Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
if (shouldSucceedInput) {
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
} else {
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":0"));
}
if (shouldSucceedProcessing) {
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
} else {
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":0"));
}
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
}
private void startSchedulerAndWaitUntilStopped(String schedulerId) throws Exception {
Response startSchedulerRequest = client().performRequest("post",
MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\""));
assertBusy(() -> {
try {
Response schedulerStatsResponse = client().performRequest("get",
MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_stats");
assertThat(responseEntityToString(schedulerStatsResponse), containsString("\"status\":\"STOPPED\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private Response createJob(String id) throws Exception {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " },\n" + " \"data_description\" : {\n" + " \"format\":\"ELASTICSEARCH\",\n"
+ " \"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
+ " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n"
+ " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
+ "}";
return client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + id,
Collections.emptyMap(), new StringEntity(job));
}
private Response createScheduler(String schedulerId, String jobId) throws IOException {
String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"airline-data\"],\n"
+ "\"types\":[\"response\"]\n" + "}";
private Response createScheduler(String schedulerId, String jobId, String dataIndex, boolean source, boolean addScriptedFields)
throws IOException {
String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"" + dataIndex + "\"],\n"
+ "\"types\":[\"response\"]" + (source ? ",\"_source\":true" : "") + (addScriptedFields ?
",\"script_fields\":{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : "")
+"}";
return client().performRequest("put", MlPlugin.BASE_PATH + "schedulers/" + schedulerId, Collections.emptyMap(),
new StringEntity(schedulerConfig));
}
@ -137,6 +291,24 @@ public class ScheduledJobIT extends ESRestTestCase {
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
}
private void executeTestLookbackOnlyWithNestedFields(String jobId, boolean source) throws Exception {
String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":3600,\"detectors\" :"
+ "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}"
+ "}";
client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job));
String schedulerId = jobId + "-scheduler";
createScheduler(schedulerId, jobId, "nested-data", source, false);
openJob(client(), jobId);
startSchedulerAndWaitUntilStopped(schedulerId);
Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
@After
public void clearMlState() throws Exception {
new MlRestTestStateCleaner(client(), this).clearMlMetadata();

View File

@ -34,7 +34,6 @@ public class DataFormatTests extends ESTestCase {
assertThat(DataFormat.JSON.ordinal(), equalTo(0));
assertThat(DataFormat.DELIMITED.ordinal(), equalTo(1));
assertThat(DataFormat.SINGLE_LINE.ordinal(), equalTo(2));
assertThat(DataFormat.ELASTICSEARCH.ordinal(), equalTo(3));
}
public void testwriteTo() throws Exception {
@ -58,13 +57,6 @@ public class DataFormatTests extends ESTestCase {
assertThat(in.readVInt(), equalTo(2));
}
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
DataFormat.ELASTICSEARCH.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
assertThat(in.readVInt(), equalTo(3));
}
}
}
public void testReadFrom() throws Exception {
@ -86,12 +78,6 @@ public class DataFormatTests extends ESTestCase {
assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.SINGLE_LINE));
}
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVInt(3);
try (StreamInput in = out.bytes().streamInput()) {
assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.ELASTICSEARCH));
}
}
}
public void testInvalidReadFrom() throws Exception {

View File

@ -44,6 +44,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -162,7 +163,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfOpenJobs());
DataLoadParams params = new DataLoadParams(TimeRange.builder().build());
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), false, Optional.empty());
manager.openJob("foo", false);
manager.processData("foo", createInputStream(""), params, () -> false);
assertEquals(1, manager.numberOfOpenJobs());
@ -202,7 +203,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
Supplier<Boolean> cancellable = () -> false;
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true);
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true, Optional.empty());
InputStream inputStream = createInputStream("");
manager.openJob("foo", false);
manager.processData("foo", inputStream, params, cancellable);

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.JobStatus;
import org.elasticsearch.xpack.ml.job.JobTests;
@ -43,7 +42,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
int numJobs = randomIntBetween(0, 10);
for (int i = 0; i < numJobs; i++) {
Job job = JobTests.createRandomizedJob();
if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.ELASTICSEARCH) {
if (randomBoolean()) {
SchedulerConfig schedulerConfig = SchedulerConfigTests.createRandomizedSchedulerConfig(job.getId());
if (schedulerConfig.getAggregations() != null) {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors());
@ -238,9 +237,9 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
public void testPutScheduler_failBecauseJobIsNotCompatibleForScheduler() {
Job.Builder job1 = createScheduledJob();
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.DELIMITED);
job1.setDataDescription(dataDescription);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build().getAnalysisConfig());
analysisConfig.setLatency(3600L);
job1.setAnalysisConfig(analysisConfig);
SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1.build(), false);

View File

@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -39,7 +40,7 @@ import static org.mockito.Mockito.when;
public class AutodetectCommunicatorTests extends ESTestCase {
public void testWriteResetBucketsControlMessage() throws IOException {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build());
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), false, Optional.empty());
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params, () -> false);
@ -147,7 +148,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
() -> communicator.writeToJob(in, mock(DataLoadParams.class), () -> false));
communicator.inUse.set(null);
communicator.writeToJob(in, mock(DataLoadParams.class), () -> false);
communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()), () -> false);
}
public void testFlushInUse() throws IOException {

View File

@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Optional;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@ -105,7 +106,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true);
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true, Optional.empty());
process.writeResetBucketsControlMessage(params);
process.flushStream();

View File

@ -7,19 +7,21 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.test.ESTestCase;
import java.util.Optional;
public class DataLoadParamsTests extends ESTestCase {
public void testGetStart() {
assertEquals("", new DataLoadParams(TimeRange.builder().build()).getStart());
assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build()).getStart());
assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getStart());
assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build(), false, Optional.empty()).getStart());
}
public void testGetEnd() {
assertEquals("", new DataLoadParams(TimeRange.builder().build()).getEnd());
assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build()).getEnd());
assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getEnd());
assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build(), false, Optional.empty()).getEnd());
}
public void testIsResettingBuckets() {
assertFalse(new DataLoadParams(TimeRange.builder().build()).isResettingBuckets());
assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build()).isResettingBuckets());
assertFalse(new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).isResettingBuckets());
assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build(), false, Optional.empty()).isResettingBuckets());
}
}

View File

@ -10,6 +10,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.io.IOException;
import java.util.Optional;
import java.util.stream.IntStream;
import org.elasticsearch.test.ESTestCase;
@ -127,7 +128,8 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
public void testWriteResetBucketsMessage() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
writer.writeResetBucketsMessage(new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build()));
writer.writeResetBucketsMessage(
new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), false, Optional.empty()));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4);

View File

@ -24,13 +24,6 @@ public class DataToProcessWriterFactoryTests extends ESTestCase {
assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter);
}
public void testCreate_GivenDataFormatIsElasticsearch() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataFormat.ELASTICSEARCH);
assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter);
}
public void testCreate_GivenDataFormatIsCsv() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataFormat.DELIMITED);

View File

@ -146,12 +146,21 @@ public class ScheduledJobRunnerTests extends ESTestCase {
verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any());
}
private static PostDataAction.Request createExpectedPostDataRequest(Job job) {
DataDescription.Builder expectedDataDescription = new DataDescription.Builder();
expectedDataDescription.setTimeFormat("epoch_ms");
expectedDataDescription.setFormat(DataDescription.DataFormat.JSON);
PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(job.getId());
expectedPostDataRequest.setDataDescription(expectedDataDescription.build());
return expectedPostDataRequest;
}
public void testStart_extractionProblem() throws Exception {
Job.Builder jobBuilder = createScheduledJob();
SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build();
@ -213,7 +222,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
task.stop();
verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any());
} else {
verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME), any());
}
@ -233,9 +242,6 @@ public class ScheduledJobRunnerTests extends ESTestCase {
Job.Builder builder = new Job.Builder("foo");
builder.setAnalysisConfig(acBuilder);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescription);
return builder;
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.job.DataCounts;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.audit.Auditor;
import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.scheduler.extractor.DataExtractorFactory;
@ -40,6 +41,7 @@ public class ScheduledJobTests extends ESTestCase {
private DataExtractorFactory dataExtractorFactory;
private DataExtractor dataExtractor;
private Client client;
private DataDescription.Builder dataDescription;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private long currentTime;
@ -52,6 +54,8 @@ public class ScheduledJobTests extends ESTestCase {
dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
client = mock(Client.class);
dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
ActionFuture<PostDataAction.Response> jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
currentTime = 0;
@ -60,7 +64,10 @@ public class ScheduledJobTests extends ESTestCase {
InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8));
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(client.execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("_job_id")))).thenReturn(jobDataFuture);
PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
expectedRequest.setDataDescription(dataDescription.build());
when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
}
@ -176,7 +183,7 @@ public class ScheduledJobTests extends ESTestCase {
private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs) {
Supplier<Long> currentTimeSupplier = () -> currentTime;
return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
return new ScheduledJob("_job_id", dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs);
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataDescription;
import org.elasticsearch.xpack.ml.job.Detector;
import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -39,9 +38,6 @@ public class ScheduledJobValidatorTests extends ESTestCase {
public void testVerify_GivenZeroLatency() {
Job.Builder builder = buildJobBuilder("foo");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescription);
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setLatency(0L);
@ -54,9 +50,6 @@ public class ScheduledJobValidatorTests extends ESTestCase {
public void testVerify_GivenNoLatency() {
Job.Builder builder = buildJobBuilder("foo");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescription);
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBatchSpan(1800L);
ac.setBucketSpan(100L);
@ -69,9 +62,6 @@ public class ScheduledJobValidatorTests extends ESTestCase {
public void testVerify_GivenAggsAndCorrectSummaryCountField() throws IOException {
Job.Builder builder = buildJobBuilder("foo");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescription);
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setSummaryCountFieldName("doc_count");
@ -86,9 +76,6 @@ public class ScheduledJobValidatorTests extends ESTestCase {
String errorMessage = Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD,
SchedulerConfig.DOC_COUNT);
Job.Builder builder = buildJobBuilder("foo");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescription);
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
builder.setAnalysisConfig(ac);
@ -105,9 +92,6 @@ public class ScheduledJobValidatorTests extends ESTestCase {
String errorMessage = Messages.getMessage(
Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT);
Job.Builder builder = buildJobBuilder("foo");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescription);
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setSummaryCountFieldName("wrong");
@ -125,9 +109,7 @@ public class ScheduledJobValidatorTests extends ESTestCase {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(new Date());
AnalysisConfig.Builder ac = createAnalysisConfig();
DataDescription.Builder dc = new DataDescription.Builder();
builder.setAnalysisConfig(ac);
builder.setDataDescription(dc);
return builder;
}

View File

@ -61,6 +61,9 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
if (randomBoolean()) {
builder.setQueryDelay(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setSource(randomBoolean());
}
return builder.build();
}

View File

@ -1,62 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class SearchHitFieldExtractorTests extends ESTestCase {
public void testExtractTimeFieldGivenHitContainsNothing() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time"));
}
public void testExtractTimeFieldGivenSingleValueInFields() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L)));
searchHit.fields(fields);
assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(3L));
}
public void testExtractTimeFieldGivenSingleValueInSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
searchHit.sourceRef(new BytesArray("{\"time\":1482418307000}"));
assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(1482418307000L));
}
public void testExtractTimeFieldGivenArrayValue() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L, 5L)));
searchHit.fields(fields);
expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time"));
}
public void testExtractTimeFieldGivenSingleNonLongValue() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("time", new InternalSearchHitField("time", Arrays.asList(3)));
searchHit.fields(fields);
expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time"));
}
}

View File

@ -1,122 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class SearchHitToJsonProcessorTests extends ESTestCase {
public void testProcessGivenHitContainsNothing() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit);
assertThat(json, equalTo("{}"));
}
public void testProcessGivenHitContainsEmptySource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
searchHit.sourceRef(new BytesArray("{}"));
String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit);
assertThat(json, equalTo("{}"));
}
public void testProcessGivenHitContainsSingleValueInFields() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3)));
searchHit.fields(fields);
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":3}"));
}
public void testProcessGivenHitContainsArrayValueInFields() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3, 9)));
searchHit.fields(fields);
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":[3,9]}"));
}
public void testProcessGivenHitContainsSingleValueInSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String hitSource = "{\"field_1\":\"foo\"}";
searchHit.sourceRef(new BytesArray(hitSource));
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":\"foo\"}"));
}
public void testProcessGivenHitContainsArrayValueInSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String hitSource = "{\"field_1\":[\"foo\",\"bar\"]}";
searchHit.sourceRef(new BytesArray(hitSource));
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":[\"foo\",\"bar\"]}"));
}
public void testProcessGivenHitContainsFieldsAndSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String hitSource = "{\"field_1\":\"foo\"}";
searchHit.sourceRef(new BytesArray(hitSource));
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList("bar")));
searchHit.fields(fields);
String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit);
assertThat(json, equalTo("{\"field_1\":\"foo\",\"field_2\":\"bar\"}"));
}
public void testProcessGivenMultipleHits() throws IOException {
InternalSearchHit searchHit1 = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3)));
searchHit1.fields(fields);
InternalSearchHit searchHit2 = new InternalSearchHit(42);
fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(5)));
searchHit2.fields(fields);
String json = searchHitToString(new String[] {"field_1"}, searchHit1, searchHit2);
assertThat(json, equalTo("{\"field_1\":3} {\"field_1\":5}"));
}
private String searchHitToString(String[] fields, SearchHit... searchHits) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) {
for (int i = 0; i < searchHits.length; i++) {
hitProcessor.process(searchHits[i]);
}
}
return outputStream.toString(StandardCharsets.UTF_8.name());
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class ExtractedFieldTests extends ESTestCase {
public void testValueGivenDocValue() {
SearchHit hit = new SearchHitBuilder(42).addField("single", "bar").addField("array", Arrays.asList("a", "b")).build();
ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
assertThat(single.value(hit), equalTo(new String[] { "bar" }));
ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
assertThat(array.value(hit), equalTo(new String[] { "a", "b" }));
ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
assertThat(missing.value(hit), equalTo(new Object[0]));
}
public void testValueGivenScriptField() {
SearchHit hit = new SearchHitBuilder(42).addField("single", "bar").addField("array", Arrays.asList("a", "b")).build();
ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
assertThat(single.value(hit), equalTo(new String[] { "bar" }));
ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
assertThat(array.value(hit), equalTo(new String[] { "a", "b" }));
ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
assertThat(missing.value(hit), equalTo(new Object[0]));
}
public void testValueGivenSource() {
SearchHit hit = new SearchHitBuilder(42).setSource("{\"single\":\"bar\",\"array\":[\"a\",\"b\"]}").build();
ExtractedField single = ExtractedField.newField("single", ExtractedField.ExtractionMethod.SOURCE);
assertThat(single.value(hit), equalTo(new String[] { "bar" }));
ExtractedField array = ExtractedField.newField("array", ExtractedField.ExtractionMethod.SOURCE);
assertThat(array.value(hit), equalTo(new String[] { "a", "b" }));
ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE);
assertThat(missing.value(hit), equalTo(new Object[0]));
}
public void testValueGivenNestedSource() {
SearchHit hit = new SearchHitBuilder(42).setSource("{\"level_1\":{\"level_2\":{\"foo\":\"bar\"}}}").build();
ExtractedField nested = ExtractedField.newField("level_1.level_2.foo", ExtractedField.ExtractionMethod.SOURCE);
assertThat(nested.value(hit), equalTo(new String[] { "bar" }));
}
public void testValueGivenSourceAndHitWithNoSource() {
ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE);
assertThat(missing.value(new SearchHitBuilder(3).build()), equalTo(new Object[0]));
}
public void testValueGivenMismatchingMethod() {
SearchHit hit = new SearchHitBuilder(42).addField("a", 1).setSource("{\"b\":2}").build();
ExtractedField invalidA = ExtractedField.newField("a", ExtractedField.ExtractionMethod.SOURCE);
assertThat(invalidA.value(hit), equalTo(new Object[0]));
ExtractedField validA = ExtractedField.newField("a", ExtractedField.ExtractionMethod.DOC_VALUE);
assertThat(validA.value(hit), equalTo(new Integer[] { 1 }));
ExtractedField invalidB = ExtractedField.newField("b", ExtractedField.ExtractionMethod.DOC_VALUE);
assertThat(invalidB.value(hit), equalTo(new Object[0]));
ExtractedField validB = ExtractedField.newField("b", ExtractedField.ExtractionMethod.SOURCE);
assertThat(validB.value(hit), equalTo(new Integer[] { 2 }));
}
public void testValueGivenEmptyHit() {
SearchHit hit = new SearchHitBuilder(42).build();
ExtractedField docValue = ExtractedField.newField("a", ExtractedField.ExtractionMethod.SOURCE);
assertThat(docValue.value(hit), equalTo(new Object[0]));
ExtractedField sourceField = ExtractedField.newField("b", ExtractedField.ExtractionMethod.DOC_VALUE);
assertThat(sourceField.value(hit), equalTo(new Object[0]));
}
static class SearchHitBuilder {
private final InternalSearchHit hit;
private final Map<String, SearchHitField> fields;
SearchHitBuilder(int docId) {
hit = new InternalSearchHit(docId);
fields = new HashMap<>();
}
SearchHitBuilder addField(String name, Object value) {
return addField(name, Arrays.asList(value));
}
SearchHitBuilder addField(String name, List<Object> values) {
fields.put(name, new InternalSearchHitField(name, values));
return this;
}
SearchHitBuilder setSource(String sourceJson) {
hit.sourceRef(new BytesArray(sourceJson));
return this;
}
SearchHit build() {
if (!fields.isEmpty()) {
hit.fields(fields);
}
return hit;
}
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class ExtractedFieldsTests extends ESTestCase {
private ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
public void testInvalidConstruction() {
expectThrows(IllegalArgumentException.class, () -> new ExtractedFields(timeField, Collections.emptyList()));
}
public void testTimeFieldOnly() {
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField));
assertThat(extractedFields.getAllFields(), equalTo(Arrays.asList(timeField)));
assertThat(extractedFields.timeField(), equalTo("time"));
assertThat(extractedFields.getDocValueFields(), equalTo(new String[] { timeField.getName() }));
assertThat(extractedFields.getSourceFields().length, equalTo(0));
}
public void testAllTypesOfFields() {
ExtractedField docValue1 = ExtractedField.newField("doc1", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField docValue2 = ExtractedField.newField("doc2", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField scriptField1 = ExtractedField.newField("scripted1", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE);
ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE);
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField,
docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2));
assertThat(extractedFields.getAllFields().size(), equalTo(7));
assertThat(extractedFields.timeField(), equalTo("time"));
assertThat(extractedFields.getDocValueFields(), equalTo(new String[] {"time", "doc1", "doc2"}));
assertThat(extractedFields.getSourceFields(), equalTo(new String[] {"src1", "src2"}));
}
public void testTimeFieldValue() {
SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", 1000L).build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField));
assertThat(extractedFields.timeFieldValue(hit), equalTo(1000L));
}
public void testTimeFieldValueGivenEmptyArray() {
SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", Collections.emptyList()).build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField));
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
}
public void testTimeFieldValueGivenValueHasTwoElements() {
SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", Arrays.asList(1L, 2L)).build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField));
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
}
public void testTimeFieldValueGivenValueIsString() {
SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(1).addField("time", "a string").build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField));
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
}
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
@ -48,12 +47,10 @@ public class ScrollDataExtractorTests extends ESTestCase {
private List<String> capturedContinueScrollIds;
private List<String> capturedClearScrollIds;
private String jobId;
private List<String> jobFields;
private String timeField;
private ExtractedFields extractedFields;
private List<String> types;
private List<String> indexes;
private QueryBuilder query;
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private int scrollSize;
@ -93,13 +90,13 @@ public class ScrollDataExtractorTests extends ESTestCase {
capturedSearchRequests = new ArrayList<>();
capturedContinueScrollIds = new ArrayList<>();
capturedClearScrollIds = new ArrayList<>();
timeField = "time";
jobId = "test-job";
jobFields = Arrays.asList(timeField, "field_1");
ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
extractedFields = new ExtractedFields(timeField,
Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE)));
indexes = Arrays.asList("index-1", "index-2");
types = Arrays.asList("type-1", "type-2");
query = QueryBuilders.matchAllQuery();
aggregations = null;
scriptFields = Collections.emptyList();
scrollSize = 1000;
}
@ -253,8 +250,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
}
private ScrollDataExtractorContext createContext(long start, long end) {
return new ScrollDataExtractorContext(jobId, jobFields, timeField, indexes, types, query, aggregations, scriptFields, scrollSize,
start, end);
return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end);
}
private SearchResponse createEmptySearchResponse() {
@ -270,7 +266,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
for (int i = 0; i < timestamps.size(); i++) {
InternalSearchHit hit = new InternalSearchHit(randomInt());
Map<String, SearchHitField> fields = new HashMap<>();
fields.put(timeField, new InternalSearchHitField("time", Arrays.asList(timestamps.get(i))));
fields.put(extractedFields.timeField(), new InternalSearchHitField("time", Arrays.asList(timestamps.get(i))));
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(field1Values.get(i))));
fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList(field2Values.get(i))));
hit.fields(fields);

View File

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.scheduler.extractor.scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo;
public class SearchHitToJsonProcessorTests extends ESTestCase {
public void testProcessGivenSingleHit() throws IOException {
ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField));
SearchHit hit = new ExtractedFieldTests.SearchHitBuilder(8)
.addField("time", 1000L)
.addField("single", "a")
.addField("array", Arrays.asList("b", "c"))
.build();
String json = searchHitToString(extractedFields, hit);
assertThat(json, equalTo("{\"time\":1000,\"single\":\"a\",\"array\":[\"b\",\"c\"]}"));
}
public void testProcessGivenMultipleHits() throws IOException {
ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField));
SearchHit hit1 = new ExtractedFieldTests.SearchHitBuilder(8)
.addField("time", 1000L)
.addField("single", "a1")
.addField("array", Arrays.asList("b1", "c1"))
.build();
SearchHit hit2 = new ExtractedFieldTests.SearchHitBuilder(8)
.addField("time", 2000L)
.addField("single", "a2")
.addField("array", Arrays.asList("b2", "c2"))
.build();
String json = searchHitToString(extractedFields, hit1, hit2);
assertThat(json, equalTo("{\"time\":1000,\"single\":\"a1\",\"array\":[\"b1\",\"c1\"]} " +
"{\"time\":2000,\"single\":\"a2\",\"array\":[\"b2\",\"c2\"]}"));
}
private String searchHitToString(ExtractedFields fields, SearchHit... searchHits) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) {
for (int i = 0; i < searchHits.length; i++) {
hitProcessor.process(searchHits[i]);
}
}
return outputStream.toString(StandardCharsets.UTF_8.name());
}
}

View File

@ -5,19 +5,16 @@
*/
package org.elasticsearch.xpack.ml.transforms.date;
import static org.elasticsearch.xpack.ml.transforms.TransformTestUtils.createIndexArray;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex;
import org.elasticsearch.xpack.ml.transforms.TransformException;
import static org.mockito.Mockito.mock;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.transforms.Transform.TransformIndex;
import org.elasticsearch.xpack.ml.transforms.TransformException;
import static org.elasticsearch.xpack.ml.transforms.TransformTestUtils.createIndexArray;
import static org.mockito.Mockito.mock;
public class DateFormatTransformTests extends ESTestCase {
@ -26,7 +23,7 @@ public class DateFormatTransformTests extends ESTestCase {
List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss.SSSXXX",
ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
readIndexes, writeIndexes, mock(Logger.class));
String[] input = {"2014-01-01 13:42:56.500Z"};
String[] scratch = {};
@ -42,8 +39,7 @@ public class DateFormatTransformTests extends ESTestCase {
public void testTransform_GivenInvalidFormat() throws TransformException {
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
() -> new DateFormatTransform("yyyy-MM HH:mm:ss", ZoneOffset.systemDefault(),
Collections.emptyList(), Collections.emptyList(), mock(Logger.class)));
() -> new DateFormatTransform("yyyy-MM HH:mm:ss", Collections.emptyList(), Collections.emptyList(), mock(Logger.class)));
assertEquals("Timestamp cannot be derived from pattern: yyyy-MM HH:mm:ss", e.getMessage());
@ -54,8 +50,7 @@ public class DateFormatTransformTests extends ESTestCase {
List<TransformIndex> readIndexes = createIndexArray(new TransformIndex(0, 0));
List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss",
ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", readIndexes, writeIndexes, mock(Logger.class));
String[] input = {"invalid"};
String[] scratch = {};
@ -73,8 +68,7 @@ public class DateFormatTransformTests extends ESTestCase {
List<TransformIndex> readIndexes = createIndexArray(new TransformIndex(1, 0));
List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss",
ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ss", readIndexes, writeIndexes, mock(Logger.class));
String[] input = {};
String[] scratch = {null};
@ -90,15 +84,14 @@ public class DateFormatTransformTests extends ESTestCase {
List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
ESTestCase.expectThrows(IllegalArgumentException.class,
() -> new DateFormatTransform("e", ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class)));
() -> new DateFormatTransform("e", readIndexes, writeIndexes, mock(Logger.class)));
}
public void testTransform_FromScratchArea() throws TransformException {
List<TransformIndex> readIndexes = createIndexArray(new TransformIndex(1, 0));
List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ssXXX",
ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
DateFormatTransform transformer = new DateFormatTransform("yyyy-MM-dd HH:mm:ssXXX", readIndexes, writeIndexes, mock(Logger.class));
String[] input = {};
String[] scratch = {"2014-01-01 00:00:00Z"};
@ -116,7 +109,7 @@ public class DateFormatTransformTests extends ESTestCase {
List<TransformIndex> writeIndexes = createIndexArray(new TransformIndex(2, 0));
DateFormatTransform transformer = new DateFormatTransform("'['yyyy-MM-dd HH:mm:ssX']'",
ZoneOffset.systemDefault(), readIndexes, writeIndexes, mock(Logger.class));
readIndexes, writeIndexes, mock(Logger.class));
String[] input = {"[2014-06-23 00:00:00Z]"};
String[] scratch = {};

View File

@ -10,7 +10,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
@ -27,9 +27,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
"time_field":"time"
}
}

View File

@ -10,7 +10,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
@ -27,9 +27,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
"time_field":"time"
}
}

View File

@ -195,7 +195,7 @@
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"format":"JSON",
"time_field":"time",
"time_format":"yyyy-MM-dd HH:mm:ssX"
}

View File

@ -33,7 +33,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format" : "ELASTICSEARCH",
"format" : "JSON",
"time_field":"time",
"time_format":"yyyy-MM-dd'T'HH:mm:ssX"
}

View File

@ -10,7 +10,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
@ -26,9 +26,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
"time_field":"time"
}
}

View File

@ -21,7 +21,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}