[ML] Automate detection of the way to extract fields (elastic/x-pack-elasticsearch#1686)

In 5.4.x, the datafeed attempts to get all fields from
doc_values by default. It has a `_source` parameter which,
when enabled, switches the strategy to fetching all fields
from the document source instead.

This has been the most commonly reported issue from users,
as it means that, by default, the datafeed fails to fetch
any text fields.

This change uses the field capabilities API to
automatically detect whether a field is aggregatable.
Aggregatable fields are extracted from doc_values, while
the rest are taken from _source. The change also adds
validation to the start datafeed action: if any field is
missing mappings, or the time field is not aggregatable,
we respond with an appropriate error.
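
For illustration, here is a minimal sketch of the per-field decision this change
automates. It mirrors the new ExtractedFields logic further down in this diff; the
helper name shouldUseDocValues and its standalone form are hypothetical, not part
of the commit, and the import paths are taken from the diff.

    import java.util.Map;
    import org.elasticsearch.action.fieldcaps.FieldCapabilities;
    import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;

    // Hypothetical helper: should the datafeed read this field from doc_values?
    // Mirrors the ExtractedFields.isAggregatable() check added in this commit.
    static boolean shouldUseDocValues(String field, FieldCapabilitiesResponse fieldCaps) {
        Map<String, FieldCapabilities> caps = fieldCaps.getField(field);
        if (caps == null || caps.isEmpty()) {
            // The field has no mappings; the start datafeed action now rejects this case.
            throw new IllegalArgumentException("field [" + field + "] has no mappings");
        }
        // Use doc_values only if the field is aggregatable in every matched index;
        // otherwise the field is fetched from _source instead.
        return caps.values().stream().allMatch(FieldCapabilities::isAggregatable);
    }

A caller would extract the field from doc_values when this returns true and fall back
to _source otherwise, which is what the new ExtractedFields.build() does per analysis
field.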

relates elastic/x-pack-elasticsearch#1649

Original commit: elastic/x-pack-elasticsearch@76e2cc6cb2
Dimitris Athanasiou 2017-06-12 14:56:31 +01:00 committed by GitHub
parent c9bbc17742
commit 8eb62eac27
27 changed files with 843 additions and 441 deletions

View File

@ -70,6 +70,7 @@ import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.JobManager;
@ -299,7 +300,8 @@ public class MachineLearning implements ActionPlugin {
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, jobProvider,
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(internalClient, jobProvider, auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, persistentTasksService);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(settings, clusterService, datafeedManager, autodetectProcessManager);
InvalidLicenseEnforcer invalidLicenseEnforcer =

View File

@ -218,9 +218,19 @@ public class PreviewDatafeedAction extends Action<PreviewDatafeedAction.Request,
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job);
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job, new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
/** Visible for testing */

View File

@ -46,7 +46,6 @@ import org.elasticsearch.xpack.security.SecurityContext;
import org.elasticsearch.xpack.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.security.action.user.HasPrivilegesRequest;
import org.elasticsearch.xpack.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.security.authc.Authentication;
import org.elasticsearch.xpack.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.security.support.Exceptions;

View File

@ -51,6 +51,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -63,6 +64,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Objects;
@ -419,16 +421,19 @@ public class StartDatafeedAction
// The start datafeed api is a low-throughput api, so the fact that we redirect to the elected master node shouldn't be an issue.
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final InternalClient client;
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
XPackLicenseState licenseState, PersistentTasksService persistentTasksService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
InternalClient client) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
}
@Override
@ -463,7 +468,17 @@ public class StartDatafeedAction
listener.onFailure(e);
}
};
persistentTasksService.startPersistentTask(MlMetadata.datafeedTaskId(params.datafeedId), TASK_NAME, params, finalListener);
// Verify data extractor factory can be created, then start persistent task
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
StartDatafeedAction.validate(params.getDatafeedId(), mlMetadata, tasks);
DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId());
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap(
dataExtractorFactory -> persistentTasksService.startPersistentTask(MlMetadata.datafeedTaskId(params.datafeedId),
TASK_NAME, params, finalListener)
, listener::onFailure));
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@ -99,7 +100,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return parsedScriptFields;
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareBoolean(Builder::setSource, SOURCE);
// TODO this is to read former _source field. Remove in v7.0.0
PARSER.declareBoolean((builder, value) -> {}, SOURCE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
}
@ -122,12 +124,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final AggregatorFactories.Builder aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final boolean source;
private final ChunkingConfig chunkingConfig;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, boolean source, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -138,7 +139,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.aggregations = aggregations;
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.source = source;
this.chunkingConfig = chunkingConfig;
}
@ -165,7 +165,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.scriptFields = null;
}
this.scrollSize = in.readOptionalVInt();
this.source = in.readBoolean();
if (in.getVersion().before(Version.V_5_5_0)) {
// read former _source field
in.readBoolean();
}
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
}
@ -197,10 +200,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return scrollSize;
}
public boolean isSource() {
return source;
}
public QueryBuilder getQuery() {
return query;
}
@ -338,7 +337,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
out.writeBoolean(false);
}
out.writeOptionalVInt(scrollSize);
out.writeBoolean(source);
if (out.getVersion().before(Version.V_5_5_0)) {
// write former _source field
out.writeBoolean(false);
}
out.writeOptionalWriteable(chunkingConfig);
}
@ -371,9 +373,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
builder.endObject();
}
builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
if (source) {
builder.field(SOURCE.getPreferredName(), source);
}
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
@ -407,13 +406,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, source,
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
chunkingConfig);
}
@ -438,7 +436,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
private boolean source = false;
private ChunkingConfig chunkingConfig;
public Builder() {
@ -461,7 +458,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.aggregations = config.aggregations;
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.source = config.source;
this.chunkingConfig = config.chunkingConfig;
}
@ -517,10 +513,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.scrollSize = scrollSize;
}
public void setSource(boolean enabled) {
this.source = enabled;
}
public void setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
}
@ -540,7 +532,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
validateAggregations();
setDefaultChunkingConfig();
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
source, chunkingConfig);
chunkingConfig);
}
private void validateAggregations() {

View File

@ -173,16 +173,16 @@ class DatafeedJob {
// Instead, it is preferable to retry the given interval next time an extraction
// is triggered.
// A common issue for our users is that they have fields without doc values.
// Yet, they did not enable _source on the datafeed. It is really useful
// to display a specific error message for this situation. Unfortunately,
// there are no great ways to identify the issue but search for 'doc values'
// For aggregated datafeeds it is possible that users have referenced fields without doc values.
// In that case, it is really useful to display an error message explaining exactly that.
// Unfortunately, there is no great way to identify the issue other than to search for 'doc values'
// deep in the exception.
if (e.toString().contains("doc values")) {
throw new ExtractionProblemException(new IllegalArgumentException("One or more fields do not have doc values; " +
"please enable doc values for all analysis fields or enable _source on the datafeed"));
throw new ExtractionProblemException(nextRealtimeTimestamp(), new IllegalArgumentException(
"One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds" +
" using aggregations"));
}
throw new ExtractionProblemException(e);
throw new ExtractionProblemException(nextRealtimeTimestamp(), e);
}
if (isIsolated) {
return;
@ -209,7 +209,7 @@ class DatafeedJob {
// happened to the c++ process. We sent a batch of data to the c++ process
// yet we do not know how many of those were processed. It is better to
// advance time in order to avoid importing duplicate data.
error = new AnalysisProblemException(shouldStop, e);
error = new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e);
break;
}
recordCount += counts.getProcessedRecordCount();
@ -229,7 +229,7 @@ class DatafeedJob {
}
if (recordCount == 0) {
throw new EmptyDataCountException();
throw new EmptyDataCountException(nextRealtimeTimestamp());
}
// If the datafeed was stopped, then it is possible that by the time
@ -280,35 +280,46 @@ class DatafeedJob {
// happened to the c++ process. We sent a batch of data to the c++ process
// yet we do not know how many of those were processed. It is better to
// advance time in order to avoid importing duplicate data.
throw new AnalysisProblemException(shouldStop, e);
throw new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e);
}
}
class AnalysisProblemException extends RuntimeException {
/**
* Visible for testing
*/
Long lastEndTimeMs() {
return lastEndTimeMs;
}
static class AnalysisProblemException extends RuntimeException {
final boolean shouldStop;
final long nextDelayInMsSinceEpoch = nextRealtimeTimestamp();
final long nextDelayInMsSinceEpoch;
AnalysisProblemException(boolean shouldStop, Throwable cause) {
AnalysisProblemException(long nextDelayInMsSinceEpoch, boolean shouldStop, Throwable cause) {
super(cause);
this.shouldStop = shouldStop;
this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
}
}
class ExtractionProblemException extends RuntimeException {
static class ExtractionProblemException extends RuntimeException {
final long nextDelayInMsSinceEpoch = nextRealtimeTimestamp();
final long nextDelayInMsSinceEpoch;
ExtractionProblemException(Throwable cause) {
ExtractionProblemException(long nextDelayInMsSinceEpoch, Throwable cause) {
super(cause);
this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
}
}
class EmptyDataCountException extends RuntimeException {
static class EmptyDataCountException extends RuntimeException {
final long nextDelayInMsSinceEpoch = nextRealtimeTimestamp();
final long nextDelayInMsSinceEpoch;
EmptyDataCountException() {}
EmptyDataCountException(long nextDelayInMsSinceEpoch) {
this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class DatafeedJobBuilder {
private final Client client;
private final JobProvider jobProvider;
private final Auditor auditor;
private final Supplier<Long> currentTimeSupplier;
public DatafeedJobBuilder(Client client, JobProvider jobProvider, Auditor auditor, Supplier<Long> currentTimeSupplier) {
this.client = Objects.requireNonNull(client);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.auditor = Objects.requireNonNull(auditor);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
}
void build(Job job, DatafeedConfig datafeed, ActionListener<DatafeedJob> listener) {
// Step 5. Build datafeed job object
Consumer<Context> contextHandler = context -> {
Duration frequency = getFrequencyOrDefault(datafeed, job);
Duration queryDelay = Duration.ofMillis(datafeed.getQueryDelay().millis());
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
context.dataExtractorFactory, client, auditor, currentTimeSupplier,
context.latestFinalBucketEndMs, context.latestRecordTimeMs);
listener.onResponse(datafeedJob);
};
final Context context = new Context();
// Step 4. Context building complete - invoke final listener
ActionListener<DataExtractorFactory> dataExtractorFactoryHandler = ActionListener.wrap(
dataExtractorFactory -> {
context.dataExtractorFactory = dataExtractorFactory;
contextHandler.accept(context);
}, e -> {
auditor.error(job.getId(), e.getMessage());
listener.onFailure(e);
}
);
// Step 3. Create data extractor factory
Consumer<DataCounts> dataCountsHandler = dataCounts -> {
if (dataCounts.getLatestRecordTimeStamp() != null) {
context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
DataExtractorFactory.create(client, datafeed, job, dataExtractorFactoryHandler);
};
// Step 2. Collect data counts
Consumer<QueryPage<Bucket>> bucketsHandler = buckets -> {
if (buckets.results().size() == 1) {
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
context.latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1;
}
jobProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure);
};
// Step 1. Collect latest bucket
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Result.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> {
if (e instanceof ResourceNotFoundException) {
QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
bucketsHandler.accept(empty);
} else {
listener.onFailure(e);
}
});
}
private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
TimeValue frequency = datafeed.getFrequency();
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan.seconds()) : Duration.ofSeconds(frequency.seconds());
}
private static DataDescription buildDataDescription(Job job) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
if (job.getDataDescription() != null) {
dataDescription.setTimeField(job.getDataDescription().getTimeField());
}
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
return dataDescription.build();
}
private static class Context {
volatile long latestFinalBucketEndMs = -1L;
volatile long latestRecordTimeMs = -1L;
volatile DataExtractorFactory dataExtractorFactory;
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -23,25 +22,16 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@ -51,7 +41,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -64,29 +53,29 @@ public class DatafeedManager extends AbstractComponent {
private final Client client;
private final ClusterService clusterService;
private final PersistentTasksService persistentTasksService;
private final JobProvider jobProvider;
private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier;
private final Auditor auditor;
// Use allocationId as key instead of datafeed id
private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();
private volatile boolean isolated;
private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner();
private volatile boolean isolated;
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) {
super(Settings.EMPTY);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.threadPool = threadPool;
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.auditor = Objects.requireNonNull(auditor);
this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder);
clusterService.addListener(taskRunner);
}
public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {
public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> taskHandler) {
String datafeedId = task.getDatafeedId();
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
@ -96,30 +85,27 @@ public class DatafeedManager extends AbstractComponent {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
gatherInformation(job.getId(), (buckets, dataCounts) -> {
long latestFinalBucketEndMs = -1L;
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1;
}
long latestRecordTimeMs = -1L;
if (dataCounts.getLatestRecordTimeStamp() != null) {
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
taskRunner.runWhenJobIsOpened(task);
}
@Override
public void onFailure(Exception e) {
handler.accept(e);
}
});
}, handler);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> {
Holder holder = new Holder(task.getPersistentTaskId(), task.getAllocationId(), datafeed, datafeedJob,
task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), taskHandler);
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
taskRunner.runWhenJobIsOpened(task);
}
@Override
public void onFailure(Exception e) {
taskHandler.accept(e);
}
});
}, taskHandler::accept
);
datafeedJobBuilder.build(job, datafeed, datafeedJobHandler);
}
public void stopDatafeed(StartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) {
@ -269,21 +255,6 @@ public class DatafeedManager extends AbstractComponent {
}
}
Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartDatafeedAction.DatafeedTask task) {
Duration frequency = getFrequencyOrDefault(datafeed, job);
Duration queryDelay = Duration.ofMillis(datafeed.getQueryDelay().millis());
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
return new Holder(task.getPersistentTaskId(), task.getAllocationId(), datafeed, datafeedJob, task.isLookbackOnly(),
new ProblemTracker(auditor, job.getId()), handler);
}
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) {
return DataExtractorFactory.create(client, datafeed, job);
}
private String getJobId(StartDatafeedAction.DatafeedTask task) {
return runningDatafeedsOnThisNode.get(task.getAllocationId()).getJobId();
}
@ -292,40 +263,6 @@ public class DatafeedManager extends AbstractComponent {
return MlMetadata.getJobState(getJobId(datafeedTask), tasks);
}
private static DataDescription buildDataDescription(Job job) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
if (job.getDataDescription() != null) {
dataDescription.setTimeField(job.getDataDescription().getTimeField());
}
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
return dataDescription.build();
}
private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) {
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Result.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.bucketsViaInternalClient(jobId, latestBucketQuery, buckets -> {
jobProvider.dataCounts(jobId, dataCounts -> handler.accept(buckets, dataCounts), errorHandler);
}, e -> {
if (e instanceof ResourceNotFoundException) {
QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
jobProvider.dataCounts(jobId, dataCounts -> handler.accept(empty, dataCounts), errorHandler);
} else {
errorHandler.accept(e);
}
});
}
private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
TimeValue frequency = datafeed.getFrequency();
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan.seconds()) : Duration.ofSeconds(frequency.seconds());
}
private TimeValue computeNextDelay(long next) {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -62,7 +63,6 @@ public class DatafeedUpdate implements Writeable, ToXContent {
return parsedScriptFields;
}, DatafeedConfig.SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
PARSER.declareBoolean(Builder::setSource, DatafeedConfig.SOURCE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
}
@ -76,12 +76,11 @@ public class DatafeedUpdate implements Writeable, ToXContent {
private final AggregatorFactories.Builder aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final Boolean source;
private final ChunkingConfig chunkingConfig;
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, Boolean source, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -92,7 +91,6 @@ public class DatafeedUpdate implements Writeable, ToXContent {
this.aggregations = aggregations;
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.source = source;
this.chunkingConfig = chunkingConfig;
}
@ -119,7 +117,10 @@ public class DatafeedUpdate implements Writeable, ToXContent {
this.scriptFields = null;
}
this.scrollSize = in.readOptionalVInt();
this.source = in.readOptionalBoolean();
if (in.getVersion().before(Version.V_5_5_0)) {
// TODO for former _source param - remove in v7.0.0
in.readOptionalBoolean();
}
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
}
@ -157,7 +158,10 @@ public class DatafeedUpdate implements Writeable, ToXContent {
out.writeBoolean(false);
}
out.writeOptionalVInt(scrollSize);
out.writeOptionalBoolean(source);
if (out.getVersion().before(Version.V_5_5_0)) {
// TODO for former _source param - remove in v7.0.0
out.writeOptionalBoolean(false);
}
out.writeOptionalWriteable(chunkingConfig);
}
@ -184,7 +188,6 @@ public class DatafeedUpdate implements Writeable, ToXContent {
builder.endObject();
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.SOURCE, source);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
builder.endObject();
return builder;
@ -233,9 +236,6 @@ public class DatafeedUpdate implements Writeable, ToXContent {
if (scrollSize != null) {
builder.setScrollSize(scrollSize);
}
if (source != null) {
builder.setSource(source);
}
if (chunkingConfig != null) {
builder.setChunkingConfig(chunkingConfig);
}
@ -269,13 +269,12 @@ public class DatafeedUpdate implements Writeable, ToXContent {
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, source,
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
chunkingConfig);
}
@ -296,7 +295,6 @@ public class DatafeedUpdate implements Writeable, ToXContent {
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private Boolean source;
private ChunkingConfig chunkingConfig;
public Builder() {
@ -317,7 +315,6 @@ public class DatafeedUpdate implements Writeable, ToXContent {
this.aggregations = config.aggregations;
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.source = config.source;
this.chunkingConfig = config.chunkingConfig;
}
@ -363,17 +360,13 @@ public class DatafeedUpdate implements Writeable, ToXContent {
this.scrollSize = scrollSize;
}
public void setSource(boolean enabled) {
this.source = enabled;
}
public void setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
source, chunkingConfig);
chunkingConfig);
}
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
@ -18,11 +19,18 @@ public interface DataExtractorFactory {
/**
* Creates a {@code DataExtractorFactory} for the given datafeed-job combination.
*/
static DataExtractorFactory create(Client client, DatafeedConfig datafeedConfig, Job job) {
boolean isScrollSearch = datafeedConfig.hasAggregations() == false;
DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job);
return datafeedConfig.getChunkingConfig().isEnabled() ? new ChunkedDataExtractorFactory(
client, datafeedConfig, job, dataExtractorFactory) : dataExtractorFactory;
static void create(Client client, DatafeedConfig datafeed, Job job, ActionListener<DataExtractorFactory> listener) {
ActionListener<DataExtractorFactory> factoryHandler = ActionListener.wrap(
factory -> listener.onResponse(datafeed.getChunkingConfig().isEnabled()
? new ChunkedDataExtractorFactory(client, datafeed, job, factory) : factory)
, listener::onFailure
);
boolean isScrollSearch = datafeed.hasAggregations() == false;
if (isScrollSearch) {
ScrollDataExtractorFactory.create(client, datafeed, job, factoryHandler);
} else {
factoryHandler.onResponse(new AggregationDataExtractorFactory(client, datafeed, job));
}
}
}

View File

@ -5,14 +5,19 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -21,6 +26,8 @@ class ExtractedFields {
private final ExtractedField timeField;
private final List<ExtractedField> allFields;
private final String[] docValueFields;
private final String[] sourceFields;
ExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
if (!allFields.contains(timeField)) {
@ -28,6 +35,8 @@ class ExtractedFields {
}
this.timeField = Objects.requireNonNull(timeField);
this.allFields = Collections.unmodifiableList(allFields);
this.docValueFields = filterFields(ExtractedField.ExtractionMethod.DOC_VALUE, allFields);
this.sourceFields = filterFields(ExtractedField.ExtractionMethod.SOURCE, allFields);
}
public List<ExtractedField> getAllFields() {
@ -35,16 +44,16 @@ class ExtractedFields {
}
public String[] getSourceFields() {
return filterFields(ExtractedField.ExtractionMethod.SOURCE);
return sourceFields;
}
public String[] getDocValueFields() {
return filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
return docValueFields;
}
private String[] filterFields(ExtractedField.ExtractionMethod method) {
private static String[] filterFields(ExtractedField.ExtractionMethod method, List<ExtractedField> fields) {
List<String> result = new ArrayList<>();
for (ExtractedField field : allFields) {
for (ExtractedField field : fields) {
if (field.getExtractionMethod() == method) {
result.add(field.getName());
}
@ -68,19 +77,40 @@ class ExtractedFields {
throw new RuntimeException("Time field [" + timeField.getName() + "] expected a long value; actual was: " + value[0]);
}
public static ExtractedFields build(Job job, DatafeedConfig datafeedConfig) {
Set<String> scriptFields = datafeedConfig.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
public static ExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
Set<String> scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
String timeField = job.getDataDescription().getTimeField();
if (scriptFields.contains(timeField) == false && isAggregatable(datafeed.getId(), timeField, fieldsCapabilities) == false) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeed.getId() + "] cannot retrieve time field [" + timeField
+ "] because it is not aggregatable");
}
ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ?
ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
List<String> remainingFields = job.allFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size());
List<String> remainingFields = job.allFields().stream().filter(
f -> !(f.equals(timeField) || f.equals(AnalysisConfig.ML_CATEGORY_FIELD))).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
allExtractedFields.add(timeExtractedField);
for (String field : remainingFields) {
ExtractedField.ExtractionMethod method = scriptFields.contains(field) ? ExtractedField.ExtractionMethod.SCRIPT_FIELD :
datafeedConfig.isSource() ? ExtractedField.ExtractionMethod.SOURCE : ExtractedField.ExtractionMethod.DOC_VALUE;
ExtractedField.ExtractionMethod method = scriptFields.contains(field) ? ExtractedField.ExtractionMethod.SCRIPT_FIELD
: isAggregatable(datafeed.getId(), field, fieldsCapabilities) ? ExtractedField.ExtractionMethod.DOC_VALUE
: ExtractedField.ExtractionMethod.SOURCE;
allExtractedFields.add(ExtractedField.newField(field, method));
}
return new ExtractedFields(timeExtractedField, allExtractedFields);
}
private static boolean isAggregatable(String datafeedId, String field, FieldCapabilitiesResponse fieldsCapabilities) {
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps == null || fieldCaps.isEmpty()) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field
+ "] because it has no mappings");
}
for (FieldCapabilities capsPerIndex : fieldCaps.values()) {
if (!capsPerIndex.isAggregatable()) {
return false;
}
}
return true;
}
}

View File

@ -5,12 +5,20 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.util.List;
import java.util.Objects;
public class ScrollDataExtractorFactory implements DataExtractorFactory {
@ -20,11 +28,11 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Job job;
private final ExtractedFields extractedFields;
public ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job) {
private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, ExtractedFields extractedFields) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
this.extractedFields = ExtractedFields.build(job, datafeedConfig);
this.extractedFields = Objects.requireNonNull(extractedFields);
}
@Override
@ -41,4 +49,29 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
end);
return new ScrollDataExtractor(client, dataExtractorContext);
}
public static void create(Client client, DatafeedConfig datafeed, Job job, ActionListener<DataExtractorFactory> listener) {
// Step 2. Construct the factory and notify the listener
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
fieldCapabilitiesResponse -> {
ExtractedFields extractedFields = ExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields));
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else {
listener.onFailure(e);
}
}
);
// Step 1. Get the field capabilities needed to determine how each field should be extracted
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[datafeed.getIndices().size()]));
List<String> fields = job.allFields();
fieldCapabilitiesRequest.fields(fields.toArray(new String[fields.size()]));
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
}
}

View File

@ -66,7 +66,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
private static final ParseField MULTIPLE_BUCKET_SPANS = new ParseField("multiple_bucket_spans");
private static final ParseField USER_PER_PARTITION_NORMALIZATION = new ParseField("use_per_partition_normalization");
private static final String ML_CATEGORY_FIELD = "mlcategory";
public static final String ML_CATEGORY_FIELD = "mlcategory";
public static final Set<String> AUTO_CREATED_FIELDS = new HashSet<>(Collections.singletonList(ML_CATEGORY_FIELD));
public static final long DEFAULT_RESULT_FINALIZATION_WINDOW = 2L;

View File

@ -405,8 +405,7 @@ public class JobProvider {
* Uses the internal client, so runs as the _xpack user
*/
public void bucketsViaInternalClient(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler,
Consumer<Exception> errorHandler)
throws ResourceNotFoundException {
Consumer<Exception> errorHandler) {
buckets(jobId, query, handler, errorHandler, client);
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
@ -80,9 +79,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
if (randomBoolean()) {
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000)));
}
if (randomBoolean()) {
builder.setSource(randomBoolean());
}
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}

View File

@ -0,0 +1,180 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DatafeedJobBuilderTests extends ESTestCase {
private Client client;
private Auditor auditor;
private JobProvider jobProvider;
private Consumer<Exception> taskHandler;
private DatafeedJobBuilder datafeedJobBuilder;
@Before
public void init() {
client = mock(Client.class);
auditor = mock(Auditor.class);
jobProvider = mock(JobProvider.class);
taskHandler = mock(Consumer.class);
datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis);
Mockito.doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];
consumer.accept(new ResourceNotFoundException("dummy"));
return null;
}).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
}
public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build();
AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> {
assertThat(datafeedJob.isRunning(), is(true));
assertThat(datafeedJob.isIsolated(), is(false));
assertThat(datafeedJob.lastEndTimeMs(), is(nullValue()));
wasHandlerCalled.compareAndSet(false, true);
}, e -> fail()
);
datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}
public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfterLatestBucket() throws Exception {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build();
givenLatestTimes(7_200_000L, 3_600_000L);
AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> {
assertThat(datafeedJob.isRunning(), is(true));
assertThat(datafeedJob.isIsolated(), is(false));
assertThat(datafeedJob.lastEndTimeMs(), equalTo(7_200_000L));
wasHandlerCalled.compareAndSet(false, true);
}, e -> fail()
);
datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}
public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRecordTimestamp() throws Exception {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build();
givenLatestTimes(3_800_000L, 3_600_000L);
AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> {
assertThat(datafeedJob.isRunning(), is(true));
assertThat(datafeedJob.isIsolated(), is(false));
assertThat(datafeedJob.lastEndTimeMs(), equalTo(7_199_999L));
wasHandlerCalled.compareAndSet(false, true);
}, e -> fail()
);
datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}
public void testBuild_GivenBucketsRequestFails() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build();
Exception error = new RuntimeException("error");
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];
consumer.accept(error);
return null;
}).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, ActionListener.wrap(datafeedJob -> fail(), taskHandler));
verify(taskHandler).accept(error);
}
private void givenLatestTimes(long latestRecordTimestamp, long latestBucketTimestamp) {
Mockito.doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
DataCounts dataCounts = new DataCounts(jobId);
dataCounts.setLatestRecordTimeStamp(new Date(latestRecordTimestamp));
handler.accept(dataCounts);
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
Consumer consumer = (Consumer) invocationOnMock.getArguments()[2];
Bucket bucket = mock(Bucket.class);
when(bucket.getTimestamp()).thenReturn(new Date(latestBucketTimestamp));
QueryPage<Bucket> bucketQueryPage = new QueryPage(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD);
consumer.accept(bucketQueryPage);
return null;
}).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
}
}

View File

@ -259,5 +259,4 @@ public class DatafeedJobTests extends ESTestCase {
return new DatafeedJob("_job_id", dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs);
}
}

View File

@ -5,9 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -18,48 +16,34 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask;
import org.elasticsearch.xpack.ml.action.StartDatafeedActionTests;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
@ -71,7 +55,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -82,12 +65,9 @@ import static org.mockito.Mockito.when;
public class DatafeedManagerTests extends ESTestCase {
private Client client;
private ActionFuture<PostDataAction.Response> jobDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ClusterService clusterService;
private ThreadPool threadPool;
private DataExtractorFactory dataExtractorFactory;
private DatafeedJob datafeedJob;
private DatafeedManager datafeedManager;
private long currentTime = 120000;
private Auditor auditor;
@ -99,7 +79,8 @@ public class DatafeedManagerTests extends ESTestCase {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job job = createDatafeedJob().build(new Date());
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
DatafeedConfig datafeed = createDatafeedConfig("datafeed_id", job.getId()).build();
mlMetadata.putDatafeed(datafeed);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
@ -117,26 +98,15 @@ public class DatafeedManagerTests extends ESTestCase {
ArgumentCaptor<XContentBuilder> argumentCaptor = ArgumentCaptor.forClass(XContentBuilder.class);
client = new MockClientBuilder("foo")
Client client = new MockClientBuilder("foo")
.prepareIndex(Auditor.NOTIFICATIONS_INDEX, AuditMessage.TYPE.getPreferredName(), "responseId", argumentCaptor)
.build();
jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
DiscoveryNode dNode = mock(DiscoveryNode.class);
when(dNode.getName()).thenReturn("this_node_has_a_name");
when(clusterService.localNode()).thenReturn(dNode);
auditor = mock(Auditor.class);
JobProvider jobProvider = mock(JobProvider.class);
Mockito.doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
dataExtractorFactory = mock(DataExtractorFactory.class);
auditor = mock(Auditor.class);
threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
@ -146,107 +116,60 @@ public class DatafeedManagerTests extends ESTestCase {
}).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
datafeedManager = new DatafeedManager(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor,
persistentTasksService) {
@Override
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
return dataExtractorFactory;
}
};
verify(clusterService).addListener(capturedClusterStateListener.capture());
datafeedJob = mock(DatafeedJob.class);
when(datafeedJob.isRunning()).thenReturn(true);
when(datafeedJob.stop()).thenReturn(true);
DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];
consumer.accept(new ResourceNotFoundException("dummy"));
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(datafeedJob);
return null;
}).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
}).when(datafeedJobBuilder).build(any(), any(), any());
datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor,
persistentTasksService);
verify(clusterService).addListener(capturedClusterStateListener.capture());
}
public void testLookbackOnly_WarnsWhenNoDataIsRetrieved() throws Exception {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.empty());
when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
verify(auditor).warning("job_id", "Datafeed lookback retrieved no data");
}
public void testStart_GivenNewlyCreatedJobLookback() throws Exception {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
byte[] contentBytes = "".getBytes(StandardCharsets.UTF_8);
XContentType xContentType = XContentType.JSON;
InputStream in = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
when(datafeedJob.runLookBack(0L, 60000L)).thenReturn(null);
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
}
private static PostDataAction.Request createExpectedPostDataRequest(String jobId, byte[] contentBytes, XContentType xContentType) {
DataDescription.Builder expectedDataDescription = new DataDescription.Builder();
expectedDataDescription.setTimeFormat("epoch_ms");
expectedDataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(jobId);
expectedPostDataRequest.setDataDescription(expectedDataDescription.build());
expectedPostDataRequest.setContent(new BytesArray(contentBytes), xContentType);
return expectedPostDataRequest;
}
public void testStart_extractionProblem() throws Exception {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
when(datafeedJob.runLookBack(0, 60000L)).thenThrow(new DatafeedJob.ExtractionProblemException(0L, new RuntimeException("dummy")));
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
verify(auditor, times(1)).error(eq("job_id"), anyString());
}
public void testStart_emptyDataCountException() throws Exception {
currentTime = 6000000;
Job.Builder jobBuilder = createDatafeedJob();
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "job_id").build();
Job job = jobBuilder.build(new Date());
MlMetadata mlMetadata = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
.build());
int[] counter = new int[] {0};
doAnswer(invocationOnMock -> {
if (counter[0]++ < 10) {
@ -257,34 +180,21 @@ public class DatafeedManagerTests extends ESTestCase {
return mock(ScheduledFuture.class);
}).when(threadPool).schedule(any(), any(), any());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(false);
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null);
DatafeedManager.Holder holder = datafeedManager.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task);
datafeedManager.doDatafeedRealtime(10L, "foo", holder);
datafeedManager.run(task, handler);
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
verify(auditor, times(1)).warning(eq("job_id"), anyString());
verify(client, never()).execute(same(PostDataAction.INSTANCE), any());
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
}
public void testRealTime_GivenPostAnalysisProblemIsConflict() throws Exception {
Exception conflictProblem = ExceptionsHelper.conflictStatusException("conflict");
when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(conflictProblem);
public void testRealTime_GivenStoppingAnalysisProblem() throws Exception {
Exception cause = new RuntimeException("stopping");
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.AnalysisProblemException(0L, true, cause));
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
byte[] contentBytes = "".getBytes(StandardCharsets.UTF_8);
InputStream in = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
@ -295,25 +205,15 @@ public class DatafeedManagerTests extends ESTestCase {
ArgumentCaptor<DatafeedJob.AnalysisProblemException> analysisProblemCaptor =
ArgumentCaptor.forClass(DatafeedJob.AnalysisProblemException.class);
verify(handler).accept(analysisProblemCaptor.capture());
assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(conflictProblem));
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: conflict");
assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(cause));
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: stopping");
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false));
}
public void testRealTime_GivenPostAnalysisProblemIsNonConflict() throws Exception {
Exception nonConflictProblem = new RuntimeException("just runtime");
when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(nonConflictProblem);
public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception {
Exception cause = new RuntimeException("non-stopping");
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.AnalysisProblemException(0L, false, cause));
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
byte[] contentBytes = "".getBytes(StandardCharsets.UTF_8);
InputStream in = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
@ -321,21 +221,14 @@ public class DatafeedManagerTests extends ESTestCase {
task = spyDatafeedTask(task);
datafeedManager.run(task, handler);
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: just runtime");
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: non-stopping");
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
byte[] contentBytes = "".getBytes(StandardCharsets.UTF_8);
InputStream in = new ByteArrayInputStream(contentBytes);
XContentType xContentType = XContentType.JSON;
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenReturn(1L);
when(datafeedJob.runRealtime()).thenReturn(1L);
Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean();
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed_id", 0L);
@ -350,10 +243,7 @@ public class DatafeedManagerTests extends ESTestCase {
verify(handler).accept(null);
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false));
} else {
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
}
}
@ -366,10 +256,6 @@ public class DatafeedManagerTests extends ESTestCase {
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.empty());
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);
@ -410,10 +296,6 @@ public class DatafeedManagerTests extends ESTestCase {
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.empty());
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);
@ -442,10 +324,6 @@ public class DatafeedManagerTests extends ESTestCase {
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.empty());
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);


@ -68,9 +68,6 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setSource(randomBoolean());
}
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
@ -127,7 +124,6 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
update.setQuery(QueryBuilders.termQuery("a", "b"));
update.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false)));
update.setScrollSize(8000);
update.setSource(true);
update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
@ -142,7 +138,6 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
assertThat(updatedDatafeed.getScriptFields(),
equalTo(Arrays.asList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))));
assertThat(updatedDatafeed.getScrollSize(), equalTo(8000));
assertThat(updatedDatafeed.isSource(), is(true));
assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(TimeValue.timeValueHours(1))));
}


@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -20,17 +24,35 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.junit.Before;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DataExtractorFactoryTests extends ESTestCase {
private FieldCapabilitiesResponse fieldsCapabilities;
private Client client;
@Before
public void setUpTests() {
client = mock(Client.class);
fieldsCapabilities = mock(FieldCapabilitiesResponse.class);
givenAggregatableField("time", "date");
givenAggregatableField("field", "keyword");
doAnswer(invocationMock -> {
@SuppressWarnings("raw_types")
ActionListener listener = (ActionListener) invocationMock.getArguments()[2];
listener.onResponse(fieldsCapabilities);
return null;
}).when(client).execute(same(FieldCapabilitiesAction.INSTANCE), any(), any());
}
public void testCreateDataExtractorFactoryGivenDefaultScroll() {
@ -40,10 +62,12 @@ public class DataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build();
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date()));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() {
@ -54,10 +78,12 @@ public class DataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() {
@ -68,10 +94,12 @@ public class DataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)),
e -> fail()
);
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenDefaultAggregation() {
@ -83,10 +111,12 @@ public class DataExtractorFactoryTests extends ESTestCase {
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenAggregationWithOffChunk() {
@ -99,10 +129,12 @@ public class DataExtractorFactoryTests extends ESTestCase {
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class)),
e -> fail()
);
assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() {
@ -115,9 +147,20 @@ public class DataExtractorFactoryTests extends ESTestCase {
AggregationBuilders.histogram("time").interval(300000)));
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
private void givenAggregatableField(String field, String type) {
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isSearchable()).thenReturn(true);
when(fieldCaps.isAggregatable()).thenReturn(true);
Map<String, FieldCapabilities> fieldCapsMap = new HashMap<>();
fieldCapsMap.put(type, fieldCaps);
when(fieldsCapabilities.getField(field)).thenReturn(fieldCapsMap);
}
}


@ -5,15 +5,30 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExtractedFieldsTests extends ESTestCase {
@ -79,4 +94,111 @@ public class ExtractedFieldsTests extends ESTestCase {
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
}
public void testBuildGivenMixtureOfTypes() {
Job.Builder jobBuilder = new Job.Builder("foo");
jobBuilder.setDataDescription(new DataDescription.Builder());
Detector.Builder detector = new Detector.Builder("mean", "value");
detector.setByFieldName("airline");
detector.setOverFieldName("airport");
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build())));
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId());
datafeedBuilder.setIndices(Collections.singletonList("foo"));
datafeedBuilder.setTypes(Collections.singletonList("doc"));
datafeedBuilder.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField("airport", null, false)));
Map<String, FieldCapabilities> timeCaps = new HashMap<>();
timeCaps.put("date", createFieldCaps(true));
Map<String, FieldCapabilities> valueCaps = new HashMap<>();
valueCaps.put("float", createFieldCaps(true));
valueCaps.put("keyword", createFieldCaps(true));
Map<String, FieldCapabilities> airlineCaps = new HashMap<>();
airlineCaps.put("text", createFieldCaps(false));
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
when(fieldCapabilitiesResponse.getField("value")).thenReturn(valueCaps);
when(fieldCapabilitiesResponse.getField("airline")).thenReturn(airlineCaps);
ExtractedFields extractedFields = ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(),
fieldCapabilitiesResponse);
assertThat(extractedFields.timeField(), equalTo("time"));
assertThat(extractedFields.getDocValueFields().length, equalTo(2));
assertThat(extractedFields.getDocValueFields()[0], equalTo("time"));
assertThat(extractedFields.getDocValueFields()[1], equalTo("value"));
assertThat(extractedFields.getSourceFields().length, equalTo(1));
assertThat(extractedFields.getSourceFields()[0], equalTo("airline"));
assertThat(extractedFields.getAllFields().size(), equalTo(4));
}
public void testBuildGivenTimeFieldIsNotAggregatable() {
Job.Builder jobBuilder = new Job.Builder("foo");
jobBuilder.setDataDescription(new DataDescription.Builder());
Detector.Builder detector = new Detector.Builder("count", null);
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build())));
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId());
datafeedBuilder.setIndices(Collections.singletonList("foo"));
datafeedBuilder.setTypes(Collections.singletonList("doc"));
Map<String, FieldCapabilities> timeCaps = new HashMap<>();
timeCaps.put("date", createFieldCaps(false));
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve time field [time] because it is not aggregatable"));
}
public void testBuildGivenTimeFieldIsNotAggregatableInSomeIndices() {
Job.Builder jobBuilder = new Job.Builder("foo");
jobBuilder.setDataDescription(new DataDescription.Builder());
Detector.Builder detector = new Detector.Builder("count", null);
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build())));
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId());
datafeedBuilder.setIndices(Collections.singletonList("foo"));
datafeedBuilder.setTypes(Collections.singletonList("doc"));
Map<String, FieldCapabilities> timeCaps = new HashMap<>();
timeCaps.put("date", createFieldCaps(true));
timeCaps.put("text", createFieldCaps(false));
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve time field [time] because it is not aggregatable"));
}
public void testBuildGivenFieldWithoutMappings() {
Job.Builder jobBuilder = new Job.Builder("foo");
jobBuilder.setDataDescription(new DataDescription.Builder());
Detector.Builder detector = new Detector.Builder("max", "value");
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build())));
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId());
datafeedBuilder.setIndices(Collections.singletonList("foo"));
datafeedBuilder.setTypes(Collections.singletonList("doc"));
Map<String, FieldCapabilities> timeCaps = new HashMap<>();
timeCaps.put("date", createFieldCaps(true));
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve field [value] because it has no mappings"));
}
private static FieldCapabilities createFieldCaps(boolean isAggregatable) {
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isAggregatable()).thenReturn(isAggregatable);
return fieldCaps;
}
}


@ -13,6 +13,8 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -26,6 +28,9 @@ import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
@ -84,11 +89,19 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
internalCluster().ensureAtLeastNumDataNodes(4);
ensureStableCluster(4);
Job.Builder job = createScheduledJob("fail-over-basics_with-data-feeder-job");
Detector.Builder d = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setSummaryCountFieldName("doc_count");
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
Job.Builder job = new Job.Builder("fail-over-basics_with-data-feeder-job");
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(new DataDescription.Builder());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.histogram("time").interval(300000)));
configBuilder.setFrequency(TimeValue.timeValueMinutes(2));
DatafeedConfig config = configBuilder.build();
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
@ -105,6 +118,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
});
StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest);
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet();


@ -41,7 +41,7 @@ public class CategorizationIT extends MlNativeAutodetectIntegTestCase {
public void setUpData() throws IOException {
client().admin().indices().prepareCreate(DATA_INDEX)
.addMapping(DATA_TYPE, "time", "type=date,format=epoch_millis",
"msg", "type=keyword")
"msg", "type=text")
.get();
nowMillis = System.currentTimeMillis();


@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@ -205,50 +206,47 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
public void testLookbackOnly() throws Exception {
new LookbackOnlyTestHelper("lookback-1", "airline-data").setShouldSucceedProcessing(true).execute();
new LookbackOnlyTestHelper("test-lookback-only", "airline-data").setShouldSucceedProcessing(true).execute();
}
public void testLookbackOnlyWithDatafeedSourceEnabled() throws Exception {
new LookbackOnlyTestHelper("lookback-2", "airline-data").setEnableDatafeedSource(true).execute();
}
public void testLookbackOnlyWithDocValuesDisabledAndDatafeedSourceDisabled() throws Exception {
new LookbackOnlyTestHelper("lookback-3", "airline-data-disabled-doc-values").setShouldSucceedInput(false)
.setShouldSucceedProcessing(false).execute();
// Assert the error notification for doc values is there
client().performRequest("post", ".ml-notifications/_refresh");
String query = "{\"query\":{\"bool\":{\"filter\":[{\"term\":{\"job_id\":\"lookback-3\"}}, {\"term\":{\"level\":\"error\"}}]}}}";
Response response = client().performRequest("get", ".ml-notifications/_search", Collections.emptyMap(),
new StringEntity(query, ContentType.APPLICATION_JSON));
assertThat(responseEntityToString(response), containsString("One or more fields do not have doc values; please enable doc values " +
"for all analysis fields or enable _source on the datafeed"));
}
public void testLookbackOnlyWithDocValuesDisabledAndDatafeedSourceEnabled() throws Exception {
new LookbackOnlyTestHelper("lookback-4", "airline-data-disabled-doc-values").setEnableDatafeedSource(true).execute();
public void testLookbackOnlyWithDocValuesDisabled() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only-with-doc-values-disabled", "airline-data-disabled-doc-values").execute();
}
public void testLookbackOnlyWithSourceDisabled() throws Exception {
new LookbackOnlyTestHelper("lookback-5", "airline-data-disabled-source").execute();
new LookbackOnlyTestHelper("test-lookback-only-with-source-disabled", "airline-data-disabled-source").execute();
}
@AwaitsFix(bugUrl = "This test uses painless which is not available in the integTest phase")
public void testLookbackOnlyWithScriptFields() throws Exception {
new LookbackOnlyTestHelper("lookback-6", "airline-data-disabled-source").setAddScriptedFields(true).execute();
new LookbackOnlyTestHelper("test-lookback-only-with-script-fields", "airline-data-disabled-source")
.setAddScriptedFields(true).execute();
}
public void testLookbackOnlyWithNestedFieldsAndDatafeedSourceDisabled() throws Exception {
executeTestLookbackOnlyWithNestedFields("lookback-7", false);
}
public void testLookbackOnlyWithNestedFields() throws Exception {
String jobId = "test-lookback-only-with-nested-fields";
String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":\"1h\",\"detectors\" :"
+ "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
public void testLookbackOnlyWithNestedFieldsAndDatafeedSourceEnabled() throws Exception {
executeTestLookbackOnlyWithNestedFields("lookback-8", true);
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
public void testLookbackOnlyGivenEmptyIndex() throws Exception {
new LookbackOnlyTestHelper("lookback-9", "airline-data-empty").setShouldSucceedInput(false).setShouldSucceedProcessing(false)
.execute();
new LookbackOnlyTestHelper("test-lookback-only-given-empty-index", "airline-data-empty")
.setShouldSucceedInput(false).setShouldSucceedProcessing(false).execute();
}
public void testInsufficientSearchPrivilegesOnPut() throws Exception {
@ -299,7 +297,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
new BasicHeader("Authorization", BASIC_AUTH_VALUE_ML_ADMIN)));
assertThat(e.getMessage(),
containsString("[indices:data/read/search] is unauthorized for user [ml_admin]"));
containsString("[indices:data/read/field_caps] is unauthorized for user [ml_admin]"));
}
public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
@ -435,7 +433,6 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
private String jobId;
private String dataIndex;
private boolean addScriptedFields;
private boolean enableDatafeedSource;
private boolean shouldSucceedInput;
private boolean shouldSucceedProcessing;
@ -451,11 +448,6 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
return this;
}
public LookbackOnlyTestHelper setEnableDatafeedSource(boolean value) {
enableDatafeedSource = value;
return this;
}
public LookbackOnlyTestHelper setShouldSucceedInput(boolean value) {
shouldSucceedInput = value;
return this;
@ -470,7 +462,6 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
createJob(jobId);
String datafeedId = "datafeed-" + jobId;
new DatafeedBuilder(datafeedId, jobId, dataIndex, "response")
.setSource(enableDatafeedSource)
.setScriptedFields(addScriptedFields ?
"{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : null)
.build();
@ -547,26 +538,6 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
}
private void executeTestLookbackOnlyWithNestedFields(String jobId, boolean source) throws Exception {
String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":\"1h\",\"detectors\" :"
+ "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").setSource(source).build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
@After
public void clearMlState() throws Exception {
new MlRestTestStateCleaner(logger, adminClient(), this).clearMlMetadata();


@ -1,4 +1,30 @@
setup:
- do:
indices.create:
index: index-1
body:
settings:
index:
number_of_replicas: 1
mappings:
type-1:
properties:
time:
type: date
- do:
indices.create:
index: index-2
body:
settings:
index:
number_of_replicas: 1
mappings:
type-2:
properties:
time:
type: date
- do:
xpack.ml.put_job:
job_id: get-datafeed-stats-1
@ -31,22 +57,6 @@ setup:
}
}
- do:
indices.create:
index: index-1
body:
settings:
index:
number_of_replicas: 1
- do:
indices.create:
index: index-2
body:
settings:
index:
number_of_replicas: 1
- do:
xpack.ml.put_datafeed:
datafeed_id: datafeed-1


@ -8,6 +8,12 @@ setup:
properties:
time:
type: date
airline:
type: keyword
airport:
type: text
responsetime:
type: float
- do:
xpack.ml.put_job:
@ -18,7 +24,8 @@ setup:
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}],
"influencers": ["airport"]
},
"data_description" : {
"format":"xcontent",
@ -194,6 +201,41 @@ setup:
job_id: "start-stop-datafeed-job"
- do:
catch: /No node found to start datafeed \[start-stop-datafeed-datafeed-1\].*\[utopia\] does not exist.*/
catch: /datafeed \[start-stop-datafeed-datafeed-1] cannot retrieve data because index \[utopia\] does not exist/
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
---
"Test start given field without mappings":
- do:
xpack.ml.put_job:
job_id: start-stop-datafeed-job-field-without-mappings
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count","by_field_name":"airline2"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: start-stop-datafeed-job-field-without-mappings-feed
body: >
{
"job_id":"start-stop-datafeed-job-field-without-mappings",
"indexes":"airline-data",
"types":"response"
}
- do:
xpack.ml.open_job:
job_id: "start-stop-datafeed-job-field-without-mappings"
- do:
catch: /datafeed \[start-stop-datafeed-job-field-without-mappings-feed] cannot retrieve field \[airline2\] because it has no mappings/
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-field-without-mappings-feed"


@ -11,6 +11,7 @@ minimal:
- indices:admin/get
- indices:admin/mapping/put
- indices:admin/refresh
- indices:data/read/field_caps
- indices:data/read/search
- indices:data/write/bulk
- indices:data/write/index