Add update datafeed endpoint

- Adds /_xpack/ml/datafeeds/{datafeed_id}/_update

Fixes elastic/elasticsearch#4954

Original commit: elastic/x-pack-elasticsearch@7a1887089d
This commit is contained in:
Dimitrios Athanasiou 2017-02-10 11:08:33 +00:00
parent 232f3e76a4
commit 9c60ee076b
16 changed files with 1110 additions and 38 deletions

View File

@ -62,6 +62,7 @@ import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
@ -92,6 +93,7 @@ import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestPutDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestPutDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestStartDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestStartDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestStopDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestStopDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestUpdateDatafeedAction;
import org.elasticsearch.xpack.ml.rest.filter.RestDeleteFilterAction; import org.elasticsearch.xpack.ml.rest.filter.RestDeleteFilterAction;
import org.elasticsearch.xpack.ml.rest.filter.RestGetFiltersAction; import org.elasticsearch.xpack.ml.rest.filter.RestGetFiltersAction;
import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction; import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction;
@ -293,6 +295,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new RestGetDatafeedsAction(settings, restController), new RestGetDatafeedsAction(settings, restController),
new RestGetDatafeedStatsAction(settings, restController), new RestGetDatafeedStatsAction(settings, restController),
new RestPutDatafeedAction(settings, restController), new RestPutDatafeedAction(settings, restController),
new RestUpdateDatafeedAction(settings, restController),
new RestDeleteDatafeedAction(settings, restController), new RestDeleteDatafeedAction(settings, restController),
new RestStartDatafeedAction(settings, restController), new RestStartDatafeedAction(settings, restController),
new RestStopDatafeedAction(settings, restController), new RestStopDatafeedAction(settings, restController),
@ -330,6 +333,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(GetDatafeedsAction.INSTANCE, GetDatafeedsAction.TransportAction.class), new ActionHandler<>(GetDatafeedsAction.INSTANCE, GetDatafeedsAction.TransportAction.class),
new ActionHandler<>(GetDatafeedsStatsAction.INSTANCE, GetDatafeedsStatsAction.TransportAction.class), new ActionHandler<>(GetDatafeedsStatsAction.INSTANCE, GetDatafeedsStatsAction.TransportAction.class),
new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class), new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class),
new ActionHandler<>(UpdateDatafeedAction.INSTANCE, UpdateDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class),
new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class),
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),

View File

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException;
import java.util.Objects;
public class UpdateDatafeedAction extends Action<UpdateDatafeedAction.Request, PutDatafeedAction.Response,
UpdateDatafeedAction.RequestBuilder> {
public static final UpdateDatafeedAction INSTANCE = new UpdateDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/update";
private UpdateDatafeedAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public PutDatafeedAction.Response newResponse() {
return new PutDatafeedAction.Response();
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
public static Request parseRequest(String datafeedId, XContentParser parser) {
DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null);
update.setId(datafeedId);
return new Request(update.build());
}
private DatafeedUpdate update;
public Request(DatafeedUpdate update) {
this.update = update;
}
Request() {
}
public DatafeedUpdate getUpdate() {
return update;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
update = new DatafeedUpdate(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
update.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
update.toXContent(builder, params);
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(update, request.update);
}
@Override
public int hashCode() {
return Objects.hash(update);
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, PutDatafeedAction.Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, UpdateDatafeedAction action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, PutDatafeedAction.Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, UpdateDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected PutDatafeedAction.Response newResponse() {
return new PutDatafeedAction.Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<PutDatafeedAction.Response> listener)
throws Exception {
clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(),
new AckedClusterStateUpdateTask<PutDatafeedAction.Response>(request, listener) {
private DatafeedConfig updatedDatafeed;
@Override
protected PutDatafeedAction.Response newResponse(boolean acknowledged) {
if (acknowledged) {
logger.info("Updated datafeed [{}]", request.getUpdate().getId());
}
return new PutDatafeedAction.Response(acknowledged, updatedDatafeed);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
DatafeedUpdate update = request.getUpdate();
MlMetadata currentMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress persistentTasksInProgress = state.custom(PersistentTasksInProgress.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
.updateDatafeed(update, persistentTasksInProgress).build();
updatedDatafeed = newMetadata.getDatafeed(update.getId());
return ClusterState.builder(currentState).metaData(
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()).build();
}
});
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -177,22 +177,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return frequency; return frequency;
} }
/**
* For the ELASTICSEARCH data source only, one or more indexes to search for
* input data.
*
* @return The indexes to search, or <code>null</code> if not set.
*/
public List<String> getIndexes() { public List<String> getIndexes() {
return indexes; return indexes;
} }
/**
* For the ELASTICSEARCH data source only, one or more types to search for
* input data.
*
* @return The types to search, or <code>null</code> if not set.
*/
public List<String> getTypes() { public List<String> getTypes() {
return types; return types;
} }
@ -213,6 +201,13 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return aggregations; return aggregations;
} }
/**
* @return {@code true} when there are non-empty aggregations, {@code false} otherwise
*/
public boolean hasAggregations() {
return aggregations != null && aggregations.count() > 0;
}
public List<SearchSourceBuilder.ScriptField> getScriptFields() { public List<SearchSourceBuilder.ScriptField> getScriptFields() {
return scriptFields == null ? Collections.emptyList() : scriptFields; return scriptFields == null ? Collections.emptyList() : scriptFields;
} }

View File

@ -164,7 +164,7 @@ public class DatafeedJobRunner extends AbstractComponent {
} }
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) { DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
boolean isScrollSearch = datafeedConfig.getAggregations() == null; boolean isScrollSearch = datafeedConfig.hasAggregations() == false;
DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job); : new AggregationDataExtractorFactory(client, datafeedConfig, job);
ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig();

View File

@ -23,7 +23,7 @@ public final class DatafeedJobValidator {
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) { if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY)); throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
} }
if (datafeedConfig.getAggregations() != null && !DatafeedConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) { if (datafeedConfig.hasAggregations() && !DatafeedConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT)); Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT));
} }

View File

@ -0,0 +1,371 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* A datafeed update contains partial properties to update a {@link DatafeedConfig}.
* The main difference between this class and {@link DatafeedConfig} is that here all
* fields are nullable.
*/
public class DatafeedUpdate implements Writeable, ToXContent {
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("datafeed_update", Builder::new);
static {
PARSER.declareString(Builder::setId, DatafeedConfig.ID);
PARSER.declareString(Builder::setJobId, Job.ID);
PARSER.declareStringArray(Builder::setIndexes, DatafeedConfig.INDEXES);
PARSER.declareStringArray(Builder::setTypes, DatafeedConfig.TYPES);
PARSER.declareLong(Builder::setQueryDelay, DatafeedConfig.QUERY_DELAY);
PARSER.declareLong(Builder::setFrequency, DatafeedConfig.FREQUENCY);
PARSER.declareObject(Builder::setQuery,
(p, c) -> new QueryParseContext(p).parseInnerQueryBuilder(), DatafeedConfig.QUERY);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(new QueryParseContext(p)),
DatafeedConfig.AGGREGATIONS);
PARSER.declareObject(Builder::setAggregations,(p, c) -> AggregatorFactories.parseAggregators(new QueryParseContext(p)),
DatafeedConfig.AGGS);
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
parsedScriptFields.add(new SearchSourceBuilder.ScriptField(new QueryParseContext(p)));
}
parsedScriptFields.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
return parsedScriptFields;
}, DatafeedConfig.SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
PARSER.declareBoolean(Builder::setSource, DatafeedConfig.SOURCE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
}
private final String id;
private final String jobId;
private final Long queryDelay;
private final Long frequency;
private final List<String> indexes;
private final List<String> types;
private final QueryBuilder query;
private final AggregatorFactories.Builder aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final Boolean source;
private final ChunkingConfig chunkingConfig;
private DatafeedUpdate(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, Boolean source, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
this.frequency = frequency;
this.indexes = indexes;
this.types = types;
this.query = query;
this.aggregations = aggregations;
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.source = source;
this.chunkingConfig = chunkingConfig;
}
public DatafeedUpdate(StreamInput in) throws IOException {
this.id = in.readString();
this.jobId = in.readOptionalString();
this.queryDelay = in.readOptionalLong();
this.frequency = in.readOptionalLong();
if (in.readBoolean()) {
this.indexes = in.readList(StreamInput::readString);
} else {
this.indexes = null;
}
if (in.readBoolean()) {
this.types = in.readList(StreamInput::readString);
} else {
this.types = null;
}
this.query = in.readOptionalNamedWriteable(QueryBuilder.class);
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
if (in.readBoolean()) {
this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new);
} else {
this.scriptFields = null;
}
this.scrollSize = in.readOptionalVInt();
this.source = in.readOptionalBoolean();
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
}
/**
* Get the id of the datafeed to update
*/
public String getId() {
return id;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeOptionalString(jobId);
out.writeOptionalLong(queryDelay);
out.writeOptionalLong(frequency);
if (indexes != null) {
out.writeBoolean(true);
out.writeStringList(indexes);
} else {
out.writeBoolean(false);
}
if (types != null) {
out.writeBoolean(true);
out.writeStringList(types);
} else {
out.writeBoolean(false);
}
out.writeOptionalNamedWriteable(query);
out.writeOptionalWriteable(aggregations);
if (scriptFields != null) {
out.writeBoolean(true);
out.writeList(scriptFields);
} else {
out.writeBoolean(false);
}
out.writeOptionalVInt(scrollSize);
out.writeOptionalBoolean(source);
out.writeOptionalWriteable(chunkingConfig);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), id);
addOptionalField(builder, Job.ID, jobId);
addOptionalField(builder, DatafeedConfig.QUERY_DELAY, queryDelay);
addOptionalField(builder, DatafeedConfig.FREQUENCY, frequency);
addOptionalField(builder, DatafeedConfig.INDEXES, indexes);
addOptionalField(builder, DatafeedConfig.TYPES, types);
addOptionalField(builder, DatafeedConfig.QUERY, query);
addOptionalField(builder, DatafeedConfig.AGGREGATIONS, aggregations);
if (scriptFields != null) {
builder.startObject(DatafeedConfig.SCRIPT_FIELDS.getPreferredName());
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
scriptField.toXContent(builder, params);
}
builder.endObject();
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.SOURCE, source);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
builder.endObject();
return builder;
}
private void addOptionalField(XContentBuilder builder, ParseField field, Object value) throws IOException {
if (value != null) {
builder.field(field.getPreferredName(), value);
}
}
/**
* Applies the update to the given {@link DatafeedConfig}
* @return a new {@link DatafeedConfig} that contains the update
*/
public DatafeedConfig apply(DatafeedConfig datafeedConfig) {
if (id.equals(datafeedConfig.getId()) == false) {
throw new IllegalArgumentException("Cannot apply update to datafeedConfig with different id");
}
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig);
if (jobId != null) {
builder.setJobId(jobId);
}
if (queryDelay != null) {
builder.setQueryDelay(queryDelay);
}
if (frequency != null) {
builder.setFrequency(frequency);
}
if (indexes != null) {
builder.setIndexes(indexes);
}
if (types != null) {
builder.setTypes(types);
}
if (query != null) {
builder.setQuery(query);
}
if (aggregations != null) {
builder.setAggregations(aggregations);
}
if (scriptFields != null) {
builder.setScriptFields(scriptFields);
}
if (scrollSize != null) {
builder.setScrollSize(scrollSize);
}
if (source != null) {
builder.setSource(source);
}
if (chunkingConfig != null) {
builder.setChunkingConfig(chunkingConfig);
}
return builder.build();
}
/**
* The lists of indexes and types are compared for equality but they are not
* sorted first so this test could fail simply because the indexes and types
* lists are in different orders.
*/
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof DatafeedUpdate == false) {
return false;
}
DatafeedUpdate that = (DatafeedUpdate) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indexes, that.indexes)
&& Objects.equals(this.types, that.types)
&& Objects.equals(this.query, that.query)
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source,
chunkingConfig);
}
@Override
public String toString() {
return Strings.toString(this);
}
public static class Builder {
private String id;
private String jobId;
private Long queryDelay;
private Long frequency;
private List<String> indexes;
private List<String> types;
private QueryBuilder query;
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private Boolean source;
private ChunkingConfig chunkingConfig;
public Builder() {
}
public Builder(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
}
public Builder(DatafeedUpdate config) {
this.id = config.id;
this.jobId = config.jobId;
this.queryDelay = config.queryDelay;
this.frequency = config.frequency;
this.indexes = config.indexes;
this.types = config.types;
this.query = config.query;
this.aggregations = config.aggregations;
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.source = config.source;
this.chunkingConfig = config.chunkingConfig;
}
public void setId(String datafeedId) {
id = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public void setIndexes(List<String> indexes) {
this.indexes = indexes;
}
public void setTypes(List<String> types) {
this.types = types;
}
public void setQueryDelay(long queryDelay) {
this.queryDelay = queryDelay;
}
public void setFrequency(long frequency) {
this.frequency = frequency;
}
public void setQuery(QueryBuilder query) {
this.query = query;
}
public void setAggregations(AggregatorFactories.Builder aggregations) {
this.aggregations = aggregations;
}
public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
this.scriptFields = sorted;
}
public void setScrollSize(int scrollSize) {
this.scrollSize = scrollSize;
}
public void setSource(boolean enabled) {
this.source = enabled;
}
public void setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
source, chunkingConfig);
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -43,6 +44,7 @@ import java.util.Optional;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier;
public class MlMetadata implements MetaData.Custom { public class MlMetadata implements MetaData.Custom {
@ -254,6 +256,14 @@ public class MlMetadata implements MetaData.Custom {
throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists"); throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
} }
String jobId = datafeedConfig.getJobId(); String jobId = datafeedConfig.getJobId();
checkJobIsAvailableForDatafeed(jobId);
Job job = jobs.get(jobId);
DatafeedJobValidator.validate(datafeedConfig, job);
datafeeds.put(datafeedConfig.getId(), datafeedConfig);
return this;
}
private void checkJobIsAvailableForDatafeed(String jobId) {
Job job = jobs.get(jobId); Job job = jobs.get(jobId);
if (job == null) { if (job == null) {
throw ExceptionsHelper.missingJobException(jobId); throw ExceptionsHelper.missingJobException(jobId);
@ -263,9 +273,23 @@ public class MlMetadata implements MetaData.Custom {
throw ExceptionsHelper.conflictStatusException("A datafeed [" + existingDatafeed.get().getId() throw ExceptionsHelper.conflictStatusException("A datafeed [" + existingDatafeed.get().getId()
+ "] already exists for job [" + jobId + "]"); + "] already exists for job [" + jobId + "]");
} }
DatafeedJobValidator.validate(datafeedConfig, job); }
datafeeds.put(datafeedConfig.getId(), datafeedConfig); public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksInProgress persistentTasksInProgress) {
String datafeedId = update.getId();
DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId);
if (oldDatafeedConfig == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId,
DatafeedState.STARTED), datafeedId, persistentTasksInProgress);
DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig);
if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) {
checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId());
}
Job job = jobs.get(newDatafeedConfig.getJobId());
DatafeedJobValidator.validate(newDatafeedConfig, job);
datafeeds.put(datafeedId, newDatafeedConfig);
return this; return this;
} }
@ -274,17 +298,8 @@ public class MlMetadata implements MetaData.Custom {
if (datafeed == null) { if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId); throw ExceptionsHelper.missingDatafeedException(datafeedId);
} }
if (persistentTasksInProgress != null) { checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
Predicate<PersistentTaskInProgress<?>> predicate = t -> { DatafeedState.STARTED), datafeedId, persistentTasksInProgress);
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
DatafeedState.STARTED);
throw ExceptionsHelper.conflictStatusException(msg);
}
}
datafeeds.remove(datafeedId); datafeeds.remove(datafeedId);
return this; return this;
} }
@ -293,6 +308,18 @@ public class MlMetadata implements MetaData.Custom {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
} }
private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksInProgress persistentTasksInProgress) {
if (persistentTasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = t -> {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) {
throw ExceptionsHelper.conflictStatusException(msg.get());
}
}
}
private Builder putJobs(Collection<Job> jobs) { private Builder putJobs(Collection<Job> jobs) {
for (Job job : jobs) { for (Job job : jobs) {
putJob(job, true); putJob(job, true);

View File

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import java.io.IOException;
public class RestUpdateDatafeedAction extends BaseRestHandler {
public RestUpdateDatafeedAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST, MlPlugin.BASE_PATH + "datafeeds/{"
+ DatafeedConfig.ID.getPreferredName() + "}/_update", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
XContentParser parser = restRequest.contentParser();
UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, parser);
return channel -> client.execute(UpdateDatafeedAction.INSTANCE, updateDatafeedRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdateTests;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.junit.Before;
public class UpdateDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
private String datafeedId;
@Before
public void setUpDatafeedId() {
datafeedId = DatafeedConfigTests.randomValidDatafeedId();
}
@Override
protected Request createTestInstance() {
return new Request(DatafeedUpdateTests.createRandomized(datafeedId));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
@Override
protected Request parseInstance(XContentParser parser) {
return Request.parseRequest(datafeedId, parser);
}
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> { public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
@ -40,13 +41,17 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
if (randomBoolean()) { if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10))); builder.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
} }
int scriptsSize = randomInt(3); boolean addScriptFields = randomBoolean();
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize); if (addScriptFields) {
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { int scriptsSize = randomInt(3);
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)), List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
randomBoolean())); for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
builder.setScriptFields(scriptFields);
} }
if (randomBoolean() && scriptsSize == 0) { if (randomBoolean() && addScriptFields == false) {
// can only test with a single agg as the xcontent order gets randomized by test base class and then // can only test with a single agg as the xcontent order gets randomized by test base class and then
// the actual xcontent isn't the same and test fail. // the actual xcontent isn't the same and test fail.
// Testing with a single agg is ok as we don't have special list writeable / xconent logic // Testing with a single agg is ok as we don't have special list writeable / xconent logic
@ -54,7 +59,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
aggs.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)).field(randomAsciiOfLength(10))); aggs.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)).field(randomAsciiOfLength(10)));
builder.setAggregations(aggs); builder.setAggregations(aggs);
} }
builder.setScriptFields(scriptFields);
if (randomBoolean()) { if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
} }
@ -73,7 +77,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
return builder.build(); return builder.build();
} }
private static List<String> randomStringList(int min, int max) { public static List<String> randomStringList(int min, int max) {
int size = scaledRandomIntBetween(min, max); int size = scaledRandomIntBetween(min, max);
List<String> list = new ArrayList<>(); List<String> list = new ArrayList<>();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
@ -257,6 +261,35 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
assertThat(e.getMessage(), equalTo("script_fields cannot be used in combination with aggregations")); assertThat(e.getMessage(), equalTo("script_fields cannot be used in combination with aggregations"));
} }
public void testHasAggregations_GivenNull() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndexes(Arrays.asList("myIndex"));
builder.setTypes(Arrays.asList("myType"));
DatafeedConfig datafeedConfig = builder.build();
assertThat(datafeedConfig.hasAggregations(), is(false));
}
public void testHasAggregations_GivenEmpty() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndexes(Arrays.asList("myIndex"));
builder.setTypes(Arrays.asList("myType"));
builder.setAggregations(new AggregatorFactories.Builder());
DatafeedConfig datafeedConfig = builder.build();
assertThat(datafeedConfig.hasAggregations(), is(false));
}
public void testHasAggregations_NonEmpty() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndexes(Arrays.asList("myIndex"));
builder.setTypes(Arrays.asList("myType"));
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
DatafeedConfig datafeedConfig = builder.build();
assertThat(datafeedConfig.hasAggregations(), is(true));
}
public void testDomainSplitInjection() { public void testDomainSplitInjection() {
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("datafeed1", "job1"); DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("datafeed1", "job1");
datafeed.setIndexes(Arrays.asList("my_index")); datafeed.setIndexes(Arrays.asList("my_index"));

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -254,13 +255,13 @@ public class DatafeedJobRunnerTests extends ESTestCase {
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)); assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
} }
public void testCreateDataExtractorFactoryGivenDefaultAggregation() { public void testCreateDataExtractorFactoryGivenAggregation() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob(); Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription); jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder()); datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a")));
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime); DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());

View File

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpdate> {
@Override
protected DatafeedUpdate createTestInstance() {
return createRandomized(DatafeedConfigTests.randomValidDatafeedId());
}
public static DatafeedUpdate createRandomized(String datafeedId) {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(datafeedId);
if (randomBoolean()) {
builder.setJobId(randomAsciiOfLength(10));
}
if (randomBoolean()) {
builder.setQueryDelay(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setFrequency(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setIndexes(DatafeedConfigTests.randomStringList(1, 10));
}
if (randomBoolean()) {
builder.setTypes(DatafeedConfigTests.randomStringList(1, 10));
}
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
int scriptsSize = randomInt(3);
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
builder.setScriptFields(scriptFields);
}
if (randomBoolean()) {
// can only test with a single agg as the xcontent order gets randomized by test base class and then
// the actual xcontent isn't the same and test fail.
// Testing with a single agg is ok as we don't have special list writeable / xconent logic
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggs.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)).field(randomAsciiOfLength(10)));
builder.setAggregations(aggs);
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setSource(randomBoolean());
}
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
}
@Override
protected Writeable.Reader<DatafeedUpdate> instanceReader() {
return DatafeedUpdate::new;
}
@Override
protected DatafeedUpdate parseInstance(XContentParser parser) {
return DatafeedUpdate.PARSER.apply(parser, null).build();
}
public void testApply_failBecauseTargetDatafeedHasDifferentId() {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed));
}
public void testApply_givenEmptyUpdate() {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed);
assertThat(datafeed, equalTo(updatedDatafeed));
}
public void testApply_givenPartialUpdate() {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
update.setScrollSize(datafeed.getScrollSize() + 1);
DatafeedUpdate.Builder updated = new DatafeedUpdate.Builder(datafeed.getId());
updated.setScrollSize(datafeed.getScrollSize() + 1);
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
DatafeedConfig.Builder expectedDatafeed = new DatafeedConfig.Builder(datafeed);
expectedDatafeed.setScrollSize(datafeed.getScrollSize() + 1);
assertThat(updatedDatafeed, equalTo(expectedDatafeed.build()));
}
public void testApply_givenFullUpdateNoAggregations() {
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("foo", "foo-feed");
datafeedBuilder.setIndexes(Arrays.asList("i_1"));
datafeedBuilder.setTypes(Arrays.asList("t_1"));
DatafeedConfig datafeed = datafeedBuilder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
update.setJobId("bar");
update.setIndexes(Arrays.asList("i_2"));
update.setTypes(Arrays.asList("t_2"));
update.setQueryDelay(42L);
update.setFrequency(142L);
update.setQuery(QueryBuilders.termQuery("a", "b"));
update.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false)));
update.setScrollSize(8000);
update.setSource(true);
update.setChunkingConfig(ChunkingConfig.newManual(3600L));
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
assertThat(updatedDatafeed.getJobId(), equalTo("bar"));
assertThat(updatedDatafeed.getIndexes(), equalTo(Arrays.asList("i_2")));
assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_2")));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(42L));
assertThat(updatedDatafeed.getFrequency(), equalTo(142L));
assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.hasAggregations(), is(false));
assertThat(updatedDatafeed.getScriptFields(),
equalTo(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false))));
assertThat(updatedDatafeed.getScrollSize(), equalTo(8000));
assertThat(updatedDatafeed.isSource(), is(true));
assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(3600L)));
}
public void testApply_givenAggregations() {
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("foo", "foo-feed");
datafeedBuilder.setIndexes(Arrays.asList("i_1"));
datafeedBuilder.setTypes(Arrays.asList("t_1"));
DatafeedConfig datafeed = datafeedBuilder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
update.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("a")));
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
assertThat(updatedDatafeed.getIndexes(), equalTo(Arrays.asList("i_1")));
assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_1")));
assertThat(updatedDatafeed.getAggregations(),
equalTo(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("a"))));
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
@ -47,7 +48,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
Job job = JobTests.createRandomizedJob(); Job job = JobTests.createRandomizedJob();
if (randomBoolean()) { if (randomBoolean()) {
DatafeedConfig datafeedConfig = DatafeedConfigTests.createRandomizedDatafeedConfig(job.getId()); DatafeedConfig datafeedConfig = DatafeedConfigTests.createRandomizedDatafeedConfig(job.getId());
if (datafeedConfig.getAggregations() != null) { if (datafeedConfig.hasAggregations()) {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors()); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors());
analysisConfig.setSummaryCountFieldName("doc_count"); analysisConfig.setSummaryCountFieldName("doc_count");
Job.Builder jobBuilder = new Job.Builder(job); Job.Builder jobBuilder = new Job.Builder(job);
@ -236,6 +237,90 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
expectThrows(IllegalArgumentException.class, () -> builder.putDatafeed(datafeedConfig1)); expectThrows(IllegalArgumentException.class, () -> builder.putDatafeed(datafeedConfig1));
} }
public void testUpdateDatafeed() {
Job job1 = createDatafeedJob().build();
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
builder.putDatafeed(datafeedConfig1);
MlMetadata beforeMetadata = builder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setScrollSize(5000);
MlMetadata updatedMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null).build();
DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId());
assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId()));
assertThat(updatedDatafeed.getIndexes(), equalTo(datafeedConfig1.getIndexes()));
assertThat(updatedDatafeed.getTypes(), equalTo(datafeedConfig1.getTypes()));
assertThat(updatedDatafeed.getScrollSize(), equalTo(5000));
}
public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() {
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("foo");
update.setScrollSize(5000);
expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null).build());
}
public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() {
Job job1 = createDatafeedJob().build();
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
builder.putDatafeed(datafeedConfig1);
MlMetadata beforeMetadata = builder.build();
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null);
PersistentTasksInProgress tasksInProgress =
new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setScrollSize(5000);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), tasksInProgress));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
}
public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() {
Job job1 = createDatafeedJob().build();
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
builder.putDatafeed(datafeedConfig1);
MlMetadata beforeMetadata = builder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setJobId(job1.getId() + "_2");
expectThrows(ResourceNotFoundException.class,
() -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null));
}
public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() {
Job job1 = createDatafeedJob().build();
Job.Builder job2 = new Job.Builder(job1);
job2.setId(job1.getId() + "_2");
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job2.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
builder.putJob(job2.build(), false);
builder.putDatafeed(datafeedConfig1);
builder.putDatafeed(datafeedConfig2);
MlMetadata beforeMetadata = builder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setJobId(job2.getId());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [foo_2]"));
}
public void testRemoveDatafeed_failBecauseDatafeedStarted() { public void testRemoveDatafeed_failBecauseDatafeedStarted() {
Job job1 = createDatafeedJob().build(); Job job1 = createDatafeedJob().build();
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();

View File

@ -113,6 +113,7 @@ cluster:admin/ml/anomaly_detectors/model_snapshots/get
cluster:admin/ml/anomaly_detectors/results/records/get cluster:admin/ml/anomaly_detectors/results/records/get
cluster:admin/ml/anomaly_detectors/results/influencers/get cluster:admin/ml/anomaly_detectors/results/influencers/get
cluster:admin/ml/datafeeds/put cluster:admin/ml/datafeeds/put
cluster:admin/ml/datafeeds/update
cluster:admin/ml/anomaly_detectors/model_snapshots/delete cluster:admin/ml/anomaly_detectors/model_snapshots/delete
cluster:admin/ml/anomaly_detectors/validate/detector cluster:admin/ml/anomaly_detectors/validate/detector
cluster:admin/ml/anomaly_detectors/validate cluster:admin/ml/anomaly_detectors/validate

View File

@ -0,0 +1,20 @@
{
"xpack.ml.update_datafeed": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ml/datafeeds/{datafeed_id}/_update",
"paths": [ "/_xpack/ml/datafeeds/{datafeed_id}/_update" ],
"parts": {
"datafeed_id": {
"type": "string",
"required": true,
"description": "The ID of the datafeed to update"
}
}
},
"body": {
"description" : "The datafeed update settings",
"required" : true
}
}
}

View File

@ -133,6 +133,121 @@ setup:
"query":{"match_all_mispelled":{}} "query":{"match_all_mispelled":{}}
} }
---
"Test update datafeed":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"],
"scroll_size": 2000
}
- do:
xpack.ml.update_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"indexes":["index-*"],
"scroll_size": 10000
}
- match: { datafeed_id: "test-datafeed-1" }
- match: { job_id: "job-1" }
- match: { indexes: ["index-*"] }
- match: { types: ["type-bar"] }
- match: { scroll_size: 10000 }
---
"Test update datafeed to point to different job":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"],
"scroll_size": 2000
}
- do:
xpack.ml.update_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id": "job-2"
}
- match: { datafeed_id: "test-datafeed-1" }
- match: { job_id: "job-2" }
- match: { indexes: ["index-foo"] }
- match: { types: ["type-bar"] }
---
"Test update datafeed with missing id":
- do:
catch: /resource_not_found_exception/
xpack.ml.update_datafeed:
datafeed_id: a-missing-datafeed
body: >
{}
---
"Test update datafeed to point to missing job":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"],
"scroll_size": 2000
}
- do:
catch: /resource_not_found_exception/
xpack.ml.update_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id": "job-3"
}
---
"Test update datafeed to point to job already attached to another datafeed":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"]
}
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-2
body: >
{
"job_id":"job-2",
"indexes":["index-foo"],
"types":["type-bar"]
}
- do:
catch: /A datafeed \[test-datafeed-2\] already exists for job \[job-2\]/
xpack.ml.update_datafeed:
datafeed_id: test-datafeed-1
body: >
{
"job_id": "job-2"
}
--- ---
"Test delete datafeed with missing id": "Test delete datafeed with missing id":
- do: - do: