[ML] Add support for multi-job/multi-datafeed APIs (elastic/x-pack-elasticsearch#2079)

This commit enhances job/datafeed APIs that support acting
on multiple jobs/datafeeds at once so that they accept
expressions that may contain comma-separated lists or
wildcard patterns.

The APIs that are enhanced are:

  - get jobs API
  - get job stats API
  - close job API
  - get datafeeds API
  - get datafeed stats API
  - stop datafeed API

relates elastic/x-pack-elasticsearch#1876

Original commit: elastic/x-pack-elasticsearch@45a1139d97
This commit is contained in:
Dimitris Athanasiou 2017-08-02 11:10:06 +01:00 committed by GitHub
parent c09430f3bf
commit 0125a332a1
45 changed files with 1003 additions and 291 deletions

View File

@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -224,7 +225,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
);
// Step 1. Extract usage from jobs stats and then request stats for all datafeeds
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(Job.ALL);
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL);
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
response -> {
addJobsUsage(response);

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.NameResolver;
import org.elasticsearch.xpack.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -42,6 +43,7 @@ import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;
@ -74,6 +76,11 @@ public class MlMetadata implements MetaData.Custom {
return jobs;
}
public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
return NameResolver.newUnaliased(jobs.keySet(), jobId -> ExceptionsHelper.missingJobException(jobId))
.expand(expression, allowNoJobs);
}
public boolean isJobDeleted(String jobId) {
Job job = jobs.get(jobId);
return job == null || job.isDeleted();
@ -91,6 +98,11 @@ public class MlMetadata implements MetaData.Custom {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
}
public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds) {
return NameResolver.newUnaliased(datafeeds.keySet(), datafeedId -> ExceptionsHelper.missingDatafeedException(datafeedId))
.expand(expression, allowNoDatafeeds);
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_5_4_0;

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
@ -58,7 +59,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -89,6 +89,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField ALLOW_NO_JOBS = new ParseField("allow_no_jobs");
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
@ -96,6 +97,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
PARSER.declareString((request, val) ->
request.setCloseTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareBoolean(Request::setForce, FORCE);
PARSER.declareBoolean(Request::setAllowNoJobs, ALLOW_NO_JOBS);
}
public static Request parseRequest(String jobId, XContentParser parser) {
@ -108,6 +110,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private String jobId;
private boolean force = false;
private boolean allowNoJobs = true;
// A big state can take a while to persist. For symmetry with the _open endpoint any
// changes here should be reflected there too.
private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;
@ -149,6 +152,14 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
this.force = force;
}
public boolean allowNoJobs() {
return allowNoJobs;
}
public void setAllowNoJobs(boolean allowNoJobs) {
this.allowNoJobs = allowNoJobs;
}
public void setLocal(boolean local) {
this.local = local;
}
@ -165,6 +176,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
force = in.readBoolean();
openJobIds = in.readStringArray();
local = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
@ -175,6 +189,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
out.writeBoolean(force);
out.writeStringArray(openJobIds);
out.writeBoolean(local);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
@Override
@ -194,6 +211,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.field(FORCE.getPreferredName(), force);
builder.field(ALLOW_NO_JOBS.getPreferredName(), allowNoJobs);
builder.endObject();
return builder;
}
@ -201,7 +219,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
public int hashCode() {
// openJobIds are excluded
return Objects.hash(jobId, timeout, force);
return Objects.hash(jobId, timeout, force, allowNoJobs);
}
@Override
@ -216,7 +234,8 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
// openJobIds are excluded
return Objects.equals(jobId, other.jobId) &&
Objects.equals(timeout, other.timeout) &&
Objects.equals(force, other.force);
Objects.equals(force, other.force) &&
Objects.equals(allowNoJobs, other.allowNoJobs);
}
}
@ -337,7 +356,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
List<String> openJobIds = new ArrayList<>();
List<String> closingJobIds = new ArrayList<>();
resolveAndValidateJobId(request.getJobId(), state, openJobIds, closingJobIds, request.isForce());
resolveAndValidateJobId(request, state, openJobIds, closingJobIds);
request.setOpenJobIds(openJobIds.toArray(new String[0]));
if (openJobIds.isEmpty() && closingJobIds.isEmpty()) {
listener.onResponse(new Response(true));
@ -552,19 +571,17 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
/**
* Expand the {@code jobId} parameter and add the job Id to one of the list arguments
* Resolve the requested jobs and add their IDs to one of the list arguments
* depending on job state.
*
* Opened jobs are added to {@code openJobIds} and closing jobs added to {@code closingJobIds}. Failed jobs are added
* to {@code openJobIds} if allowFailed is set otherwise an exception is thrown.
* @param jobId The job Id. If jobId == {@link Job#ALL} then expand the job list.
* @param request The close job request
* @param state Cluster state
* @param openJobIds Opened or failed jobs are added to this list
* @param closingJobIds Closing jobs are added to this list
* @param allowFailed Whether failed jobs are allowed, if yes, they are added to {@code openJobIds}
*/
static void resolveAndValidateJobId(String jobId, ClusterState state, List<String> openJobIds, List<String> closingJobIds,
boolean allowFailed) {
static void resolveAndValidateJobId(Request request, ClusterState state, List<String> openJobIds, List<String> closingJobIds) {
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata maybeNull = state.metaData().custom(MlMetadata.TYPE);
final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull;
@ -580,24 +597,14 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs);
};
if (!Job.ALL.equals(jobId)) {
if (mlMetadata.getJobs().containsKey(jobId) == false) {
throw ExceptionsHelper.missingJobException(jobId);
}
jobIdProcessor.accept(jobId);
if (allowFailed == false && failedJobs.size() > 0) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", jobId);
}
} else {
for (Map.Entry<String, Job> jobEntry : mlMetadata.getJobs().entrySet()) {
jobIdProcessor.accept(jobEntry.getKey());
}
if (allowFailed == false && failedJobs.size() > 0) {
throw ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close");
Set<String> expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs());
expandedJobIds.stream().forEach(jobIdProcessor::accept);
if (request.isForce() == false && failedJobs.size() > 0) {
if (expandedJobIds.size() == 1) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close",
expandedJobIds.iterator().next());
}
throw ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close");
}
// allowFailed == true

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -170,13 +169,11 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
ModelSnapshot deleteCandidate = deleteCandidates.get(0);
// Verify the snapshot is not being used
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
if (job.count() > 0) {
String currentModelInUse = job.results().get(0).getModelSnapshotId();
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
request.getSnapshotId(), request.getJobId()));
}
Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), clusterService.state());
String currentModelInUse = job.getModelSnapshotId();
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
request.getSnapshotId(), request.getJobId()));
}
// Delete the snapshot and any associated state files

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@ -19,6 +20,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -35,9 +37,9 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDatafeedsAction.Response,
GetDatafeedsAction.RequestBuilder> {
@ -63,7 +65,10 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
public static class Request extends MasterNodeReadRequest<Request> {
public static final ParseField ALLOW_NO_DATAFEEDS = new ParseField("allow_no_datafeeds");
private String datafeedId;
private boolean allowNoDatafeeds = true;
public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
@ -75,6 +80,14 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
return datafeedId;
}
public boolean allowNoDatafeeds() {
return allowNoDatafeeds;
}
public void setAllowNoDatafeeds(boolean allowNoDatafeeds) {
this.allowNoDatafeeds = allowNoDatafeeds;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -84,17 +97,23 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoDatafeeds = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoDatafeeds);
}
}
@Override
public int hashCode() {
return Objects.hash(datafeedId);
return Objects.hash(datafeedId, allowNoDatafeeds);
}
@Override
@ -106,7 +125,7 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
return false;
}
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId);
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(allowNoDatafeeds, other.allowNoDatafeeds);
}
}
@ -197,23 +216,17 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get datafeed '{}'", request.getDatafeedId());
QueryPage<DatafeedConfig> response;
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
if (ALL.equals(request.getDatafeedId())) {
List<DatafeedConfig> datafeedConfigs = new ArrayList<>(mlMetadata.getDatafeeds().values());
response = new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), DatafeedConfig.RESULTS_FIELD);
} else {
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
response = new QueryPage<>(Collections.singletonList(datafeed), 1, DatafeedConfig.RESULTS_FIELD);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
List<DatafeedConfig> datafeedConfigs = new ArrayList<>();
for (String expandedDatafeedId : expandedDatafeedIds) {
datafeedConfigs.add(mlMetadata.getDatafeed(expandedDatafeedId));
}
listener.onResponse(new Response(response));
listener.onResponse(new Response(new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), DatafeedConfig.RESULTS_FIELD)));
}
@Override

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@ -21,6 +22,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -42,10 +44,10 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Request, GetDatafeedsStatsAction.Response,
@ -73,7 +75,10 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
public static class Request extends MasterNodeReadRequest<Request> {
public static final ParseField ALLOW_NO_DATAFEEDS = new ParseField("allow_no_datafeeds");
private String datafeedId;
private boolean allowNoDatafeeds = true;
public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
@ -85,6 +90,14 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
return datafeedId;
}
public boolean allowNoDatafeeds() {
return allowNoDatafeeds;
}
public void setAllowNoDatafeeds(boolean allowNoDatafeeds) {
this.allowNoDatafeeds = allowNoDatafeeds;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -94,17 +107,23 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoDatafeeds = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoDatafeeds);
}
}
@Override
public int hashCode() {
return Objects.hash(datafeedId);
return Objects.hash(datafeedId, allowNoDatafeeds);
}
@Override
@ -116,7 +135,7 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
return false;
}
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId);
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(allowNoDatafeeds, other.allowNoDatafeeds);
}
}
@ -309,18 +328,10 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
mlMetadata = MlMetadata.EMPTY_METADATA;
}
if (request.getDatafeedId().equals(ALL) == false
&& mlMetadata.getDatafeed(request.getDatafeedId()) == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
List<String> expandedDatafeedsIds = request.getDatafeedId().equals(ALL) ?
mlMetadata.getDatafeeds().values().stream()
.map(d -> d.getId()).collect(Collectors.toList())
: Collections.singletonList(request.getDatafeedId());
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<DatafeedStats> results = expandedDatafeedsIds.stream()
List<DatafeedStats> results = expandedDatafeedIds.stream()
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
.collect(Collectors.toList());
QueryPage<DatafeedStats> statsPage = new QueryPage<>(results, results.size(),

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@ -19,6 +20,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -57,7 +59,10 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
public static class Request extends MasterNodeReadRequest<Request> {
public static final ParseField ALLOW_NO_JOBS = new ParseField("allow_no_jobs");
private String jobId;
private boolean allowNoJobs = true;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -65,10 +70,18 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
Request() {}
public void setAllowNoJobs(boolean allowNoJobs) {
this.allowNoJobs = allowNoJobs;
}
public String getJobId() {
return jobId;
}
public boolean allowNoJobs() {
return allowNoJobs;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -78,17 +91,23 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId);
return Objects.hash(jobId, allowNoJobs);
}
@Override
@ -100,7 +119,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId);
return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoJobs, other.allowNoJobs);
}
}
@ -195,7 +214,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get job '{}'", request.getJobId());
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), state);
QueryPage<Job> jobs = jobManager.expandJobs(request.getJobId(), request.allowNoJobs(), state);
listener.onResponse(new Response(jobs));
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
@ -18,9 +19,11 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -88,7 +91,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
public static class Request extends BaseTasksRequest<Request> {
public static final ParseField ALLOW_NO_JOBS = new ParseField("allow_no_jobs");
private String jobId;
private boolean allowNoJobs = true;
// used internally to expand _all jobid to encapsulate all jobs in cluster:
private List<String> expandedJobsIds;
@ -100,13 +106,21 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
Request() {}
public void setAllowNoJobs(boolean allowNoJobs) {
this.allowNoJobs = allowNoJobs;
}
public String getJobId() {
return jobId;
}
public boolean allowNoJobs() {
return allowNoJobs;
}
@Override
public boolean match(Task task) {
return jobId.equals(Job.ALL) || OpenJobAction.JobTask.match(task, jobId);
return jobId.equals(MetaData.ALL) || OpenJobAction.JobTask.match(task, jobId);
}
@Override
@ -119,6 +133,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
super.readFrom(in);
jobId = in.readString();
expandedJobsIds = in.readList(StreamInput::readString);
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
@ -126,11 +143,14 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
super.writeTo(out);
out.writeString(jobId);
out.writeStringList(expandedJobsIds);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId);
return Objects.hash(jobId, allowNoJobs);
}
@Override
@ -142,7 +162,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId);
return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoJobs, other.allowNoJobs);
}
}
@ -378,15 +398,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata;
if (Job.ALL.equals(request.getJobId())) {
request.expandedJobsIds = new ArrayList<>(mlMetadata.getJobs().keySet());
} else {
if (mlMetadata.getJobs().containsKey(request.getJobId()) == false) {
throw ExceptionsHelper.missingJobException(request.getJobId());
}
}
request.expandedJobsIds = new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()));
ActionListener<Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
request, response, finalListener), listener::onFailure);

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
@ -285,9 +284,9 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
JobState jobState = jobManager.getJobState(request.getJobId());
if (job.count() > 0 && jobState.equals(JobState.CLOSED) == false) {
Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), clusterService.state());
JobState jobState = jobManager.getJobState(job.getId());
if (jobState.equals(JobState.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
@ -45,7 +46,6 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
@ -67,8 +67,6 @@ public class StopDatafeedAction
public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
public static final String NAME = "cluster:admin/xpack/ml/datafeed/stop";
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(5);
private StopDatafeedAction() {
@ -87,6 +85,10 @@ public class StopDatafeedAction
public static class Request extends BaseTasksRequest<Request> implements ToXContentObject {
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField ALLOW_NO_DATAFEEDS = new ParseField("allow_no_datafeeds");
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
@ -94,6 +96,7 @@ public class StopDatafeedAction
PARSER.declareString((request, val) ->
request.setStopTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareBoolean(Request::setForce, FORCE);
PARSER.declareBoolean(Request::setAllowNoDatafeeds, ALLOW_NO_DATAFEEDS);
}
public static Request fromXContent(XContentParser parser) {
@ -112,6 +115,7 @@ public class StopDatafeedAction
private String[] resolvedStartedDatafeedIds;
private TimeValue stopTimeout = DEFAULT_TIMEOUT;
private boolean force = false;
private boolean allowNoDatafeeds = true;
public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
@ -149,6 +153,14 @@ public class StopDatafeedAction
this.force = force;
}
public boolean allowNoDatafeeds() {
return allowNoDatafeeds;
}
public void setAllowNoDatafeeds(boolean allowNoDatafeeds) {
this.allowNoDatafeeds = allowNoDatafeeds;
}
@Override
public boolean match(Task task) {
for (String id : resolvedStartedDatafeedIds) {
@ -172,6 +184,9 @@ public class StopDatafeedAction
resolvedStartedDatafeedIds = in.readStringArray();
stopTimeout = new TimeValue(in);
force = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoDatafeeds = in.readBoolean();
}
}
@Override
@ -181,11 +196,14 @@ public class StopDatafeedAction
out.writeStringArray(resolvedStartedDatafeedIds);
stopTimeout.writeTo(out);
out.writeBoolean(force);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoDatafeeds);
}
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, stopTimeout, force);
return Objects.hash(datafeedId, stopTimeout, force, allowNoDatafeeds);
}
@Override
@ -194,6 +212,7 @@ public class StopDatafeedAction
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
builder.field(TIMEOUT.getPreferredName(), stopTimeout.getStringRep());
builder.field(FORCE.getPreferredName(), force);
builder.field(ALLOW_NO_DATAFEEDS.getPreferredName(), allowNoDatafeeds);
builder.endObject();
return builder;
}
@ -209,7 +228,8 @@ public class StopDatafeedAction
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(stopTimeout, other.stopTimeout) &&
Objects.equals(force, other.force);
Objects.equals(force, other.force) &&
Objects.equals(allowNoDatafeeds, other.allowNoDatafeeds);
}
}
@ -287,7 +307,7 @@ public class StopDatafeedAction
List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
resolveDataFeedIds(request.getDatafeedId(), mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
resolveDataFeedIds(request, mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new Response(true));
return;
@ -475,32 +495,22 @@ public class StopDatafeedAction
}
/**
* Expand the {@code datafeedId} parameter and add the resolved datafeed Id to
* one of the list arguments depending on datafeed state.
* Resolve the requested datafeeds and add their IDs to one of the list
* arguments depending on datafeed state.
*
* @param datafeedId Datafeed Id. If datafeedId == "_all" then expand the datafeed list
* @param request The stop datafeed request
* @param mlMetadata ML Metadata
* @param tasks Persistent task meta data
* @param startedDatafeedIds Started datafeed ids are added to this list
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
*/
static void resolveDataFeedIds(String datafeedId, MlMetadata mlMetadata,
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {
static void resolveDataFeedIds(Request request, MlMetadata mlMetadata,
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {
if (!Job.ALL.equals(datafeedId)) {
validateDatafeedTask(datafeedId, mlMetadata);
addDatafeedTaskIdAccordingToState(datafeedId, MlMetadata.getDatafeedState(datafeedId, tasks),
startedDatafeedIds, stoppingDatafeedIds);
return;
}
if (mlMetadata.getDatafeeds().isEmpty()) {
return;
}
for (String expandedDatafeedId : mlMetadata.getDatafeeds().keySet()) {
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
for (String expandedDatafeedId : expandedDatafeedIds) {
validateDatafeedTask(expandedDatafeedId, mlMetadata);
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlMetadata.getDatafeedState(expandedDatafeedId, tasks),
startedDatafeedIds, stoppingDatafeedIds);

View File

@ -58,7 +58,7 @@ public abstract class TransportJobTaskAction<Request extends TransportJobTaskAct
// We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the
// node running the job task.
ClusterState state = clusterService.state();
JobManager.getJobOrThrowIfUnknown(state, jobId);
JobManager.getJobOrThrowIfUnknown(jobId, state);
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {

View File

@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
@ -29,7 +30,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import java.io.IOException;
@ -158,8 +158,8 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<PutJobAction.Response> listener) throws Exception {
if (request.getJobId().equals(Job.ALL)) {
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
if (request.getJobId().equals(MetaData.ALL)) {
throw new IllegalArgumentException("Job Id " + MetaData.ALL + " cannot be for update");
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener);

View File

@ -36,16 +36,14 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Allows interactions with jobs. The managed interactions include:
@ -78,81 +76,26 @@ public class JobManager extends AbstractComponent {
}
/**
* Get the jobs that match the given {@code jobId}.
* Note that when the {@code jobId} is {@link Job#ALL} all jobs are returned.
* Gets the job that matches the given {@code jobId}.
*
* @param jobId
* the jobId
* @return A {@link QueryPage} containing the matching {@code Job}s
*/
public QueryPage<Job> getJob(String jobId, ClusterState clusterState) {
if (jobId.equals(Job.ALL)) {
return getJobs(clusterState);
}
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId);
if (job == null) {
logger.debug(String.format(Locale.ROOT, "Cannot find job '%s'", jobId));
throw ExceptionsHelper.missingJobException(jobId);
}
logger.debug("Returning job [" + jobId + "]");
return new QueryPage<>(Collections.singletonList(job), 1, Job.RESULTS_FIELD);
}
/**
* Get details of all Jobs.
*
* @return A query page object with hitCount set to the total number of jobs
* not the only the number returned here as determined by the
* <code>size</code> parameter.
*/
public QueryPage<Job> getJobs(ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
List<Job> jobs = mlMetadata.getJobs().entrySet().stream()
.map(Map.Entry::getValue)
.collect(Collectors.toList());
return new QueryPage<>(jobs, mlMetadata.getJobs().size(), Job.RESULTS_FIELD);
}
/**
* Returns the non-null {@code Job} object for the given
* {@code jobId} or throws
* {@link org.elasticsearch.ResourceNotFoundException}
*
* @param jobId
* the jobId
* @return the {@code Job} if a job with the given {@code jobId}
* exists
* @throws org.elasticsearch.ResourceNotFoundException
* if there is no job with matching the given {@code jobId}
* @param jobId the jobId
* @return The {@link Job} matching the given {@code jobId}
* @throws ResourceNotFoundException if no job matches {@code jobId}
*/
public Job getJobOrThrowIfUnknown(String jobId) {
return getJobOrThrowIfUnknown(clusterService.state(), jobId);
}
public JobState getJobState(String jobId) {
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlMetadata.getJobState(jobId, tasks);
return getJobOrThrowIfUnknown(jobId, clusterService.state());
}
/**
* Returns the non-null {@code Job} object for the given
* {@code jobId} or throws
* {@link org.elasticsearch.ResourceNotFoundException}
* Gets the job that matches the given {@code jobId}.
*
* @param jobId
* the jobId
* @return the {@code Job} if a job with the given {@code jobId}
* exists
* @throws org.elasticsearch.ResourceNotFoundException
* if there is no job with matching the given {@code jobId}
* @param jobId the jobId
* @param clusterState the cluster state
* @return The {@link Job} matching the given {@code jobId}
* @throws ResourceNotFoundException if no job matches {@code jobId}
*/
public static Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
public static Job getJobOrThrowIfUnknown(String jobId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
@ -160,6 +103,34 @@ public class JobManager extends AbstractComponent {
return job;
}
/**
 * Gets the jobs that match the given {@code expression}.
 * Note that when the expression is {@link MetaData#ALL} all jobs are returned.
 *
 * @param expression the jobId or an expression matching jobIds
 * @param allowNoJobs if {@code false}, an error is thrown when no job matches the {@code expression}
 * @param clusterState the cluster state
 * @return A {@link QueryPage} containing the matching {@code Job}s
 */
public QueryPage<Job> expandJobs(String expression, boolean allowNoJobs, ClusterState clusterState) {
    MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
    if (mlMetadata == null) {
        // No ML metadata has been written to the cluster state yet -> behave as if empty
        mlMetadata = MlMetadata.EMPTY_METADATA;
    }
    List<Job> matchingJobs = new ArrayList<>();
    for (String jobId : mlMetadata.expandJobIds(expression, allowNoJobs)) {
        matchingJobs.add(mlMetadata.getJobs().get(jobId));
    }
    logger.debug("Returning jobs matching [" + expression + "]");
    return new QueryPage<>(matchingJobs, matchingJobs.size(), Job.RESULTS_FIELD);
}
/**
 * Returns the state of the given job as recorded in the cluster's
 * {@link PersistentTasksCustomMetaData}.
 *
 * @param jobId the jobId
 * @return the current {@link JobState} for the job
 */
public JobState getJobState(String jobId) {
    ClusterState clusterState = clusterService.state();
    PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
    return MlMetadata.getJobState(jobId, tasks);
}
/**
* Stores a job in the cluster state
*/
@ -263,7 +234,7 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Job job = getJob(jobId, currentState).results().get(0);
Job job = getJobOrThrowIfUnknown(jobId, currentState);
updatedJob = jobUpdate.mergeWithJob(job);
return updateClusterState(updatedJob, true, currentState);
}
@ -384,7 +355,7 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Job job = getJobOrThrowIfUnknown(currentState, request.getJobId());
Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
Job.Builder builder = new Job.Builder(job);
builder.setModelSnapshotId(modelSnapshot.getSnapshotId());
return updateClusterState(builder.build(), true, currentState);

View File

@ -77,8 +77,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("jobs");
public static final String ALL = "_all";
// These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
public static final ObjectParser<Builder, Void> METADATA_PARSER = new ObjectParser<>("job_details", true, Builder::new);
public static final ObjectParser<Builder, Void> CONFIG_PARSER = new ObjectParser<>("job_details", false, Builder::new);

View File

@ -40,6 +40,8 @@ public class RestGetDatafeedStatsAction extends BaseRestHandler {
datafeedId = GetDatafeedsStatsAction.ALL;
}
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
request.setAllowNoDatafeeds(restRequest.paramAsBoolean(GetDatafeedsStatsAction.Request.ALLOW_NO_DATAFEEDS.getPreferredName(),
request.allowNoDatafeeds()));
return channel -> client.execute(GetDatafeedsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -39,6 +39,8 @@ public class RestGetDatafeedsAction extends BaseRestHandler {
datafeedId = GetDatafeedsAction.ALL;
}
GetDatafeedsAction.Request request = new GetDatafeedsAction.Request(datafeedId);
request.setAllowNoDatafeeds(restRequest.paramAsBoolean(GetDatafeedsAction.Request.ALLOW_NO_DATAFEEDS.getPreferredName(),
request.allowNoDatafeeds()));
return channel -> client.execute(GetDatafeedsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -40,23 +40,26 @@ public class RestStopDatafeedAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
StopDatafeedAction.Request jobDatafeedRequest;
StopDatafeedAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
XContentParser parser = restRequest.contentOrSourceParamParser();
jobDatafeedRequest = StopDatafeedAction.Request.parseRequest(datafeedId, parser);
request = StopDatafeedAction.Request.parseRequest(datafeedId, parser);
} else {
jobDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
if (restRequest.hasParam(StopDatafeedAction.TIMEOUT.getPreferredName())) {
request = new StopDatafeedAction.Request(datafeedId);
if (restRequest.hasParam(StopDatafeedAction.Request.TIMEOUT.getPreferredName())) {
TimeValue stopTimeout = restRequest.paramAsTime(
StopDatafeedAction.TIMEOUT.getPreferredName(), StopDatafeedAction.DEFAULT_TIMEOUT);
jobDatafeedRequest.setStopTimeout(stopTimeout);
StopDatafeedAction.Request.TIMEOUT.getPreferredName(), StopDatafeedAction.DEFAULT_TIMEOUT);
request.setStopTimeout(stopTimeout);
}
if (restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) {
jobDatafeedRequest.setForce(
restRequest.paramAsBoolean(StopDatafeedAction.FORCE.getPreferredName(), false));
if (restRequest.hasParam(StopDatafeedAction.Request.FORCE.getPreferredName())) {
request.setForce(restRequest.paramAsBoolean(StopDatafeedAction.Request.FORCE.getPreferredName(), request.isForce()));
}
if (restRequest.hasParam(StopDatafeedAction.Request.ALLOW_NO_DATAFEEDS.getPreferredName())) {
request.setAllowNoDatafeeds(restRequest.paramAsBoolean(StopDatafeedAction.Request.ALLOW_NO_DATAFEEDS.getPreferredName(),
request.allowNoDatafeeds()));
}
}
return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new RestBuilderListener<Response>(channel) {
return channel -> client.execute(StopDatafeedAction.INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {

View File

@ -40,7 +40,10 @@ public class RestCloseJobAction extends BaseRestHandler {
restRequest.param(Request.TIMEOUT.getPreferredName()), Request.TIMEOUT.getPreferredName()));
}
if (restRequest.hasParam(Request.FORCE.getPreferredName())) {
request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), false));
request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), request.isForce()));
}
if (restRequest.hasParam(Request.ALLOW_NO_JOBS.getPreferredName())) {
request.setAllowNoJobs(restRequest.paramAsBoolean(Request.ALLOW_NO_JOBS.getPreferredName(), request.allowNoJobs()));
}
return channel -> client.execute(CloseJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
@ -37,9 +38,11 @@ public class RestGetJobStatsAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
if (Strings.isNullOrEmpty(jobId)) {
jobId = Job.ALL;
jobId = MetaData.ALL;
}
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
request.setAllowNoJobs(restRequest.paramAsBoolean(GetJobsStatsAction.Request.ALLOW_NO_JOBS.getPreferredName(),
request.allowNoJobs()));
return channel -> client.execute(GetJobsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
@ -38,9 +39,10 @@ public class RestGetJobsAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
if (Strings.isNullOrEmpty(jobId)) {
jobId = Job.ALL;
jobId = MetaData.ALL;
}
GetJobsAction.Request request = new GetJobsAction.Request(jobId);
request.setAllowNoJobs(restRequest.paramAsBoolean(GetJobsAction.Request.ALLOW_NO_JOBS.getPreferredName(), request.allowNoJobs()));
return channel -> client.execute(GetJobsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.cluster.metadata.MetaData;
import java.util.regex.Pattern;
@ -58,7 +58,7 @@ public final class MlStrings {
}
public static boolean isValidId(String id) {
return id != null && VALID_ID_CHAR_PATTERN.matcher(id).matches() && !Job.ALL.equals(id);
return id != null && VALID_ID_CHAR_PATTERN.matcher(id).matches() && !MetaData.ALL.equals(id);
}
/**

View File

@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Resolves an expression (a single name, a comma-separated list of names,
 * wildcard patterns, or {@code _all}) into the sorted set of matching names.
 * Subclasses may support aliases, i.e. keys that map to several names.
 */
public abstract class NameResolver {

    private final Function<String, ResourceNotFoundException> notFoundExceptionSupplier;

    protected NameResolver(Function<String, ResourceNotFoundException> notFoundExceptionSupplier) {
        this.notFoundExceptionSupplier = Objects.requireNonNull(notFoundExceptionSupplier);
    }

    /**
     * Expands an expression into the set of matching names.
     * For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"],
     * expressions resolve as follows:
     * <ul>
     *     <li>"foo-1" : ["foo-1"]</li>
     *     <li>"bar-1" : ["bar-1"]</li>
     *     <li>"foo-1,foo-2" : ["foo-1", "foo-2"]</li>
     *     <li>"foo-*" : ["foo-1", "foo-2"]</li>
     *     <li>"*-1" : ["bar-1", "foo-1"]</li>
     *     <li>"*" : ["bar-1", "bar-2", "foo-1", "foo-2"]</li>
     *     <li>"_all" : ["bar-1", "bar-2", "foo-1", "foo-2"]</li>
     * </ul>
     *
     * @param expression the expression to resolve
     * @param allowNoMatch if {@code false}, a wildcard token matching nothing, or an
     *                     overall empty result, raises the not-found exception; note an
     *                     unknown exact (non-wildcard) token always raises it regardless
     * @return the sorted set of matching names
     */
    public SortedSet<String> expand(String expression, boolean allowNoMatch) {
        SortedSet<String> matched = new TreeSet<>();
        if (MetaData.ALL.equals(expression) || Regex.isMatchAllPattern(expression)) {
            // "_all" or "*" selects every known name
            matched.addAll(nameSet());
        } else {
            for (String token : Strings.tokenizeToStringArray(expression, ",")) {
                if (Regex.isSimpleMatchPattern(token)) {
                    // Wildcard token: collect the names behind every key it matches
                    List<String> tokenMatches = keys().stream()
                            .filter(key -> Regex.simpleMatch(token, key))
                            .flatMap(key -> lookup(key).stream())
                            .collect(Collectors.toList());
                    if (tokenMatches.isEmpty() && allowNoMatch == false) {
                        throw notFoundExceptionSupplier.apply(token);
                    }
                    matched.addAll(tokenMatches);
                } else {
                    // Exact token: an unknown name is always an error
                    List<String> names = lookup(token);
                    if (names == null) {
                        throw notFoundExceptionSupplier.apply(token);
                    }
                    matched.addAll(names);
                }
            }
        }
        if (matched.isEmpty() && allowNoMatch == false) {
            throw notFoundExceptionSupplier.apply(expression);
        }
        return matched;
    }

    /**
     * @return the set of registered keys
     */
    protected abstract Set<String> keys();

    /**
     * @return the set of all names
     */
    protected abstract Set<String> nameSet();

    /**
     * Looks up a key and returns the matching names.
     *
     * @param key the key to look up
     * @return a list of the matching names or {@code null} when no matching names exist
     */
    @Nullable
    protected abstract List<String> lookup(String key);

    /**
     * Creates a {@code NameResolver} that has no aliases, i.e. each key resolves to itself.
     *
     * @param nameSet the set of all names
     * @param notFoundExceptionSupplier a supplier of {@link ResourceNotFoundException} to be used when an expression matches no name
     * @return the unaliased {@code NameResolver}
     */
    public static NameResolver newUnaliased(Set<String> nameSet, Function<String, ResourceNotFoundException> notFoundExceptionSupplier) {
        return new NameResolver(notFoundExceptionSupplier) {
            @Override
            protected Set<String> keys() {
                return nameSet;
            }

            @Override
            protected Set<String> nameSet() {
                return nameSet;
            }

            @Override
            protected List<String> lookup(String key) {
                if (nameSet.contains(key)) {
                    return Collections.singletonList(key);
                }
                return null;
            }
        };
    }
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.monitoring.collector.ml;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -13,7 +14,6 @@ import org.elasticsearch.xpack.XPackClient;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.client.MachineLearningClient;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
@ -60,7 +60,7 @@ public class JobStatsCollector extends Collector {
protected List<MonitoringDoc> doCollect() throws Exception {
// fetch details about all jobs
final GetJobsStatsAction.Response jobs =
client.getJobsStats(new GetJobsStatsAction.Request(Job.ALL))
client.getJobsStats(new GetJobsStatsAction.Request(MetaData.ALL))
.actionGet(monitoringSettings.jobStatsTimeout());
final long timestamp = System.currentTimeMillis();

View File

@ -37,6 +37,7 @@ import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDat
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -362,4 +363,36 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("foo"), new JobTaskStatus(JobState.OPENED, tasksBuilder.getLastAllocationId()));
assertEquals(JobState.OPENED, MlMetadata.getJobState("foo", tasksBuilder.build()));
}
// Verifies MlMetadata.expandJobIds resolves "_all", the match-all pattern,
// prefix wildcards, and comma-separated mixed expressions; results are
// expected in sorted order (hamcrest contains() asserts order).
public void testExpandJobIds() {
MlMetadata mlMetadata = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2").build();
// "_all" and "*" both expand to every registered job id
assertThat(mlMetadata.expandJobIds("_all", false), contains("bar-1", "foo-1", "foo-2"));
assertThat(mlMetadata.expandJobIds("*", false), contains("bar-1", "foo-1", "foo-2"));
// prefix wildcard selects only matching ids
assertThat(mlMetadata.expandJobIds("foo-*", false), contains("foo-1", "foo-2"));
// comma-separated list may mix exact ids and wildcards
assertThat(mlMetadata.expandJobIds("foo-1,bar-*", false), contains("bar-1", "foo-1"));
}
// Verifies MlMetadata.expandDatafeedIds resolves the same expression grammar
// as job ids ("_all", "*", wildcards, comma-separated lists) against the set
// of registered datafeed ids; results are expected in sorted order.
public void testExpandDatafeedIds() {
MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2");
// one datafeed per job, named "<job-id>-feed"
mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build());
mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build());
mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build());
MlMetadata mlMetadata = mlMetadataBuilder.build();
// "_all" and "*" both expand to every registered datafeed id
assertThat(mlMetadata.expandDatafeedIds("_all", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed"));
assertThat(mlMetadata.expandDatafeedIds("*", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed"));
// prefix wildcard selects only matching ids
assertThat(mlMetadata.expandDatafeedIds("foo-*", false), contains("foo-1-feed", "foo-2-feed"));
// comma-separated list may mix exact ids and wildcards
assertThat(mlMetadata.expandDatafeedIds("foo-1-feed,bar-1*", false), contains("bar-1-feed", "foo-1-feed"));
}
// Test helper: builds an MlMetadata.Builder pre-populated with one default
// job (via JobTests.buildJobBuilder) per given id; no datafeeds are added.
private static MlMetadata.Builder newMlMetadataWithJobs(String... jobIds) {
MlMetadata.Builder builder = new MlMetadata.Builder();
for (String jobId : jobIds) {
Job job = buildJobBuilder(jobId).build();
// false -> job is not created during an index migration
builder.putJob(job, false);
}
return builder;
}
}

View File

@ -55,6 +55,9 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
if (randomBoolean()) {
request.setForce(randomBoolean());
}
if (randomBoolean()) {
request.setAllowNoJobs(randomBoolean());
}
return request;
}
@ -140,12 +143,15 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, true);
CloseJobAction.Request request = new CloseJobAction.Request("_all");
request.setForce(true);
CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs);
assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs);
assertEquals(Arrays.asList("job_id_4"), closingJobs);
request.setForce(false);
expectThrows(ElasticsearchStatusException.class,
() -> CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false));
() -> CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs));
}
public void testResolve_givenJobId() {
@ -163,7 +169,8 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false);
CloseJobAction.Request request = new CloseJobAction.Request("job_id_1");
CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs);
assertEquals(Arrays.asList("job_id_1"), openJobs);
assertEquals(Collections.emptyList(), closingJobs);
@ -174,7 +181,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
openJobs.clear();
closingJobs.clear();
CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false);
CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs);
assertEquals(Collections.emptyList(), openJobs);
assertEquals(Collections.emptyList(), closingJobs);
}
@ -190,8 +197,9 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
CloseJobAction.Request request = new CloseJobAction.Request("missing-job");
expectThrows(ResourceNotFoundException.class,
() -> CloseJobAction.resolveAndValidateJobId("missing-job", cs1, openJobs, closingJobs, false));
() -> CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs));
}
public void testResolve_givenJobIdFailed() {
@ -207,26 +215,31 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, true);
CloseJobAction.Request request = new CloseJobAction.Request("job_id_failed");
request.setForce(true);
CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs);
assertEquals(Arrays.asList("job_id_failed"), openJobs);
assertEquals(Collections.emptyList(), closingJobs);
openJobs.clear();
closingJobs.clear();
expectThrows(ElasticsearchStatusException.class,
() -> CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, false));
request.setForce(false);
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs));
}
public void testResolve_withSpecificJobIds() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closing").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_open").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_open-1").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_open-2").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closed").build(new Date()), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id_closing", null, JobState.CLOSING, tasksBuilder);
addJobTask("job_id_open", null, JobState.OPENED, tasksBuilder);
addJobTask("job_id_open-1", null, JobState.OPENED, tasksBuilder);
addJobTask("job_id_open-2", null, JobState.OPENED, tasksBuilder);
// closed job has no task
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
@ -237,20 +250,26 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false);
assertEquals(Arrays.asList("job_id_open"), openJobs);
CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("_all"), cs1, openJobs, closingJobs);
assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), openJobs);
assertEquals(Arrays.asList("job_id_closing"), closingJobs);
openJobs.clear();
closingJobs.clear();
CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs, false);
CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("*open*"), cs1, openJobs, closingJobs);
assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), openJobs);
assertEquals(Collections.emptyList(), closingJobs);
openJobs.clear();
closingJobs.clear();
CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("job_id_closing"), cs1, openJobs, closingJobs);
assertEquals(Collections.emptyList(), openJobs);
assertEquals(Arrays.asList("job_id_closing"), closingJobs);
openJobs.clear();
closingJobs.clear();
CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs, false);
assertEquals(Arrays.asList("job_id_open"), openJobs);
CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("job_id_open-1"), cs1, openJobs, closingJobs);
assertEquals(Arrays.asList("job_id_open-1"), openJobs);
assertEquals(Collections.emptyList(), closingJobs);
openJobs.clear();
closingJobs.clear();

View File

@ -5,15 +5,17 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Request;
import org.elasticsearch.xpack.ml.job.config.Job;
public class GetDatafeedStatsActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAlphaOfLengthBetween(1, 20));
Request request = new Request(randomBoolean() ? MetaData.ALL : randomAlphaOfLengthBetween(1, 20));
request.setAllowNoDatafeeds(randomBoolean());
return request;
}
@Override

View File

@ -5,15 +5,17 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.GetDatafeedsAction.Request;
import org.elasticsearch.xpack.ml.job.config.Job;
public class GetDatafeedsActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAlphaOfLengthBetween(1, 20));
Request request = new Request(randomBoolean() ? MetaData.ALL : randomAlphaOfLengthBetween(1, 20));
request.setAllowNoDatafeeds(randomBoolean());
return request;
}
@Override

View File

@ -5,15 +5,17 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction.Request;
import org.elasticsearch.xpack.ml.job.config.Job;
public class GetJobStatsActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAlphaOfLengthBetween(1, 20));
Request request = new Request(randomBoolean() ? MetaData.ALL : randomAlphaOfLengthBetween(1, 20));
request.setAllowNoJobs(randomBoolean());
return request;
}
@Override

View File

@ -5,20 +5,21 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.GetJobsAction.Request;
import org.elasticsearch.xpack.ml.job.config.Job;
public class GetJobsActionRequestTests extends AbstractStreamableTestCase<GetJobsAction.Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAlphaOfLengthBetween(1, 20));
Request request = new Request(randomBoolean() ? MetaData.ALL : randomAlphaOfLengthBetween(1, 20));
request.setAllowNoJobs(randomBoolean());
return request;
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -37,7 +37,12 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
if (randomBoolean()) {
request.setStopTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
request.setForce(randomBoolean());
if (randomBoolean()) {
request.setForce(randomBoolean());
}
if (randomBoolean()) {
request.setAllowNoDatafeeds(randomBoolean());
}
return request;
}
@ -95,13 +100,15 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
StopDatafeedAction.resolveDataFeedIds("datafeed_1", mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
StopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("datafeed_1"), mlMetadata, tasks, startedDatafeeds,
stoppingDatafeeds);
assertEquals(Arrays.asList("datafeed_1"), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
startedDatafeeds.clear();
stoppingDatafeeds.clear();
StopDatafeedAction.resolveDataFeedIds("datafeed_2", mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
StopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("datafeed_2"), mlMetadata, tasks, startedDatafeeds,
stoppingDatafeeds);
assertEquals(Collections.emptyList(), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
}
@ -130,13 +137,15 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
StopDatafeedAction.resolveDataFeedIds("_all", mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
StopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("_all"), mlMetadata, tasks, startedDatafeeds,
stoppingDatafeeds);
assertEquals(Arrays.asList("datafeed_1"), startedDatafeeds);
assertEquals(Arrays.asList("datafeed_3"), stoppingDatafeeds);
startedDatafeeds.clear();
stoppingDatafeeds.clear();
StopDatafeedAction.resolveDataFeedIds("datafeed_2", mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
StopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("datafeed_2"), mlMetadata, tasks, startedDatafeeds,
stoppingDatafeeds);
assertEquals(Collections.emptyList(), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
}

View File

@ -56,20 +56,9 @@ public class JobManagerTests extends ESTestCase {
auditor = mock(Auditor.class);
}
public void testGetJob() {
JobManager jobManager = createJobManager();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(buildJobBuilder("foo").build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, builder.build())).build();
QueryPage<Job> doc = jobManager.getJob("foo", clusterState);
assertTrue(doc.count() > 0);
assertThat(doc.results().get(0).getId(), equalTo("foo"));
}
public void testGetJobOrThrowIfUnknown_GivenUnknownJob() {
ClusterState cs = createClusterState();
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown(cs, "foo"));
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown("foo", cs));
}
public void testGetJobOrThrowIfUnknown_GivenKnownJob() {
@ -78,10 +67,10 @@ public class JobManagerTests extends ESTestCase {
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)).build();
assertEquals(job, JobManager.getJobOrThrowIfUnknown(cs, "foo"));
assertEquals(job, JobManager.getJobOrThrowIfUnknown("foo", cs));
}
public void testGetJob_GivenJobIdIsAll() {
public void testExpandJobs_GivenAll() {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (int i = 0; i < 3; i++) {
mlMetadata.putJob(buildJobBuilder(Integer.toString(i)).build(), false);
@ -90,36 +79,14 @@ public class JobManagerTests extends ESTestCase {
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build();
JobManager jobManager = createJobManager();
QueryPage<Job> result = jobManager.getJob("_all", clusterState);
QueryPage<Job> result = jobManager.expandJobs("_all", true, clusterState);
assertThat(result.count(), equalTo(3L));
assertThat(result.results().get(0).getId(), equalTo("0"));
assertThat(result.results().get(1).getId(), equalTo("1"));
assertThat(result.results().get(2).getId(), equalTo("2"));
}
public void testGetJobs() {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (int i = 0; i < 10; i++) {
mlMetadata.putJob(buildJobBuilder(Integer.toString(i)).build(), false);
}
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build();
JobManager jobManager = createJobManager();
QueryPage<Job> result = jobManager.getJobs(clusterState);
assertThat(result.count(), equalTo(10L));
assertThat(result.results().get(0).getId(), equalTo("0"));
assertThat(result.results().get(1).getId(), equalTo("1"));
assertThat(result.results().get(2).getId(), equalTo("2"));
assertThat(result.results().get(3).getId(), equalTo("3"));
assertThat(result.results().get(4).getId(), equalTo("4"));
assertThat(result.results().get(5).getId(), equalTo("5"));
assertThat(result.results().get(6).getId(), equalTo("6"));
assertThat(result.results().get(7).getId(), equalTo("7"));
assertThat(result.results().get(8).getId(), equalTo("8"));
assertThat(result.results().get(9).getId(), equalTo("9"));
}
@SuppressWarnings("unchecked")
public void testPutJob_AddsCreateTime() {
JobManager jobManager = createJobManager();

View File

@ -294,15 +294,15 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
try {
CloseJobAction.Request closeRequest = new CloseJobAction.Request(Job.ALL);
CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);
closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L));
logger.info("Closing jobs using [{}]", Job.ALL);
logger.info("Closing jobs using [{}]", MetaData.ALL);
CloseJobAction.Response response = client.execute(CloseJobAction.INSTANCE, closeRequest)
.get();
assertTrue(response.isClosed());
} catch (Exception e1) {
try {
CloseJobAction.Request closeRequest = new CloseJobAction.Request(Job.ALL);
CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);
closeRequest.setForce(true);
closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L));
CloseJobAction.Response response =

View File

@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
 * Tests for {@code NameResolver}, the utility that expands an expression that
 * may be a comma-separated list and/or contain wildcard patterns (including
 * {@code _all}) into the sorted set of concrete names it matches. Covers both
 * the unaliased case (each key maps to itself) and an aliased case where a
 * single key (a "group") expands to multiple names.
 */
public class NameResolverTests extends ESTestCase {

    public void testNoMatchingNames() {
        // A concrete (non-wildcard) name that matches nothing is always an error;
        // the exception message carries the unmatched expression.
        ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
                () -> newUnaliasedResolver().expand("foo", false));
        assertThat(e.getMessage(), equalTo("foo"));
    }

    public void testNoMatchingNames_GivenPatternAndAllowNoMatch() {
        // With allowNoMatchingNames=true, a wildcard that matches nothing
        // yields an empty result instead of throwing.
        assertThat(newUnaliasedResolver().expand("foo*", true).isEmpty(), is(true));
    }

    public void testNoMatchingNames_GivenPatternAndNotAllowNoMatch() {
        // With allowNoMatchingNames=false, a non-matching wildcard is an error.
        ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
                () -> newUnaliasedResolver().expand("foo*", false));
        assertThat(e.getMessage(), equalTo("foo*"));
    }

    public void testNoMatchingNames_GivenMatchingNameAndNonMatchingPatternAndNotAllowNoMatch() {
        // Only the non-matching part of the expression is reported in the error.
        ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
                () -> newUnaliasedResolver("foo").expand("foo, bar*", false));
        assertThat(e.getMessage(), equalTo("bar*"));
    }

    public void testUnaliased() {
        NameResolver nameResolver = newUnaliasedResolver("foo-1", "foo-2", "bar-1", "bar-2");

        // Exact names resolve to themselves.
        assertThat(nameResolver.expand("foo-1", false), equalTo(newSortedSet("foo-1")));
        assertThat(nameResolver.expand("foo-2", false), equalTo(newSortedSet("foo-2")));
        assertThat(nameResolver.expand("bar-1", false), equalTo(newSortedSet("bar-1")));
        assertThat(nameResolver.expand("bar-2", false), equalTo(newSortedSet("bar-2")));
        // Comma-separated lists and wildcard patterns expand to all matches.
        assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-*", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("bar-*", false), equalTo(newSortedSet("bar-1", "bar-2")));
        assertThat(nameResolver.expand("*oo-*", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("*-1", false), equalTo(newSortedSet("foo-1", "bar-1")));
        assertThat(nameResolver.expand("*-2", false), equalTo(newSortedSet("foo-2", "bar-2")));
        // "*" and "_all" both mean every known name.
        assertThat(nameResolver.expand("*", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2")));
        assertThat(nameResolver.expand("_all", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2")));
        // Mixed lists of names and patterns are unioned.
        assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-1,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1")));
        assertThat(nameResolver.expand("foo-*,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2")));
    }

    public void testAliased() {
        // Each plain name maps to itself; the "*-group" keys alias two names each.
        Map<String, List<String>> namesAndAliasesMap = new HashMap<>();
        namesAndAliasesMap.put("foo-1", Collections.singletonList("foo-1"));
        namesAndAliasesMap.put("foo-2", Collections.singletonList("foo-2"));
        namesAndAliasesMap.put("bar-1", Collections.singletonList("bar-1"));
        namesAndAliasesMap.put("bar-2", Collections.singletonList("bar-2"));
        namesAndAliasesMap.put("foo-group", Arrays.asList("foo-1", "foo-2"));
        namesAndAliasesMap.put("bar-group", Arrays.asList("bar-1", "bar-2"));
        NameResolver nameResolver = new TestAliasNameResolver(namesAndAliasesMap);

        // First try the same set of assertions as the unaliased case
        assertThat(nameResolver.expand("foo-1", false), equalTo(newSortedSet("foo-1")));
        assertThat(nameResolver.expand("foo-2", false), equalTo(newSortedSet("foo-2")));
        assertThat(nameResolver.expand("bar-1", false), equalTo(newSortedSet("bar-1")));
        assertThat(nameResolver.expand("bar-2", false), equalTo(newSortedSet("bar-2")));
        assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-*", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("bar-*", false), equalTo(newSortedSet("bar-1", "bar-2")));
        assertThat(nameResolver.expand("*oo-*", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("*-1", false), equalTo(newSortedSet("foo-1", "bar-1")));
        assertThat(nameResolver.expand("*-2", false), equalTo(newSortedSet("foo-2", "bar-2")));
        assertThat(nameResolver.expand("*", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2")));
        assertThat(nameResolver.expand("_all", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2")));
        assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-1,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1")));
        assertThat(nameResolver.expand("foo-*,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2")));

        // Now let's test the aliases: a group key expands to its member names,
        // and overlapping expansions are de-duplicated in the result.
        assertThat(nameResolver.expand("foo-group", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("bar-group", false), equalTo(newSortedSet("bar-1", "bar-2")));
        assertThat(nameResolver.expand("foo-group,bar-group", false), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-group,foo-1", false), equalTo(newSortedSet("foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-group,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2")));
        assertThat(nameResolver.expand("foo-group,bar-*", false), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2")));
    }

    /** Builds a resolver where each given name is a key that maps only to itself. */
    private static NameResolver newUnaliasedResolver(String... names) {
        return NameResolver.newUnaliased(new HashSet<>(Arrays.asList(names)), notFoundExceptionSupplier());
    }

    /** Convenience builder for the expected sorted-set results. */
    private static SortedSet<String> newSortedSet(String... names) {
        SortedSet<String> result = new TreeSet<>();
        for (String name : names) {
            result.add(name);
        }
        return result;
    }

    /** Exception factory used by the resolver when an expression matches nothing. */
    private static Function<String, ResourceNotFoundException> notFoundExceptionSupplier() {
        return s -> new ResourceNotFoundException(s);
    }

    /**
     * Test resolver backed by an explicit key -> names map, so a key may be an
     * alias (group) expanding to several names.
     */
    private static class TestAliasNameResolver extends NameResolver {

        private final Map<String, List<String>> lookup;

        TestAliasNameResolver(Map<String, List<String>> lookup) {
            super(notFoundExceptionSupplier());
            this.lookup = lookup;
        }

        @Override
        protected Set<String> keys() {
            return lookup.keySet();
        }

        @Override
        protected Set<String> nameSet() {
            // All names reachable through any key.
            return lookup.values().stream().flatMap(List::stream).collect(Collectors.toSet());
        }

        @Override
        protected List<String> lookup(String key) {
            return lookup.get(key);
        }
    }
}

View File

@ -151,7 +151,7 @@ public class JobStatsCollectorTests extends ESTestCase {
final ActionFuture<Response> future = (ActionFuture<Response>)mock(ActionFuture.class);
final Response response = new Response(new QueryPage<>(jobStats, jobStats.size(), Job.RESULTS_FIELD));
when(client.getJobsStats(eq(new Request(Job.ALL)))).thenReturn(future);
when(client.getJobsStats(eq(new Request(MetaData.ALL)))).thenReturn(future);
when(future.actionGet(timeout)).thenReturn(response);
final List<MonitoringDoc> monitoringDocs = collector.doCollect();

View File

@ -13,6 +13,11 @@
}
},
"params": {
"allow_no_jobs": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no jobs. (This includes `_all` string or when no jobs have been specified)"
},
"force": {
"type": "boolean",
"required": false,

View File

@ -13,6 +13,13 @@
"type": "string",
"description": "The ID of the datafeeds stats to fetch"
}
},
"params": {
"allow_no_datafeeds": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no datafeeds. (This includes `_all` string or when no datafeeds have been specified)"
}
}
},
"body": null

View File

@ -13,6 +13,13 @@
"type": "string",
"description": "The ID of the datafeeds to fetch"
}
},
"params": {
"allow_no_datafeeds": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no datafeeds. (This includes `_all` string or when no datafeeds have been specified)"
}
}
},
"body": null

View File

@ -13,6 +13,13 @@
"type": "string",
"description": "The ID of the jobs stats to fetch"
}
},
"params": {
"allow_no_jobs": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no jobs. (This includes `_all` string or when no jobs have been specified)"
}
}
},
"body": null

View File

@ -13,6 +13,13 @@
"type": "string",
"description": "The ID of the jobs to fetch"
}
},
"params": {
"allow_no_jobs": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no jobs. (This includes `_all` string or when no jobs have been specified)"
}
}
},
"body": null

View File

@ -17,6 +17,11 @@
}
},
"params": {
"allow_no_datafeeds": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no datafeeds. (This includes `_all` string or when no datafeeds have been specified)"
},
"force": {
"type": "boolean",
"required": false,

View File

@ -45,6 +45,25 @@ setup:
- match: { count: 0 }
- match: { datafeeds: [] }
---
"Test get datafeed with expression that does not match and allow_no_datafeeds":
- do:
xpack.ml.get_datafeeds:
datafeed_id: "missing-*"
allow_no_datafeeds: true
- match: { count: 0 }
- match: { datafeeds: [] }
---
"Test get datafeed with expression that does not match and not allow_no_datafeeds":
- do:
catch: missing
xpack.ml.get_datafeeds:
datafeed_id: "missing-*"
allow_no_datafeeds: false
---
"Test put datafeed referring to missing job_id":
- do:

View File

@ -85,6 +85,25 @@ setup:
xpack.ml.get_datafeed_stats:
datafeed_id: missing-datafeed
---
"Test get datafeed stats with expression that does not match and allow_no_datafeeds":
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "missing-*"
allow_no_datafeeds: true
- match: { count: 0 }
- match: { datafeeds: [] }
---
"Test get datafeed stats with expression that does not match and not allow_no_datafeeds":
- do:
catch: missing
xpack.ml.get_datafeed_stats:
datafeed_id: "missing-*"
allow_no_datafeeds: false
---
"Test get single datafeed stats":

View File

@ -13,6 +13,25 @@
- match: { count: 0 }
- match: { jobs: [] }
---
"Test get jobs with expression that does not match and allow_no_jobs":
- do:
xpack.ml.get_jobs:
job_id: "missing-*"
allow_no_jobs: true
- match: { count: 0 }
- match: { jobs: [] }
---
"Test get jobs with expression that does not match and not allow_no_jobs":
- do:
catch: missing
xpack.ml.get_jobs:
job_id: "missing-*"
allow_no_jobs: false
---
"Test job crud apis":
@ -521,6 +540,87 @@
- match: { jobs.0.state: closed }
- match: { jobs.1.state: closed }
---
"Test close jobs with expression that matches":
- do:
xpack.ml.put_job:
job_id: jobs-crud-with-expression-that-matches-foo-1
body: >
{
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {}
}
- do:
xpack.ml.put_job:
job_id: jobs-crud-with-expression-that-matches-foo-2
body: >
{
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {}
}
- do:
xpack.ml.put_job:
job_id: jobs-crud-with-expression-that-matches-bar-1
body: >
{
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {}
}
- do:
xpack.ml.open_job:
job_id: jobs-crud-with-expression-that-matches-foo-1
- do:
xpack.ml.open_job:
job_id: jobs-crud-with-expression-that-matches-foo-2
- do:
xpack.ml.open_job:
job_id: jobs-crud-with-expression-that-matches-bar-1
- do:
xpack.ml.close_job:
job_id: "*foo-*"
- match: { closed: true }
- do:
xpack.ml.get_job_stats:
job_id: "*foo-*"
- match: { jobs.0.state: closed }
- match: { jobs.1.state: closed }
- do:
xpack.ml.get_job_stats:
job_id: "*bar-1"
- match: { jobs.0.state: opened }
---
"Test close jobs with expression that does not match and allow_no_jobs":
- do:
xpack.ml.close_job:
job_id: "missing-*"
allow_no_jobs: true
- match: { closed: true }
---
"Test close jobs with expression that does not match and not allow_no_jobs":
- do:
catch: missing
xpack.ml.close_job:
job_id: "missing-*"
allow_no_jobs: false
---
"Test force close job":

View File

@ -127,7 +127,7 @@ setup:
- is_true: jobs.0.open_time
---
"Test get all job stats explicitly":
"Test get all job stats with _all":
- do:
xpack.ml.get_job_stats:
@ -136,6 +136,16 @@ setup:
- match: { jobs.0.state: opened }
- match: { jobs.1.state: opened }
---
"Test get all job stats with wildcard":
- do:
xpack.ml.get_job_stats:
job_id: "*"
- match: { count: 2 }
- match: { jobs.0.state: opened }
- match: { jobs.1.state: opened }
---
"Test get all job stats implicitly":
@ -151,6 +161,24 @@ setup:
xpack.ml.get_job_stats:
job_id: unknown-job
---
"Test get job stats given pattern and allow_no_jobs":
- do:
xpack.ml.get_job_stats:
job_id: "missing-*"
allow_no_jobs: true
- match: { count: 0 }
---
"Test get job stats given pattern and not allow_no_jobs":
- do:
catch: missing
xpack.ml.get_job_stats:
job_id: "missing-*"
allow_no_jobs: false
---
"Test reading v54 data counts and model size stats":

View File

@ -143,23 +143,6 @@ setup:
datafeed_id: "start-stop-datafeed-datafeed-1"
start: 0
---
"Test stop non existing datafeed":
- do:
catch: missing
xpack.ml.stop_datafeed:
datafeed_id: "non-existing-datafeed"
---
"Test stop already stopped datafeed job is not an error":
- do:
xpack.ml.stop_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
- do:
xpack.ml.stop_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
---
"Test start given end earlier than start":
- do:
@ -239,3 +222,155 @@ setup:
catch: /datafeed \[start-stop-datafeed-job-field-without-mappings-feed] cannot retrieve field \[airline2\] because it has no mappings/
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-field-without-mappings-feed"
---
"Test stop non existing datafeed":
- do:
catch: missing
xpack.ml.stop_datafeed:
datafeed_id: "non-existing-datafeed"
---
"Test stop with expression that does not match and allow_no_datafeeds":
- do:
xpack.ml.stop_datafeed:
datafeed_id: "missing-*"
allow_no_datafeeds: true
- match: { stopped: true }
---
"Test stop with expression that does not match and not allow_no_datafeeds":
- do:
catch: missing
xpack.ml.stop_datafeed:
datafeed_id: "missing-*"
allow_no_datafeeds: false
---
"Test stop already stopped datafeed job is not an error":
- do:
xpack.ml.stop_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
- do:
xpack.ml.stop_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
---
"Test stop given expression":
- do:
xpack.ml.put_job:
job_id: start-stop-datafeed-job-foo-1
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_job:
job_id: start-stop-datafeed-job-foo-2
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_job:
job_id: start-stop-datafeed-job-bar-1
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: start-stop-datafeed-job-foo-1-feed
body: >
{
"job_id":"start-stop-datafeed-job-foo-1",
"indexes":"airline-data",
"types":"response"
}
- do:
xpack.ml.put_datafeed:
datafeed_id: start-stop-datafeed-job-foo-2-feed
body: >
{
"job_id":"start-stop-datafeed-job-foo-2",
"indexes":"airline-data",
"types":"response"
}
- do:
xpack.ml.put_datafeed:
datafeed_id: start-stop-datafeed-job-bar-1-feed
body: >
{
"job_id":"start-stop-datafeed-job-bar-1",
"indexes":"airline-data",
"types":"response"
}
- do:
xpack.ml.open_job:
job_id: start-stop-datafeed-job-foo-1
- match: { opened: true }
- do:
xpack.ml.open_job:
job_id: start-stop-datafeed-job-foo-2
- match: { opened: true }
- do:
xpack.ml.open_job:
job_id: start-stop-datafeed-job-bar-1
- match: { opened: true }
- do:
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-foo-1-feed"
- match: { started: true }
- do:
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-foo-2-feed"
- match: { started: true }
- do:
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-bar-1-feed"
- match: { started: true }
- do:
xpack.ml.stop_datafeed:
datafeed_id: "start-stop-datafeed-job-foo-*"
- match: { stopped: true }
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "start-stop-datafeed-job-foo-*"
- match: { datafeeds.0.state: "stopped"}
- match: { datafeeds.1.state: "stopped"}
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "start-stop-datafeed-job-bar-1-feed"
- match: { datafeeds.0.state: "started"}