Make job stats API task aware.

This allows the job stats API to redirect the request to the node where the job is running.

Original commit: elastic/x-pack-elasticsearch@9f1d12dfcb
Martijn van Groningen 2017-01-24 23:00:59 +01:00
parent b07e9bbd07
commit ce6dc4a506
9 changed files with 275 additions and 149 deletions
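
To illustrate the idea behind the change: a tasks-based request is broadcast to the nodes hosting matching tasks, each node runs the operation only for the tasks its match method accepts, and the coordinating node merges the per-task results. The sketch below is a deliberately simplified model of that flow, not the Elasticsearch API; Task, StatsRequest, and TasksActionModel are illustrative names only.

import java.util.List;
import java.util.stream.Collectors;

// Simplified model of a task-aware action: the request matches running tasks,
// the operation runs once per matching task, and the results are merged.
public class TasksActionModel {

    interface Task { String jobId(); }

    record StatsRequest(String jobId) {
        // Mirrors GetJobsStatsAction.Request.match(Task): _all matches every job task.
        boolean match(Task task) {
            return "_all".equals(jobId) || jobId.equals(task.jobId());
        }
    }

    static List<String> execute(StatsRequest request, List<Task> runningTasks) {
        return runningTasks.stream()
                .filter(request::match)               // per-node task selection
                .map(t -> "stats for " + t.jobId())   // stands in for taskOperation(...)
                .collect(Collectors.toList());        // stands in for newResponse(...) merging
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(() -> "job-1", () -> "job-2");
        System.out.println(execute(new StatsRequest("_all"), tasks));  // both jobs
        System.out.println(execute(new StatsRequest("job-2"), tasks)); // only job-2
    }
}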

@ -7,14 +7,15 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -28,24 +29,24 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -74,12 +75,16 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return new Response();
}
public static class Request extends ActionRequest {
public static class Request extends BaseTasksRequest<Request> {
private String jobId;
// used internally to expand the _all job id to all job ids in the cluster:
private List<String> expandedJobsIds;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.expandedJobsIds = Collections.singletonList(jobId);
}
Request() {}
@ -88,6 +93,11 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return jobId;
}
@Override
public boolean match(Task task) {
return jobId.equals(Job.ALL) || InternalOpenJobAction.JobTask.match(task, jobId);
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -97,12 +107,14 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
expandedJobsIds = in.readList(StreamInput::readString);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeStringList(expandedJobsIds);
}
@Override
@ -130,7 +142,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
}
}
public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends BaseTasksResponse implements ToXContentObject {
public static class JobStats implements ToXContent, Writeable {
private final String jobId;
@ -215,10 +227,19 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private QueryPage<JobStats> jobsStats;
public Response(QueryPage<JobStats> jobsStats) {
super(Collections.emptyList(), Collections.emptyList());
this.jobsStats = jobsStats;
}
public Response() {}
Response(List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException> nodeFailures,
QueryPage<JobStats> jobsStats) {
super(taskFailures, nodeFailures);
this.jobsStats = jobsStats;
}
public Response() {
super(Collections.emptyList(), Collections.emptyList());
}
public QueryPage<JobStats> getResponse() {
return jobsStats;
@ -278,80 +299,121 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
public static class TransportAction extends TransportTasksAction<InternalOpenJobAction.JobTask, Request, Response,
QueryPage<Response.JobStats>> {
private final ClusterService clusterService;
private final JobManager jobManager;
private final AutodetectProcessManager processManager;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager, JobProvider jobProvider) {
super(settings, GetJobsStatsAction.NAME, false, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
AutodetectProcessManager processManager, JobProvider jobProvider) {
super(settings, GetJobsStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.jobManager = jobManager;
this.processManager = processManager;
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
if (Job.ALL.equals(request.getJobId())) {
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
request.expandedJobsIds = mlMetadata.getJobs().keySet().stream().collect(Collectors.toList());
}
ActionListener<Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(request, response, finalListener), listener::onFailure);
super.doExecute(task, request, listener);
}
@Override
protected Response newResponse(Request request, List<QueryPage<Response.JobStats>> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
List<Response.JobStats> stats = new ArrayList<>();
for (QueryPage<Response.JobStats> task : tasks) {
stats.addAll(task.results());
}
return new Response(taskOperationFailures, failedNodeExceptions, new QueryPage<>(stats, stats.size(), Job.RESULTS_FIELD));
}
@Override
protected QueryPage<Response.JobStats> readTaskResponse(StreamInput in) throws IOException {
return new QueryPage<>(in, Response.JobStats::new);
}
@Override
protected boolean accumulateExceptions() {
return true;
}
@Override
protected void taskOperation(Request request, InternalOpenJobAction.JobTask task,
ActionListener<QueryPage<Response.JobStats>> listener) {
logger.debug("Get stats for job '{}'", request.getJobId());
ClusterState clusterState = clusterService.state();
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), clusterState);
if (jobs.count() == 0) {
listener.onResponse(new GetJobsStatsAction.Response(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD)));
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
List<Response.JobStats> stats = processManager.getStatistics(request.getJobId()).map(t -> {
String jobId = t.v1().getJobid();
return new Response.JobStats(jobId, t.v1(), t.v2(), mlMetadata.getAllocations().get(jobId).getStatus());
}).collect(Collectors.toList());
listener.onResponse(new QueryPage<>(stats, stats.size(), Job.RESULTS_FIELD));
}
// Up until now we have gathered the stats for jobs that are open;
// this method fetches the stats of the remaining jobs, which are stored in the job's results index
void gatherStatsForClosedJobs(Request request, Response response, ActionListener<Response> listener) {
List<String> jobIds = determineJobIdsWithoutLiveStats(request.expandedJobsIds, response.jobsStats.results());
if (jobIds.isEmpty()) {
listener.onResponse(response);
return;
}
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
AtomicInteger counter = new AtomicInteger(0);
AtomicArray<Response.JobStats> jobsStats = new AtomicArray<>(jobs.results().size());
for (int i = 0; i < jobs.results().size(); i++) {
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
AtomicInteger counter = new AtomicInteger(jobIds.size());
AtomicArray<Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
for (int i = 0; i < jobIds.size(); i++) {
int slot = i;
Job job = jobs.results().get(slot);
gatherDataCountsAndModelSizeStats(job.getId(), (dataCounts, modelSizeStats) -> {
JobStatus status = mlMetadata.getAllocations().get(job.getId()).getStatus();
jobsStats.setOnce(slot, new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status));
if (counter.incrementAndGet() == jobsStats.length()) {
List<Response.JobStats> results =
jobsStats.asList().stream().map(entry -> entry.value).collect(Collectors.toList());
QueryPage<Response.JobStats> jobsStatsPage = new QueryPage<>(results, results.size(), Job.RESULTS_FIELD);
listener.onResponse(new GetJobsStatsAction.Response(jobsStatsPage));
String jobId = jobIds.get(i);
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobStatus jobStatus = mlMetadata.getAllocations().get(jobId).getStatus();
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobStatus));
if (counter.decrementAndGet() == 0) {
List<Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList().stream()
.map(e -> e.value)
.collect(Collectors.toList()));
listener.onResponse(new Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
}, listener::onFailure);
}
}
private void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer<DataCounts, ModelSizeStats> handler,
void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer<DataCounts, ModelSizeStats> handler,
Consumer<Exception> errorHandler) {
readDataCounts(jobId, dataCounts -> {
readModelSizeStats(jobId, modelSizeStats -> {
jobProvider.dataCounts(jobId, dataCounts -> {
jobProvider.modelSizeStats(jobId, modelSizeStats -> {
handler.accept(dataCounts, modelSizeStats);
}, errorHandler);
}, errorHandler);
}
private void readDataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
Optional<DataCounts> counts = processManager.getDataCounts(jobId);
if (counts.isPresent()) {
handler.accept(counts.get());
} else {
jobProvider.dataCounts(jobId, handler, errorHandler);
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds, List<Response.JobStats> stats) {
List<String> jobIds = new ArrayList<>();
outer: for (String jobId : requestedJobIds) {
for (Response.JobStats stat : stats) {
if (stat.getJobid().equals(jobId)) {
// we already have stats, no need to get stats for this job from an index
continue outer;
}
}
jobIds.add(jobId);
}
return jobIds;
}
private void readModelSizeStats(String jobId, Consumer<ModelSizeStats> handler, Consumer<Exception> errorHandler) {
Optional<ModelSizeStats> sizeStats = processManager.getModelSizeStats(jobId);
if (sizeStats.isPresent()) {
handler.accept(sizeStats.get());
} else {
jobProvider.modelSizeStats(jobId, handler, errorHandler);
}
}
}
}
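
The closed-job fallback above fans out one asynchronous index lookup per job and completes the listener exactly once, when the last lookup finishes. A standalone sketch of that countdown pattern, using CompletableFuture in place of the JobProvider callbacks (names are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Countdown pattern from gatherStatsForClosedJobs: each async lookup fills a
// fixed slot, and the listener fires once, when the counter reaches zero.
public class CountdownGather {

    static void gather(List<String> jobIds, Consumer<List<String>> listener) {
        AtomicInteger counter = new AtomicInteger(jobIds.size());
        AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(jobIds.size());
        for (int i = 0; i < jobIds.size(); i++) {
            int slot = i;
            String jobId = jobIds.get(i);
            CompletableFuture.runAsync(() -> {          // stands in for jobProvider.dataCounts(...)
                slots.set(slot, "stats:" + jobId);      // slots keep results in request order
                if (counter.decrementAndGet() == 0) {   // only the last lookup completes the request
                    List<String> results = new ArrayList<>();
                    for (int j = 0; j < slots.length(); j++) {
                        results.add(slots.get(j));
                    }
                    listener.accept(results);
                }
            });
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        gather(List.of("id1", "id2", "id3"), results -> {
            System.out.println(results);
            done.countDown();
        });
        done.await();  // keep the JVM alive until the async callbacks finish
    }
}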

@ -8,22 +8,23 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.Closeable;
@ -63,7 +64,7 @@ public class AutodetectCommunicator implements Closeable {
AnalysisConfig analysisConfig = job.getAnalysisConfig();
boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization();
autoDetectExecutor.execute(() ->
autoDetectResultProcessor.process(job.getId(), process.getProcessOutStream(), usePerPartitionNormalization)
autoDetectResultProcessor.process(process.getProcessOutStream(), usePerPartitionNormalization)
);
autoDetectExecutor.execute(() ->
stateProcessor.process(job.getId(), process.getPersistStream())
@ -152,20 +153,20 @@ public class AutodetectCommunicator implements Closeable {
return autodetectProcess.getProcessStartTime();
}
public Optional<ModelSizeStats> getModelSizeStats() {
public ModelSizeStats getModelSizeStats() {
return autoDetectResultProcessor.modelSizeStats();
}
public Optional<DataCounts> getDataCounts() {
return Optional.ofNullable(dataCountsReporter.runningTotalStats());
public DataCounts getDataCounts() {
return dataCountsReporter.runningTotalStats();
}
private <T> T checkAndRun(Supplier<String> errorMessage, Callback<T> callback, boolean wait) throws IOException {
private <T> T checkAndRun(Supplier<String> errorMessage, CheckedSupplier<T, IOException> callback, boolean wait) throws IOException {
CountDownLatch latch = new CountDownLatch(1);
if (inUse.compareAndSet(null, latch)) {
try {
checkProcessIsAlive();
return callback.run();
return callback.get();
} finally {
latch.countDown();
inUse.set(null);
@ -182,17 +183,11 @@ public class AutodetectCommunicator implements Closeable {
}
}
checkProcessIsAlive();
return callback.run();
return callback.get();
} else {
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
}
private interface Callback<T> {
T run() throws IOException;
}
}
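
This file also replaces the private Callback interface with Elasticsearch's CheckedSupplier, so checkAndRun callers can pass plain lambdas that throw IOException. A minimal stand-in for that functional interface (the real one lives in org.elasticsearch.common; the demo class and method are illustrative):

import java.io.IOException;

// Minimal equivalent of CheckedSupplier<T, E>: a supplier whose get() may
// throw a checked exception, which lets checkAndRun accept throwing lambdas.
public class CheckedSupplierDemo {

    @FunctionalInterface
    interface CheckedSupplier<T, E extends Exception> {
        T get() throws E;
    }

    static <T> T checkAndRun(CheckedSupplier<T, IOException> callback) throws IOException {
        // The real method also guards the process with a latch; this sketch
        // only shows the lambda-friendly call shape.
        return callback.get();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(checkAndRun(() -> "flush acknowledged"));
    }
}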

@ -9,6 +9,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -46,7 +47,7 @@ import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Locale;
import java.util.Optional;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -55,6 +56,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class AutodetectProcessManager extends AbstractComponent {
@ -234,7 +236,7 @@ public class AutodetectProcessManager extends AbstractComponent {
normalizerFactory);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdator,
threadPool.executor(MlPlugin.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, jobResultsPersister, parser);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(jobId, renormalizer, jobResultsPersister, parser);
AutodetectProcess process = null;
try {
@ -336,21 +338,12 @@ public class AutodetectProcessManager extends AbstractComponent {
client.execute(UpdateJobStatusAction.INSTANCE, request, ActionListener.wrap(r -> handler.accept(null), errorHandler));
}
public Optional<ModelSizeStats> getModelSizeStats(String jobId) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
return Optional.empty();
}
return communicator.getModelSizeStats();
}
public Optional<DataCounts> getDataCounts(String jobId) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
return Optional.empty();
}
return communicator.getDataCounts();
public Stream<Tuple<DataCounts, ModelSizeStats>> getStatistics(String jobId) {
return autoDetectCommunicatorByJob.entrySet().stream()
.filter(entry -> jobId.equals(entry.getKey()))
.map(Map.Entry::getValue)
.map(autodetectCommunicator -> {
return new Tuple<>(autodetectCommunicator.getDataCounts(), autodetectCommunicator.getModelSizeStats());
});
}
}
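
The manager now exposes live statistics as a stream keyed by job id: the stream is empty on nodes where the job is not open, which is exactly what lets the task-aware stats action gather live stats only on the node running the job. A standalone sketch under those assumptions, where Communicator stands in for AutodetectCommunicator and SimpleEntry for Elasticsearch's Tuple:

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;

// Sketch of getStatistics: an empty stream means "job not open on this node",
// so callers need no Optional or null checks.
public class StatsLookup {

    record Communicator(long processedRecords, long modelBytes) {}

    private final ConcurrentMap<String, Communicator> byJob = new ConcurrentHashMap<>();

    Stream<SimpleEntry<Long, Long>> getStatistics(String jobId) {
        return byJob.entrySet().stream()
                .filter(e -> jobId.equals(e.getKey()))
                .map(Map.Entry::getValue)
                .map(c -> new SimpleEntry<>(c.processedRecords(), c.modelBytes()));
    }

    public static void main(String[] args) {
        StatsLookup lookup = new StatsLookup();
        lookup.byJob.put("job-1", new Communicator(2, 9105));
        System.out.println(lookup.getStatistics("job-1").count()); // 1: job is open here
        System.out.println(lookup.getStatistics("job-2").count()); // 0: not open here
    }
}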

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.Bucket;
@ -25,13 +25,12 @@ import java.io.InputStream;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
/**
* A runnable class that reads the autodetect process output in the
* {@link #process(String, InputStream, boolean)} method and persists parsed
* {@link #process(InputStream, boolean)} method and persists parsed
* results via the {@linkplain JobResultsPersister} passed in the constructor.
* <p>
* Has methods to register and remove alert observers.
@ -50,6 +49,7 @@ public class AutoDetectResultProcessor {
private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);
private final String jobId;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
private final AutodetectResultsParser parser;
@ -59,19 +59,24 @@ public class AutoDetectResultProcessor {
private volatile ModelSizeStats latestModelSizeStats;
public AutoDetectResultProcessor(Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser) {
this(renormalizer, persister, parser, new FlushListener());
public AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister,
AutodetectResultsParser parser) {
this(jobId, renormalizer, persister, parser, new FlushListener());
}
AutoDetectResultProcessor(Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser,
AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser,
FlushListener flushListener) {
this.jobId = jobId;
this.renormalizer = renormalizer;
this.persister = persister;
this.parser = parser;
this.flushListener = flushListener;
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(jobId);
latestModelSizeStats = builder.build();
}
public void process(String jobId, InputStream in, boolean isPerPartitionNormalization) {
public void process(InputStream in, boolean isPerPartitionNormalization) {
try (Stream<AutodetectResult> stream = parser.parseResults(in)) {
int bucketCount = 0;
Iterator<AutodetectResult> iterator = stream.iterator();
@ -209,8 +214,8 @@ public class AutoDetectResultProcessor {
}
}
public Optional<ModelSizeStats> modelSizeStats() {
return Optional.ofNullable(latestModelSizeStats);
public ModelSizeStats modelSizeStats() {
return latestModelSizeStats;
}
}
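
Moving the job id into the constructor lets the processor seed latestModelSizeStats with a default built for that job, so modelSizeStats() can return a plain value instead of an Optional; this is also why the REST tests below now expect model_bytes: 0 before any model size stats have been reported. A minimal sketch of the pattern, where Stats is an illustrative stand-in for ModelSizeStats:

// Seed a volatile field with a per-job default so readers always see a value:
// the Optional wrapper becomes unnecessary once null is impossible.
public class ResultProcessorSketch {

    record Stats(String jobId, long modelBytes) {}

    private volatile Stats latestModelSizeStats;

    ResultProcessorSketch(String jobId) {
        this.latestModelSizeStats = new Stats(jobId, 0); // default: model_bytes == 0
    }

    void onModelSizeStats(Stats stats) {      // called as results are parsed
        latestModelSizeStats = stats;
    }

    Stats modelSizeStats() {                  // never null, no Optional needed
        return latestModelSizeStats;
    }

    public static void main(String[] args) {
        ResultProcessorSketch p = new ResultProcessorSketch("job-stats-test");
        System.out.println(p.modelSizeStats().modelBytes()); // 0 before any result
        p.onModelSizeStats(new Stats("job-stats-test", 9105));
        System.out.println(p.modelSizeStats().modelBytes()); // 9105 afterwards
    }
}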

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.ml.action.GetJobsStatsAction.TransportAction.determineJobIdsWithoutLiveStats;
public class GetJobsStatsActionTests extends ESTestCase {
public void testDetermineJobIds() {
List<String> result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.emptyList());
assertEquals(1, result.size());
assertEquals("id1", result.get(0));
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED)));
assertEquals(0, result.size());
result = determineJobIdsWithoutLiveStats(
Arrays.asList("id1", "id2", "id3"), Collections.emptyList());
assertEquals(3, result.size());
assertEquals("id1", result.get(0));
assertEquals("id2", result.get(1));
assertEquals("id3", result.get(2));
result = determineJobIdsWithoutLiveStats(
Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobStatus.CLOSED)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobStatus.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobStatus.CLOSED)));
assertEquals(0, result.size());
}
}

@ -11,23 +11,23 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.noop.NoOpRenormalizer;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketTests;
@ -62,7 +62,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private JobProvider jobProvider;
@Before
private void createComponents() {
public void createComponents() {
renormalizer = new NoOpRenormalizer();
jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
autodetectResultsParser = new AutodetectResultsParser(nodeSettings());
@ -73,7 +73,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
@ -108,7 +108,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build();
@ -149,7 +149,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
@ -173,7 +173,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -194,7 +194,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
@ -222,7 +222,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -240,7 +240,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
@ -263,7 +263,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
resultProcessor.process(inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());

@ -6,11 +6,11 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.Bucket;
@ -28,8 +28,8 @@ import java.util.List;
import java.util.stream.Stream;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -54,8 +54,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, parser);
processor.process(JOB_ID, mock(InputStream.class), randomBoolean());
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, parser);
processor.process(mock(InputStream.class), randomBoolean());
verify(renormalizer, times(1)).waitUntilIdle();
assertEquals(0, processor.completionLatch.getCount());
}
@ -66,7 +66,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -88,7 +88,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = true;
@ -110,7 +110,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
context.deleteInterimRequired = false;
@ -131,7 +131,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
context.deleteInterimRequired = false;
@ -155,7 +155,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -176,7 +176,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -196,7 +196,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -218,7 +218,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -244,7 +244,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -261,7 +261,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -272,14 +272,14 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
verifyNoMoreInteractions(persister);
assertEquals(modelSizeStats, processor.modelSizeStats().get());
assertEquals(modelSizeStats, processor.modelSizeStats());
}
public void testProcessResult_modelSnapshot() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -296,7 +296,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;

@ -52,20 +52,6 @@ setup:
"types":["response"]
}
- do:
index:
index: .ml-anomalies-job-stats-test
type: result
id: job-stats-test-model_size_stats
body: {
"job_id": "job-stats-test",
"result_type": "model_size_stats",
"log_time": 1480896000000,
"timestamp": 1480896000000,
"model_bytes": 100,
"total_by_field_count": 2
}
---
"Test get job stats after uploading data prompting the creation of some stats":
@ -89,9 +75,34 @@ setup:
- match: { jobs.0.data_counts.processed_record_count: 2 }
- match: { jobs.0.data_counts.processed_field_count: 4}
- match: { jobs.0.data_counts.input_field_count: 4 }
- match: { jobs.0.model_size_stats.model_bytes: 100 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.status: OPENED }
---
"Test get job stats for closed job":
- do:
xpack.ml.post_data:
job_id: job-stats-test
body: >
{"airline":"AAL","responsetime":"132.2046","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","time":"1403481600"}
- do:
xpack.ml.close_job:
job_id: job-stats-test
- match: { closed: true }
- do:
xpack.ml.get_job_stats:
job_id: job-stats-test
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 2 }
- match: { jobs.0.data_counts.processed_field_count: 4}
- match: { jobs.0.data_counts.input_field_count: 4 }
- match: { jobs.0.model_size_stats.model_bytes: 9105 }
- match: { jobs.0.status: CLOSED }
---
"Test get job stats of datafeed job that has not received and data":
@ -100,7 +111,7 @@ setup:
job_id: datafeed-job
- match: { jobs.0.job_id : datafeed-job }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- is_false: jobs.0.model_size_stats
- match: { jobs.0.model_size_stats.model_bytes : 0 }
- match: { jobs.0.status: OPENED }
---

@ -16,6 +16,7 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -63,10 +64,11 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("closed", true), responseEntityToMap(response));
response = client().performRequest("get", "/.ml-anomalies-" + jobId + "/data_counts/" + jobId + "-data-counts");
response = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertEquals(200, response.getStatusLine().getStatusCode());
@SuppressWarnings("unchecked")
Map<String, Object> dataCountsDoc = (Map<String, Object>) responseEntityToMap(response).get("_source");
Map<String, Object> dataCountsDoc = (Map<String, Object>)
((Map)((List) responseEntityToMap(response).get("jobs")).get(0)).get("data_counts");
assertEquals(2, dataCountsDoc.get("processed_record_count"));
assertEquals(4, dataCountsDoc.get("processed_field_count"));
assertEquals(177, dataCountsDoc.get("input_bytes"));