diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java
index 7785e146420..383c031ab84 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
@@ -30,13 +31,18 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
 import org.elasticsearch.xpack.ml.job.results.Forecast;
+import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
+import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastRequestStatus;
 import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Objects;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.ml.action.ForecastJobAction.Request.DURATION;
@@ -250,13 +256,15 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, ForecastJobAction.Response, ForecastJobAction.RequestBuilder> {
 
+        private final JobProvider jobProvider;
 
         @Inject
         public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               AutodetectProcessManager processManager) {
+                               JobProvider jobProvider, AutodetectProcessManager processManager) {
             super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
                     indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
                     processManager);
+            this.jobProvider = jobProvider;
             // ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
         }
@@ -286,7 +294,26 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, ForecastJobAction.Response, ForecastJobAction.RequestBuilder> {
                 if (e == null) {
-                    listener.onResponse(new Response(true, params.getForecastId()));
+                    Consumer<ForecastRequestStats> forecastRequestStatsHandler = forecastRequestStats -> {
+                        if (forecastRequestStats == null) {
+                            // paranoia case, it should not happen that we do not retrieve a result
+                            listener.onFailure(new ElasticsearchException("Cannot run forecast: internal error, please check the logs"));
+                        } else if (forecastRequestStats.getStatus() == ForecastRequestStatus.FAILED) {
+                            List<String> messages = forecastRequestStats.getMessages();
+                            if (messages.size() > 0) {
+                                listener.onFailure(ExceptionsHelper.badRequestException("Cannot run forecast: " + messages.get(0)));
+                            } else {
+                                // paranoia case, it should not be possible to have an empty message list
+                                listener.onFailure(
+                                        new ElasticsearchException("Cannot run forecast: internal error, please check the logs"));
+                            }
+                        } else {
+                            listener.onResponse(new Response(true, params.getForecastId()));
+                        }
+                    };
+
+                    jobProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(), forecastRequestStatsHandler,
+                            listener::onFailure);
                 } else {
                     listener.onFailure(e);
                 }
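
Note for reviewers: the action no longer acknowledges a forecast as soon as the native process accepts it; it re-reads the persisted ForecastRequestStats and turns a FAILED status into an error for the caller. A self-contained sketch of that decision logic (all names below are illustrative stand-ins, not the plugin's API):

    import java.util.Collections;
    import java.util.List;
    import java.util.function.Consumer;

    public class ForecastStatusCheckSketch {
        enum Status { SCHEDULED, STARTED, OK, FINISHED, FAILED }

        public static void main(String[] args) {
            // pretend this is the stats document read back from the results index
            Status status = Status.FAILED;
            List<String> messages = Collections.singletonList("not enough data");

            Consumer<String> listener = System.out::println;
            if (status == Status.FAILED) {
                // surface the first message from the native process as a 400-style error
                listener.accept("Cannot run forecast: "
                        + (messages.isEmpty() ? "internal error, please check the logs" : messages.get(0)));
            } else {
                listener.accept("forecast accepted");
            }
        }
    }
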
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
index e1dcbeb9b15..93212196f74 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
@@ -50,6 +50,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -85,6 +86,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
+import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
 import org.elasticsearch.xpack.ml.job.results.ModelPlot;
 import org.elasticsearch.xpack.ml.job.results.Result;
@@ -475,6 +477,18 @@ public class JobProvider {
         }
     }
 
+    private <T, U> T parseGetHit(GetResponse getResponse, BiFunction<XContentParser, U, T> objectParser,
+                                 Consumer<Exception> errorHandler) {
+        BytesReference source = getResponse.getSourceAsBytesRef();
+
+        try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, source)) {
+            return objectParser.apply(parser, null);
+        } catch (IOException e) {
+            errorHandler.accept(new ElasticsearchParseException("failed to parse " + getResponse.getType(), e));
+            return null;
+        }
+    }
+
     public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) {
         return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(),
                 indicesOptions.expandWildcardsClosed(), indicesOptions);
@@ -904,6 +918,19 @@ public class JobProvider {
         ), client::search);
     }
 
+    private <T, U> void getResult(String jobId, String resultDescription, GetRequest get, BiFunction<XContentParser, U, T> objectParser,
+                                  Consumer<Result<T>> handler, Consumer<Exception> errorHandler, Supplier<T> notFoundSupplier) {
+
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, get, ActionListener.<GetResponse>wrap(getDocResponse -> {
+            if (getDocResponse.isExists()) {
+                handler.accept(new Result<>(getDocResponse.getIndex(), parseGetHit(getDocResponse, objectParser, errorHandler)));
+            } else {
+                LOGGER.trace("No {} for job with id {}", resultDescription, jobId);
+                handler.accept(new Result<>(null, notFoundSupplier.get()));
+            }
+        }, errorHandler), client::get);
+    }
+
     private SearchRequestBuilder createLatestModelSizeStatsSearch(String indexName) {
         return client.prepareSearch(indexName)
                 .setSize(1)
@@ -1043,6 +1070,16 @@ public class JobProvider {
                 , client::search);
     }
 
+    public void getForecastRequestStats(String jobId, String forecastId, Consumer<ForecastRequestStats> handler,
+                                        Consumer<Exception> errorHandler) {
+        String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+        GetRequest getRequest = new GetRequest(indexName, ElasticsearchMappings.DOC_TYPE,
+                ForecastRequestStats.documentId(jobId, forecastId));
+
+        getResult(jobId, ForecastRequestStats.RESULTS_FIELD.getPreferredName(), getRequest, ForecastRequestStats.PARSER,
+                result -> handler.accept(result.result),
+                errorHandler, () -> null);
+    }
+
     public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
                                Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
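
getForecastRequestStats issues a direct GET by document id rather than a search; a GET in Elasticsearch is realtime, so the document is visible as soon as it has been indexed, without waiting for a refresh. A hypothetical call site (the ids are made up; the handler receives null when no stats document exists, because the notFoundSupplier above is () -> null):

    jobProvider.getForecastRequestStats("my-job", "my-forecast-id",
            forecastRequestStats -> {
                if (forecastRequestStats == null) {
                    // no document: the forecast was never persisted
                } else {
                    // inspect getStatus(), getMessages(), ...
                }
            },
            e -> { /* GET or parse failure */ });
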
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
index 17f672389da..b3d044b1807 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
@@ -42,6 +42,7 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -238,10 +239,25 @@ public class AutodetectCommunicator implements Closeable {
     }
 
     public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handler) {
+        BiConsumer<Void, Exception> forecastConsumer = (aVoid, e) -> {
+            if (e == null) {
+                FlushJobParams flushParams = FlushJobParams.builder().build();
+                flushJob(flushParams, (flushAcknowledgement, flushException) -> {
+                    if (flushException != null) {
+                        String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", job.getId());
+                        handler.accept(null, ExceptionsHelper.serverError(msg, flushException));
+                    } else {
+                        handler.accept(null, null);
+                    }
+                });
+            } else {
+                handler.accept(null, e);
+            }
+        };
         submitOperation(() -> {
             autodetectProcess.forecastJob(params);
             return null;
-        }, handler);
+        }, forecastConsumer);
     }
 
     @Nullable
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
index b1c2ebb0528..79db136f0bb 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
@@ -29,6 +29,7 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
 import org.elasticsearch.xpack.ml.job.results.Forecast;
 import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
+import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastRequestStatus;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
 import org.elasticsearch.xpack.ml.job.results.ModelPlot;
 
@@ -224,14 +225,18 @@ public class AutoDetectResultProcessor {
             LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
             context.bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
 
-            double forecastProgress = forecastRequestStats.getProgress();
-
-            // persist if progress is 0 (probably some error condition) or 1 (finished),
+            // execute the bulk request only in some cases or when in doubt,
             // otherwise rely on the count-based trigger
-            if (forecastProgress == 0.0 || forecastProgress >= 1.0) {
-                // if forecast stats progress is 1.0 it marks the end of a forecast,
-                // therefore commit whatever we have
-                context.bulkResultsPersister.executeRequest();
+            switch (forecastRequestStats.getStatus()) {
+            case OK:
+            case STARTED:
+                break;
+            case FAILED:
+            case SCHEDULED:
+            case FINISHED:
+            default:
+                context.bulkResultsPersister.executeRequest();
+            }
         }
 
         ModelSizeStats modelSizeStats = result.getModelSizeStats();
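
Two changes work together here: the communicator flushes the job after the native process accepts the forecast request, so the ForecastRequestStats document is persisted and searchable before ForecastJobAction reads it back, and the result processor now keys the forced bulk commit on status rather than progress. A minimal, self-contained restatement of that commit decision (the enum is an illustrative stand-in for ForecastRequestStats.ForecastRequestStatus):

    // Illustrative stand-in, not the plugin's API.
    enum Status { SCHEDULED, STARTED, OK, FINISHED, FAILED }

    // true when the pending bulk request should be committed immediately
    // instead of waiting for the count-based trigger
    static boolean shouldExecuteBulkRequest(Status status) {
        switch (status) {
        case OK:
        case STARTED:
            return false; // forecast progressing normally
        case FAILED:
        case SCHEDULED:
        case FINISHED:
        default:
            return true;  // terminal or unusual state: commit what we have
        }
    }
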
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java
index 5bd9a9f90e6..475fec1bcab 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java
@@ -198,6 +198,10 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
         return forecastId;
     }
 
+    public static String documentId(String jobId, String forecastId) {
+        return jobId + "_model_forecast_request_stats_" + forecastId;
+    }
+
     /**
      * Return the document ID used for indexing. As there is 1 and only 1 document
      * per forecast request, the id has no dynamic parts.
@@ -205,7 +209,7 @@
      * @return id
      */
     public String getId() {
-        return jobId + "_model_forecast_request_stats_" + forecastId;
+        return documentId(jobId, forecastId);
     }
 
     public void setRecordCount(long recordCount) {
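
This refactoring keeps a single source of truth for the document id: getId() and the new static documentId(...) are guaranteed to agree, which is what allows JobProvider to issue a direct GET instead of a search. For example (values are illustrative):

    String id = ForecastRequestStats.documentId("my-job", "f-1");
    // id is "my-job_model_forecast_request_stats_f-1", identical to what
    // getId() returns on the stats object persisted for that forecast
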
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java
index 15cbb6a5fdd..e29d9d1642c 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
 import org.elasticsearch.xpack.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.ml.job.config.Detector;
 import org.elasticsearch.xpack.ml.job.config.Job;
@@ -16,11 +17,13 @@ import org.elasticsearch.xpack.ml.job.results.Forecast;
 import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
 import org.junit.After;
 
+import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -156,6 +159,96 @@
                 equalTo("[duration] must be greater or equal to the bucket span: [10m/1h]"));
     }
 
+    public void testNoData() throws Exception {
+        Detector.Builder detector = new Detector.Builder("mean", "value");
+
+        TimeValue bucketSpan = TimeValue.timeValueMinutes(1);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+        Job.Builder job = new Job.Builder("forecast-it-test-no-data");
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+        ElasticsearchException e = expectThrows(ElasticsearchException.class,
+                () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
+        assertThat(e.getMessage(),
+                equalTo("Cannot run forecast: Forecast cannot be executed as job requires data to have been processed and modeled"));
+    }
+
+    public void testMemoryStatus() throws Exception {
+        Detector.Builder detector = new Detector.Builder("mean", "value");
+        detector.setByFieldName("clientIP");
+
+        TimeValue bucketSpan = TimeValue.timeValueHours(1);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+        Job.Builder job = new Job.Builder("forecast-it-test-memory-status");
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+
+        // Set the memory limit to 30MB
+        AnalysisLimits limits = new AnalysisLimits(30L, null);
+        job.setAnalysisLimits(limits);
+
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+        createDataWithLotsOfClientIps(bucketSpan, job);
+        ElasticsearchException e = expectThrows(ElasticsearchException.class,
+                () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
+        assertThat(e.getMessage(), equalTo("Cannot run forecast: Forecast cannot be executed as model memory status is not OK"));
+    }
+
+    public void testMemoryLimit() throws Exception {
+        Detector.Builder detector = new Detector.Builder("mean", "value");
+        detector.setByFieldName("clientIP");
+
+        TimeValue bucketSpan = TimeValue.timeValueHours(1);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+        Job.Builder job = new Job.Builder("forecast-it-test-memory-limit");
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+        createDataWithLotsOfClientIps(bucketSpan, job);
+        ElasticsearchException e = expectThrows(ElasticsearchException.class,
+                () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
+        assertThat(e.getMessage(),
+                equalTo("Cannot run forecast: Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB"));
+    }
+
+    private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
+        long now = Instant.now().getEpochSecond();
+        long timestamp = now - 50 * bucketSpan.seconds();
+        while (timestamp < now) {
+            for (int i = 1; i < 256; i++) {
+                List<String> data = new ArrayList<>();
+                for (int j = 1; j < 100; j++) {
+                    Map<String, Object> record = new HashMap<>();
+                    record.put("time", timestamp);
+                    record.put("value", 10.0);
+                    record.put("clientIP", String.format(Locale.ROOT, "192.168.%d.%d", i, j));
+                    data.add(createJsonRecord(record));
+                }
+                postData(job.getId(), data.stream().collect(Collectors.joining()));
+                timestamp += bucketSpan.seconds();
+            }
+        }
+        flushJob(job.getId(), false);
+    }
+
     private static Map<String, Object> createRecord(long timestamp, double value) {
         Map<String, Object> record = new HashMap<>();
         record.put("time", timestamp);
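
The tests call createJsonRecord(...), which comes from the shared integration-test support code and is not shown in this diff. A minimal hypothetical stand-in, so the test reads self-contained (an assumption: the real helper presumably serializes via XContent, and values here are numbers or plain strings):

    // Hypothetical stand-in for the base-class helper used above.
    private static String createJsonRecord(Map<String, Object> record) {
        return record.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":"
                        + (e.getValue() instanceof String ? "\"" + e.getValue() + "\"" : e.getValue()))
                .collect(Collectors.joining(",", "{", "}")) + "\n";
    }
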