[ML] Added the node the job is running on to the response of the job stats api.

Original commit: elastic/x-pack-elasticsearch@d92149ca59
This commit is contained in:
Martijn van Groningen 2017-02-23 17:44:38 +01:00
parent 405230c308
commit 9f48a1f677
4 changed files with 60 additions and 17 deletions

View File

@ -16,7 +16,9 @@ import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -48,6 +50,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -64,6 +67,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String STATE = "state";
private static final String NODE = "node";
private GetJobsStatsAction() {
super(NAME);
@ -154,12 +158,15 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
@Nullable
private ModelSizeStats modelSizeStats;
private JobState state;
@Nullable
private DiscoveryNode node;
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state) {
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state, DiscoveryNode node) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.state = Objects.requireNonNull(state);
this.node = node;
}
JobStats(StreamInput in) throws IOException {
@ -167,6 +174,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
dataCounts = new DataCounts(in);
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
state = JobState.fromStream(in);
node = in.readOptionalWriteable(DiscoveryNode::new);
}
public String getJobId() {
@ -185,6 +193,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return state;
}
public DiscoveryNode getNode() {
return node;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -194,8 +206,21 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
builder.field(STATE, state.toString());
builder.endObject();
if (node != null) {
builder.startObject(NODE);
builder.field("id", node.getId());
builder.field("name", node.getName());
builder.field("ephemeral_id", node.getEphemeralId());
builder.field("transport_address", node.getAddress().toString());
builder.startObject("attributes");
for (Map.Entry<String, String> entry : node.getAttributes().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
}
builder.endObject();
return builder;
}
@ -205,11 +230,12 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
dataCounts.writeTo(out);
out.writeOptionalWriteable(modelSizeStats);
state.writeTo(out);
out.writeOptionalWriteable(node);
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, state);
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node);
}
@Override
@ -224,7 +250,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.state, other.state);
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node);
}
}
@ -348,11 +375,14 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
ActionListener<QueryPage<Response.JobStats>> listener) {
String jobId = task.getJobId();
logger.debug("Get stats for job '{}'", jobId);
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
ClusterState state = clusterService.state();
PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(jobId);
if (stats.isPresent()) {
PersistentTasksInProgress.PersistentTaskInProgress<?> pTask = MlMetadata.getJobTask(jobId, tasks);
DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
JobState jobState = MlMetadata.getJobState(jobId, tasks);
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState);
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState, node);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
} else {
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
@ -376,7 +406,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
String jobId = jobIds.get(i);
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlMetadata.getJobState(request.jobId, tasks);
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobState));
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null));
if (counter.decrementAndGet() == 0) {
List<Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList().stream()

View File

@ -5,15 +5,19 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction.Response;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import org.joda.time.DateTime;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@ -40,7 +44,11 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
}
JobState jobState = randomFrom(EnumSet.allOf(JobState.class));
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState);
DiscoveryNode node = null;
if (randomBoolean()) {
node = new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node);
jobStatsList.add(jobStats);
}

View File

@ -23,7 +23,7 @@ public class GetJobsStatsActionTests extends ESTestCase {
assertEquals("id1", result.get(0));
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED)));
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null)));
assertEquals(0, result.size());
result = determineJobIdsWithoutLiveStats(
@ -35,23 +35,24 @@ public class GetJobsStatsActionTests extends ESTestCase {
result = determineJobIdsWithoutLiveStats(
Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED))
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null,
JobState.CLOSED, null))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED)
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED)));
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null)));
assertEquals(0, result.size());
}

View File

@ -77,6 +77,9 @@ setup:
- match: { jobs.0.data_counts.input_field_count: 4 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: opened }
- is_true: jobs.0.node.name
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.max_running_jobs: "10"}
---
"Test get job stats for closed job":
@ -107,6 +110,7 @@ setup:
- match: { jobs.0.data_counts.input_field_count: 4 }
- gt: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
---
"Test get job stats of datafeed job that has not received and data":