[ML] Cut open job and start datafeed apis to use persistent task's assignment explanation.

Also included assignment explanation to both job and datafeed stats apis and
included executor node to datafeed stats api

Original commit: elastic/x-pack-elasticsearch@783bc77ef6
This commit is contained in:
Martijn van Groningen 2017-02-24 20:17:28 +01:00
parent 69880373fc
commit 93a2a567cb
14 changed files with 303 additions and 139 deletions

View File

@ -18,7 +18,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -31,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
@ -40,12 +43,9 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Request, GetDatafeedsStatsAction.Response,
GetDatafeedsStatsAction.RequestBuilder> {
@ -132,15 +132,24 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
private final String datafeedId;
private final DatafeedState datafeedState;
@Nullable
private DiscoveryNode node;
@Nullable
private String assignmentExplanation;
DatafeedStats(String datafeedId, DatafeedState datafeedState) {
DatafeedStats(String datafeedId, DatafeedState datafeedState, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation) {
this.datafeedId = Objects.requireNonNull(datafeedId);
this.datafeedState = Objects.requireNonNull(datafeedState);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
}
DatafeedStats(StreamInput in) throws IOException {
datafeedId = in.readString();
datafeedState = DatafeedState.fromStream(in);
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
}
public String getDatafeedId() {
@ -151,13 +160,37 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
return datafeedState;
}
public DiscoveryNode getNode() {
return node;
}
public String getAssignmentExplanation() {
return assignmentExplanation;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
builder.field(STATE, datafeedState.toString());
builder.endObject();
if (node != null) {
builder.startObject("node");
builder.field("id", node.getId());
builder.field("name", node.getName());
builder.field("ephemeral_id", node.getEphemeralId());
builder.field("transport_address", node.getAddress().toString());
builder.startObject("attributes");
for (Map.Entry<String, String> entry : node.getAttributes().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
}
if (assignmentExplanation != null) {
builder.field("assigment_explanation", assignmentExplanation);
}
builder.endObject();
return builder;
}
@ -165,11 +198,13 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
public void writeTo(StreamOutput out) throws IOException {
out.writeString(datafeedId);
datafeedState.writeTo(out);
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, datafeedState);
return Objects.hash(datafeedId, datafeedState, node, assignmentExplanation);
}
@Override
@ -180,8 +215,11 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
if (getClass() != obj.getClass()) {
return false;
}
GetDatafeedsStatsAction.Response.DatafeedStats other = (GetDatafeedsStatsAction.Response.DatafeedStats) obj;
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(this.datafeedState, other.datafeedState);
DatafeedStats other = (DatafeedStats) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(this.datafeedState, other.datafeedState) &&
Objects.equals(this.node, other.node) &&
Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
}
}
@ -264,34 +302,28 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
Map<String, DatafeedState> states = new HashMap<>();
PersistentTasksInProgress tasksInProgress = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = ALL.equals(request.getDatafeedId()) ? p -> true :
p -> request.getDatafeedId().equals(((StartDatafeedAction.Request) p.getRequest()).getDatafeedId());
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findTasks(StartDatafeedAction.NAME, predicate)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest();
states.put(storedRequest.getDatafeedId(), DatafeedState.STARTED);
}
}
List<Response.DatafeedStats> stats = new ArrayList<>();
Map<String, DatafeedStats> results = new HashMap<>();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (ALL.equals(request.getDatafeedId())) {
Collection<DatafeedConfig> datafeeds = mlMetadata.getDatafeeds().values();
for (DatafeedConfig datafeed : datafeeds) {
DatafeedState datafeedState = states.getOrDefault(datafeed.getId(), DatafeedState.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), datafeedState));
}
} else {
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
PersistentTasksInProgress tasksInProgress = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (request.getDatafeedId().equals(ALL) == false && mlMetadata.getDatafeed(request.getDatafeedId()) == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
DatafeedState datafeedState = states.getOrDefault(datafeed.getId(), DatafeedState.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), datafeedState));
for (DatafeedConfig datafeedConfig : mlMetadata.getDatafeeds().values()) {
if (request.getDatafeedId().equals(ALL) || datafeedConfig.getId().equals(request.getDatafeedId())) {
PersistentTaskInProgress<?> task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress);
DatafeedState datafeedState = MlMetadata.getDatafeedState(request.getDatafeedId(), tasksInProgress);
DiscoveryNode node = null;
String explanation = null;
if (task != null) {
node = state.nodes().get(task.getExecutorNode());
explanation = task.getAssignment().getExplanation();
}
QueryPage<Response.DatafeedStats> statsPage = new QueryPage<>(stats, stats.size(), DatafeedConfig.RESULTS_FIELD);
results.put(datafeedConfig.getId(), new DatafeedStats(datafeedConfig.getId(), datafeedState, node, explanation));
}
}
QueryPage<DatafeedStats> statsPage = new QueryPage<>(new ArrayList<>(results.values()), results.size(),
DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new Response(statsPage));
}

View File

@ -160,13 +160,17 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private JobState state;
@Nullable
private DiscoveryNode node;
@Nullable
private String assignmentExplanation;
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state, DiscoveryNode node) {
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.state = Objects.requireNonNull(state);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
}
JobStats(StreamInput in) throws IOException {
@ -175,6 +179,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
state = JobState.fromStream(in);
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
}
public String getJobId() {
@ -197,6 +202,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return node;
}
public String getAssignmentExplanation() {
return assignmentExplanation;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -220,6 +229,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
builder.endObject();
builder.endObject();
}
if (assignmentExplanation != null) {
builder.field("assigment_explanation", assignmentExplanation);
}
builder.endObject();
return builder;
}
@ -231,11 +243,12 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
out.writeOptionalWriteable(modelSizeStats);
state.writeTo(out);
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node);
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation);
}
@Override
@ -251,7 +264,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node);
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
}
}
@ -378,7 +392,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
protected void taskOperation(Request request, OpenJobAction.JobTask task,
ActionListener<QueryPage<Response.JobStats>> listener) {
String jobId = task.getJobId();
logger.debug("Get stats for job '{}'", jobId);
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(jobId);
@ -386,7 +400,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
PersistentTasksInProgress.PersistentTaskInProgress<?> pTask = MlMetadata.getJobTask(jobId, tasks);
DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
JobState jobState = MlMetadata.getJobState(jobId, tasks);
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState, node);
String assignmentExplanation = pTask.getAssignment().getExplanation();
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState,
node, assignmentExplanation);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
} else {
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
@ -410,7 +426,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
String jobId = jobIds.get(i);
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlMetadata.getJobState(request.jobId, tasks);
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null));
PersistentTasksInProgress.PersistentTaskInProgress<?> pTask = MlMetadata.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null,
assignmentExplanation));
if (counter.decrementAndGet() == 0) {
List<Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList().stream()
@ -433,7 +455,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
}
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds, List<Response.JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(s -> s.getJobId()).collect(Collectors.toSet());
Set<String> excludeJobIds = stats.stream().map(Response.JobStats::getJobId).collect(Collectors.toSet());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -61,6 +62,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -256,7 +258,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
private final JobStateObserver observer;
private final ClusterService clusterService;
private final AutodetectProcessManager autodetectProcessManager;
private XPackLicenseState licenseState;
private final XPackLicenseState licenseState;
private final Auditor auditor;
private volatile int maxConcurrentJobAllocations;
@ -264,12 +267,13 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager) {
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager, Auditor auditor) {
super(settings, OpenJobAction.NAME, false, threadPool, transportService, persistentActionService,
persistentActionRegistry, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.clusterService = clusterService;
this.autodetectProcessManager = autodetectProcessManager;
this.auditor = auditor;
this.observer = new JobStateObserver(threadPool, clusterService);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
@ -283,9 +287,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
// simply because there are no ml nodes in the cluster then we fail quickly here:
ClusterState clusterState = clusterService.state();
validate(request, clusterState);
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger) == null) {
throw new ElasticsearchStatusException("no nodes available to open job [" + request.getJobId() + "]",
RestStatus.TOO_MANY_REQUESTS);
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
if (assignment.getExecutorNode() == null) {
throw new ElasticsearchStatusException("cannot open job [" + request.getJobId() + "], no suitable nodes found, " +
"allocation explanation [{}]", RestStatus.TOO_MANY_REQUESTS, assignment.getExplanation());
}
ActionListener<PersistentActionResponse> finalListener =
@ -308,13 +313,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
// TODO: Add proper explanation
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
return new Assignment(discoveryNode.getId(), "");
}
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
writeAssignmentNotification(request.getJobId(), assignment, clusterState);
return assignment;
}
@Override
@ -349,6 +350,27 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}
private void writeAssignmentNotification(String jobId, Assignment assignment, ClusterState state) {
// Forking as this code is called from cluster state update thread:
// Should be ok as auditor uses index api which has its own tp
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("Failed to write assignment notification for job [" + jobId + "]", e);
}
@Override
protected void doRun() throws Exception {
if (assignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + assignment.getExplanation() + "]");
} else {
DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
auditor.info(jobId, "Found node [" + node + "] to open job");
}
}
});
}
}
/**
@ -384,10 +406,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
}
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
Logger logger) {
if (verifyIndicesPrimaryShardsAreActive(logger, jobId, clusterState) == false) {
return null;
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(logger, jobId, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
String.join(",", unavailableIndices) + "]";
logger.debug(reason);
return new Assignment(null, reason);
}
long maxAvailable = Long.MIN_VALUE;
@ -399,7 +425,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey());
if (maxNumberOfOpenJobsStr == null) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node.";
logger.debug(reason);
logger.trace(reason);
reasons.add(reason);
continue;
}
@ -425,7 +451,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs +
"] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state";
logger.debug(reason);
logger.trace(reason);
reasons.add(reason);
continue;
}
@ -436,7 +462,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_RUNNING_JOBS_PER_NODE.getKey() +
" [" + maxNumberOfOpenJobs + "]";
logger.debug(reason);
logger.trace(reason);
reasons.add(reason);
continue;
}
@ -447,11 +473,13 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
}
if (minLoadedNode != null) {
logger.info("selected node [{}] for job [{}]", minLoadedNode, jobId);
logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId);
return new Assignment(minLoadedNode.getId(), "");
} else {
logger.warn("no node selected for job [{}], reasons [{}]", jobId, String.join(",", reasons));
String explanation = String.join("|", reasons);
logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation);
return new Assignment(null, explanation);
}
return minLoadedNode;
}
static String[] indicesOfInterest(Job job) {
@ -459,10 +487,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, JobProvider.ML_META_INDEX};
}
static boolean verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {
static List<String> verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
String[] indices = indicesOfInterest(job);
List<String> unavailableIndices = new ArrayList<>(indices.length);
for (String index : indices) {
// Indices are created on demand from templates.
// It is not an error if the index doesn't exist yet
@ -471,10 +500,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
logger.warn("Not opening job [{}], because not all primary shards are active for the [{}] index.", jobId, index);
return false;
unavailableIndices.add(index);
}
}
return true;
return unavailableIndices;
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -45,6 +46,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
@ -276,17 +278,19 @@ public class StartDatafeedAction
private final DatafeedStateObserver observer;
private final DatafeedJobRunner datafeedJobRunner;
private XPackLicenseState licenseState;
private final XPackLicenseState licenseState;
private final Auditor auditor;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, DatafeedJobRunner datafeedJobRunner) {
ClusterService clusterService, DatafeedJobRunner datafeedJobRunner, Auditor auditor) {
super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry,
actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.datafeedJobRunner = datafeedJobRunner;
this.auditor = auditor;
this.observer = new DatafeedStateObserver(threadPool, clusterService);
}
@ -315,13 +319,9 @@ public class StartDatafeedAction
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectNode(logger, request.getDatafeedId(), clusterState);
// TODO: Add proper explanation
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
return new Assignment(discoveryNode.getId(), "");
}
Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState);
writeAssignmentNotification(request.getDatafeedId(), assignment, clusterState);
return assignment;
}
@Override
@ -345,6 +345,31 @@ public class StartDatafeedAction
});
}
private void writeAssignmentNotification(String datafeedId, Assignment assignment, ClusterState state) {
// Forking as this code is called from cluster state update thread:
// Should be ok as auditor uses index api which has its own tp
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("Failed to write assignment notification for datafeed [" + datafeedId + "]", e);
}
@Override
protected void doRun() throws Exception {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
String jobId = datafeed.getJobId();
if (assignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
assignment.getExplanation() + "]");
} else {
DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
auditor.info(jobId, "Found node [" + node + "] to start datafeed [" + datafeedId + "]");
}
}
});
}
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, DiscoveryNodes nodes) {
@ -383,7 +408,7 @@ public class StartDatafeedAction
}
}
public static DiscoveryNode selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
public static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
@ -391,21 +416,24 @@ public class StartDatafeedAction
PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
if (jobTask == null) {
logger.debug("cannot start datafeed [{}], job task doesn't yet exist", datafeed.getId());
return null;
String reason = "cannot start datafeed [" + datafeed.getId() + "], job task doesn't yet exist";
logger.debug(reason);
return new Assignment(null, reason);
}
if (jobTask.needsReassignment(nodes)) {
logger.debug("cannot start datafeed [{}], job [{}] is unassigned or unassigned to a non existing node",
datafeed.getId(), datafeed.getJobId());
return null;
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() +
"] is unassigned or unassigned to a non existing node";
logger.debug(reason);
return new Assignment(null, reason);
}
if (jobTask.getStatus() != JobState.OPENED) {
// lets try again later when the job has been opened:
logger.debug("cannot start datafeed [{}], because job's [{}] state is [{}] while state [{}] is required",
datafeed.getId(), datafeed.getJobId(), jobTask.getStatus(), JobState.OPENED);
return null;
String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() +
"] state is [" + jobTask.getStatus() + "] while state [" + JobState.OPENED + "] is required";
logger.debug(reason);
return new Assignment(null, reason);
}
return nodes.get(jobTask.getExecutorNode());
return new Assignment(jobTask.getExecutorNode(), "");
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import java.time.Duration;
@ -75,9 +76,10 @@ public class DatafeedJobRunner extends AbstractComponent {
ClusterState state = clusterService.state();
// CS on master node can be ahead on the node where job and datafeed tasks run,
// so check again and fail if in case of unexpected cs. Persist tasks will retry later then.
if (StartDatafeedAction.selectNode(logger, datafeedId, state) == null) {
handler.accept(new ElasticsearchStatusException("Local cs [{}] isn't ready to start datafeed [{}] yet",
RestStatus.CONFLICT, state.getVersion(), datafeedId));
Assignment assignment = StartDatafeedAction.selectNode(logger, datafeedId, state);
if (assignment.getExecutorNode() == null) {
handler.accept(new ElasticsearchStatusException("cannot start datafeed [{}] yet, local cs [{}], allocation explanation [{}]",
RestStatus.CONFLICT, datafeedId, state.getVersion(), assignment.getExplanation()));
return;
}
logger.info("Attempt to start datafeed based on cluster state version [{}]", state.getVersion());

View File

@ -5,12 +5,16 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@ -26,7 +30,15 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
String datafeedId = randomAsciiOfLength(10);
DatafeedState datafeedState = randomFrom(DatafeedState.values());
Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedState);
DiscoveryNode node = null;
if (randomBoolean()) {
node = new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
}
String explanation = null;
if (randomBoolean()) {
explanation = randomAsciiOfLength(3);
}
Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedState, node, explanation);
datafeedStatsList.add(datafeedStats);
}

View File

@ -48,7 +48,11 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
if (randomBoolean()) {
node = new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node);
String explanation = null;
if (randomBoolean()) {
explanation = randomAsciiOfLength(3);
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node, explanation);
jobStatsList.add(jobStats);
}

View File

@ -23,7 +23,7 @@ public class GetJobsStatsActionTests extends ESTestCase {
assertEquals("id1", result.get(0));
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null)));
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null)));
assertEquals(0, result.size());
result = determineJobIdsWithoutLiveStats(
@ -36,23 +36,23 @@ public class GetJobsStatsActionTests extends ESTestCase {
result = determineJobIdsWithoutLiveStats(
Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null,
JobState.CLOSED, null))
JobState.CLOSED, null, null))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null)
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null, null)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null)));
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null, null)));
assertEquals(0, result.size());
}

View File

@ -139,8 +139,8 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
assertEquals("_node_id3", result.getId());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
assertEquals("_node_id3", result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_maxCapacity() {
@ -157,8 +157,7 @@ public class OpenJobActionTests extends ESTestCase {
nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT));
for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i);
taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id),
false, true, new Assignment(nodeId, "test assignment")));
taskMap.put(id, createJobTask(id, "job_id" + id, nodeId, JobState.OPENED));
}
}
PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap);
@ -171,8 +170,10 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], max_running_jobs [" + maxRunningJobsPerNode + "]"));
}
public void testSelectLeastLoadedMlNode_noMlNodes() {
@ -196,8 +197,9 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
assertNull(result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
@ -230,8 +232,8 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getId());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getExecutorNode());
PersistentTaskInProgress<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
taskMap.put(5L, lastTask);
@ -241,7 +243,8 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because OPENING state", result);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, new Assignment("_node_id3", "test assignment")));
tasks = new PersistentTasksInProgress(6L, taskMap);
@ -250,7 +253,8 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because stale task", result);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null));
tasks = new PersistentTasksInProgress(6L, taskMap);
@ -259,7 +263,8 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because null state", result);
assertNull("no node selected, because null state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
public void testVerifyIndicesPrimaryShardsAreActive() {
@ -272,7 +277,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
assertTrue(OpenJobAction.verifyIndicesPrimaryShardsAreActive(logger, "job_id", cs));
assertEquals(0, OpenJobAction.verifyIndicesPrimaryShardsAreActive(logger, "job_id", cs).size());
metaData = new MetaData.Builder(cs.metaData());
routingTable = new RoutingTable.Builder(cs.routingTable());
@ -294,7 +299,9 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);
assertFalse(OpenJobAction.verifyIndicesPrimaryShardsAreActive(logger, "job_id", csBuilder.build()));
List<String> result = OpenJobAction.verifyIndicesPrimaryShardsAreActive(logger, "job_id", csBuilder.build());
assertEquals(1, result.size());
assertEquals(indexToRemove, result.get(0));
}
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {

View File

@ -58,8 +58,10 @@ public class StartDatafeedActionTests extends ESTestCase {
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.nodes(nodes);
DiscoveryNode node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertNull(node);
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertNull(result.getExecutorNode());
assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
"] while state [opened] is required", result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
@ -67,9 +69,8 @@ public class StartDatafeedActionTests extends ESTestCase {
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.nodes(nodes);
node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertNotNull(node);
assertEquals("node_id", node.getId());
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertEquals("node_id", result.getExecutorNode());
}
public void testSelectNode_jobTaskStale() {
@ -92,8 +93,10 @@ public class StartDatafeedActionTests extends ESTestCase {
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.nodes(nodes);
DiscoveryNode node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertNull(node);
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertNull(result.getExecutorNode());
assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node",
result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
@ -101,9 +104,8 @@ public class StartDatafeedActionTests extends ESTestCase {
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksInProgress.TYPE, tasks))
.nodes(nodes);
node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertNotNull(node);
assertEquals("node_id1", node.getId());
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertEquals("node_id1", result.getExecutorNode());
}
public void testValidate() {

View File

@ -340,7 +340,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet());
assertEquals("no nodes available to open job [job_id]", e.getMessage());
assertTrue(e.getMessage().startsWith("cannot open job [job_id], no suitable nodes found, allocation explanation"));
assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-job_id]]"));
logger.info("Start data node");
internalCluster().startNode(Settings.builder()

View File

@ -10,7 +10,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction.Response.JobStats;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
@ -24,7 +27,6 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -126,18 +128,21 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
PersistentTasksInProgress tasks = clusterState.metaData().custom(PersistentTasksInProgress.TYPE);
assertNotNull(tasks);
assertEquals(2, tasks.taskMap().size());
Collection<PersistentTaskInProgress<?>> taskCollection = tasks.findTasks(OpenJobAction.NAME, p -> true);
assertEquals(1, taskCollection.size());
PersistentTaskInProgress<?> task = taskCollection.iterator().next();
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
assertFalse(task.needsReassignment(clusterState.nodes()));
assertEquals(JobState.OPENED, task.getStatus());
}
taskCollection = tasks.findTasks(StartDatafeedAction.NAME, p -> true);
assertEquals(1, taskCollection.size());
task = taskCollection.iterator().next();
assertEquals(DatafeedState.STARTED, task.getStatus());
assertFalse(task.needsReassignment(clusterState.nodes()));
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request("job_id");
JobStats jobStats = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet()
.getResponse().results().get(0);
assertEquals(JobState.OPENED, jobStats.getState());
assertNotNull(jobStats.getNode());
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request("data_feed_id");
DatafeedStats datafeedStats = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet()
.getResponse().results().get(0);
assertEquals(DatafeedState.STARTED, datafeedStats.getDatafeedState());
assertNotNull(datafeedStats.getNode());
});
long numDocs2 = randomIntBetween(2, 64);

View File

@ -19,8 +19,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.util.concurrent.ExecutionException;
public class TooManyJobsIT extends BaseMlIntegTestCase {
public void testCloseFailedJob() throws Exception {
@ -79,17 +77,17 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
try {
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
logger.info("Opened {}th job", i);
} catch (ExecutionException e) {
Exception cause = (Exception) e.getCause();
assertEquals(ElasticsearchStatusException.class, cause.getClass());
assertEquals("no nodes available to open job [" + i + "]", cause.getMessage());
} catch (ElasticsearchStatusException e) {
assertTrue(e.getMessage().startsWith("cannot open job [" + i + "], no suitable nodes found, allocation explanation"));
assertTrue(e.getMessage().endsWith("because this node is full. Number of opened jobs [" + maxNumberOfJobsPerNode +
"], max_running_jobs [" + maxNumberOfJobsPerNode + "]]"));
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);
// close the first job and check if the latest job gets opened:

View File

@ -67,12 +67,35 @@ setup:
datafeed_id: datafeed-1
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "stopped"}
- is_false: datafeeds.0.node
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: datafeed-2
- match: { datafeeds.0.datafeed_id: "datafeed-2"}
- match: { datafeeds.0.state: "stopped"}
- is_false: datafeeds.0.node
---
"Test get stats for started datafeed":
- do:
xpack.ml.open_job:
job_id: job-1
- do:
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: datafeed-1
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "started"}
- is_true: datafeeds.0.node.name
- is_true: datafeeds.0.node.transport_address
- match: { datafeeds.0.node.attributes.max_running_jobs: "10"}
---
"Test explicit get all datafeed stats":