[ML] wait for validation of forecast request and send error in case of validation failure

change the forecast API to wait for the validation and return an error if validation fails.

fixes 

Original commit: elastic/x-pack-elasticsearch@5c0553b284
This commit is contained in:
Hendrik Muhs 2017-12-27 11:25:26 +01:00 committed by GitHub
parent 31839b2748
commit f8d62d0ade
6 changed files with 193 additions and 11 deletions
plugin/src
main/java/org/elasticsearch/xpack/ml
test/java/org/elasticsearch/xpack/ml/integration

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -30,13 +31,18 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastRequestStatus;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.action.ForecastJobAction.Request.DURATION;
@ -250,13 +256,15 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
JobProvider jobProvider, AutodetectProcessManager processManager) {
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
processManager);
this.jobProvider = jobProvider;
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@ -286,7 +294,26 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
ForecastParams params = paramsBuilder.build();
processManager.forecastJob(task, params, e -> {
if (e == null) {
listener.onResponse(new Response(true, params.getForecastId()));
Consumer<ForecastRequestStats> forecastRequestStatsHandler = forecastRequestStats -> {
if (forecastRequestStats == null) {
// paranoia case, it should not happen that we do not retrieve a result
listener.onFailure(new ElasticsearchException("Cannot run forecast: internal error, please check the logs"));
} else if (forecastRequestStats.getStatus() == ForecastRequestStatus.FAILED) {
List<String> messages = forecastRequestStats.getMessages();
if (messages.size() > 0) {
listener.onFailure(ExceptionsHelper.badRequestException("Cannot run forecast: " + messages.get(0)));
} else {
// paranoia case, it should not be possible to have an empty message list
listener.onFailure(
new ElasticsearchException("Cannot run forecast: internal error, please check the logs"));
}
} else {
listener.onResponse(new Response(true, params.getForecastId()));
}
};
jobProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(), forecastRequestStatsHandler,
listener::onFailure);
} else {
listener.onFailure(e);
}

@ -50,6 +50,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
@ -85,6 +86,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.Result;
@ -475,6 +477,18 @@ public class JobProvider {
}
}
/**
 * Deserializes the source of a GET response using the supplied parser function.
 *
 * @param getResponse  response whose source bytes are parsed
 * @param objectParser function applied to a JSON parser over the source; its second
 *                     argument is always {@code null}
 * @param errorHandler invoked with an {@link ElasticsearchParseException} when the
 *                     source cannot be parsed
 * @return the parsed object, or {@code null} if an {@link IOException} occurred
 */
private <T, U> T parseGetHit(GetResponse getResponse, BiFunction<XContentParser, U, T> objectParser,
                             Consumer<Exception> errorHandler) {
    try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
            .createParser(NamedXContentRegistry.EMPTY, getResponse.getSourceAsBytesRef())) {
        return objectParser.apply(parser, null);
    } catch (IOException e) {
        errorHandler.accept(new ElasticsearchParseException("failed to parse " + getResponse.getType(), e));
        return null;
    }
}
public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) {
return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(), indicesOptions);
@ -904,6 +918,19 @@ public class JobProvider {
), client::search);
}
/**
 * Asynchronously executes a GET request and hands the parsed result to {@code handler}.
 *
 * If the document exists, its source is parsed with {@code objectParser} (parse errors are
 * routed to {@code errorHandler}); if it does not exist, {@code notFoundSupplier} provides
 * the fallback value and the result carries a {@code null} index. Transport-level failures
 * go to {@code errorHandler}.
 *
 * @param jobId             job the result belongs to (used for logging only)
 * @param resultDescription human-readable result name for the trace log
 * @param get               the GET request to execute
 * @param objectParser      parser applied to the document source; second argument is always null
 * @param handler           receives the (possibly not-found) result
 * @param errorHandler      receives transport or parse failures
 * @param notFoundSupplier  supplies the value to return when the document is missing
 */
private <U, T> void getResult(String jobId, String resultDescription, GetRequest get, BiFunction<XContentParser, U, T> objectParser,
                              Consumer<Result<T>> handler, Consumer<Exception> errorHandler, Supplier<T> notFoundSupplier) {
    // Run under the ML origin so the internal client has the required privileges.
    executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, get, ActionListener.<GetResponse>wrap(getDocResponse -> {
        if (getDocResponse.isExists()) {
            handler.accept(new Result<>(getDocResponse.getIndex(), parseGetHit(getDocResponse, objectParser, errorHandler)));
        } else {
            // Missing document is not an error; fall back to the supplied default.
            LOGGER.trace("No {} for job with id {}", resultDescription, jobId);
            handler.accept(new Result<>(null, notFoundSupplier.get()));
        }
    }, errorHandler), client::get);
}
private SearchRequestBuilder createLatestModelSizeStatsSearch(String indexName) {
return client.prepareSearch(indexName)
.setSize(1)
@ -1043,6 +1070,16 @@ public class JobProvider {
, client::search);
}
/**
 * Asynchronously retrieves the {@link ForecastRequestStats} document for a forecast.
 * The handler receives {@code null} when no stats document exists (yet).
 *
 * @param jobId        the job the forecast belongs to
 * @param forecastId   the forecast's ID
 * @param handler      receives the stats, or {@code null} if not found
 * @param errorHandler receives lookup failures
 */
public void getForecastRequestStats(String jobId, String forecastId, Consumer<ForecastRequestStats> handler,
                                    Consumer<Exception> errorHandler) {
    String docId = ForecastRequestStats.documentId(jobId, forecastId);
    GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
            ElasticsearchMappings.DOC_TYPE, docId);
    getResult(jobId, ForecastRequestStats.RESULTS_FIELD.getPreferredName(), getRequest, ForecastRequestStats.PARSER,
            result -> handler.accept(result.result), errorHandler, () -> null);
}
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {

@ -42,6 +42,7 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -238,10 +239,25 @@ public class AutodetectCommunicator implements Closeable {
}
public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handler) {
BiConsumer<Void, Exception> forecastConsumer = (aVoid, e) -> {
if (e == null) {
FlushJobParams flushParams = FlushJobParams.builder().build();
flushJob(flushParams, (flushAcknowledgement, flushException) -> {
if (flushException != null) {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", job.getId());
handler.accept(null, ExceptionsHelper.serverError(msg, e));
} else {
handler.accept(null, null);
}
});
} else {
handler.accept(null, e);
}
};
submitOperation(() -> {
autodetectProcess.forecastJob(params);
return null;
}, handler);
}, forecastConsumer);
}
@Nullable

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastRequestStatus;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
@ -224,14 +225,18 @@ public class AutoDetectResultProcessor {
LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
context.bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
double forecastProgress = forecastRequestStats.getProgress();
// persist if progress is 0 (probably some error condition) or 1 (finished),
// execute the bulk request only in some cases or in doubt
// otherwise rely on the count-based trigger
if (forecastProgress == 0.0 || forecastProgress >= 1.0) {
// if forecast stats progress is 1.0 it marks the end of a forecast,
// therefore commit whatever we have
context.bulkResultsPersister.executeRequest();
switch (forecastRequestStats.getStatus()) {
case OK:
case STARTED:
break;
case FAILED:
case SCHEDULED:
case FINISHED:
default:
context.bulkResultsPersister.executeRequest();
}
}
ModelSizeStats modelSizeStats = result.getModelSizeStats();

@ -198,6 +198,10 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
return forecastId;
}
/**
 * Builds the document ID under which the request stats of a forecast are indexed.
 * There is exactly one such document per forecast request, so the ID is fully
 * determined by the job and forecast IDs.
 *
 * @param jobId      the job the forecast belongs to
 * @param forecastId the forecast's ID
 * @return the document ID
 */
public static String documentId(String jobId, String forecastId) {
    return new StringBuilder(jobId)
            .append("_model_forecast_request_stats_")
            .append(forecastId)
            .toString();
}
/**
* Return the document ID used for indexing. As there is 1 and only 1 document
* per forecast request, the id has no dynamic parts.
@ -205,7 +209,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
* @return id
*/
public String getId() {
return jobId + "_model_forecast_request_stats_" + forecastId;
return documentId(jobId, forecastId);
}
public void setRecordCount(long recordCount) {

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -16,11 +17,13 @@ import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.junit.After;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
@ -156,6 +159,96 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
equalTo("[duration] must be greater or equal to the bucket span: [10m/1h]"));
}
/**
 * Forecasting on a job that has processed no data must fail validation and surface
 * the native process's error message to the caller as a request error.
 */
public void testNoData() throws Exception {
    TimeValue bucketSpan = TimeValue.timeValueMinutes(1);
    AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
            Collections.singletonList(new Detector.Builder("mean", "value").build()));
    analysisConfig.setBucketSpan(bucketSpan);
    DataDescription.Builder dataDescription = new DataDescription.Builder();
    dataDescription.setTimeFormat("epoch");

    Job.Builder job = new Job.Builder("forecast-it-test-no-data");
    job.setAnalysisConfig(analysisConfig);
    job.setDataDescription(dataDescription);

    registerJob(job);
    putJob(job);
    openJob(job.getId());

    // No data has been posted, so the forecast must be rejected.
    ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
    assertThat(e.getMessage(),
            equalTo("Cannot run forecast: Forecast cannot be executed as job requires data to have been processed and modeled"));
}
/**
 * A forecast must be rejected when the job's model memory status is degraded;
 * a tight memory limit plus many distinct by-field values forces that state.
 */
public void testMemoryStatus() throws Exception {
    TimeValue bucketSpan = TimeValue.timeValueHours(1);
    Detector.Builder detector = new Detector.Builder("mean", "value");
    detector.setByFieldName("clientIP");
    AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
            Collections.singletonList(detector.build()));
    analysisConfig.setBucketSpan(bucketSpan);
    DataDescription.Builder dataDescription = new DataDescription.Builder();
    dataDescription.setTimeFormat("epoch");

    Job.Builder job = new Job.Builder("forecast-it-test-memory-status");
    job.setAnalysisConfig(analysisConfig);
    job.setDataDescription(dataDescription);
    // Set the memory limit to 30MB so the per-IP models exceed it.
    job.setAnalysisLimits(new AnalysisLimits(30L, null));

    registerJob(job);
    putJob(job);
    openJob(job.getId());
    createDataWithLotsOfClientIps(bucketSpan, job);

    ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
    assertThat(e.getMessage(), equalTo("Cannot run forecast: Forecast cannot be executed as model memory status is not OK"));
}
/**
 * A forecast must be rejected when its predicted memory usage exceeds the
 * native process's forecast memory cap; a high-cardinality by-field triggers this.
 */
public void testMemoryLimit() throws Exception {
    TimeValue bucketSpan = TimeValue.timeValueHours(1);
    Detector.Builder detector = new Detector.Builder("mean", "value");
    detector.setByFieldName("clientIP");
    AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
            Collections.singletonList(detector.build()));
    analysisConfig.setBucketSpan(bucketSpan);
    DataDescription.Builder dataDescription = new DataDescription.Builder();
    dataDescription.setTimeFormat("epoch");

    Job.Builder job = new Job.Builder("forecast-it-test-memory-limit");
    job.setAnalysisConfig(analysisConfig);
    job.setDataDescription(dataDescription);

    registerJob(job);
    putJob(job);
    openJob(job.getId());
    createDataWithLotsOfClientIps(bucketSpan, job);

    ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
    assertThat(e.getMessage(),
            equalTo("Cannot run forecast: Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB"));
}
/**
 * Posts roughly 50 bucket spans' worth of records over many distinct clientIP values
 * (99 IPs per batch) so that per-IP model state drives the job's memory usage up,
 * then flushes the job.
 *
 * NOTE(review): {@code timestamp} is advanced inside the clientIP ({@code i}) loop, so
 * each IP batch lands in a different bucket and the while loop exits after ~50 batches
 * rather than writing all 255 IP batches into every bucket — presumably intentional to
 * spread the data across buckets; confirm against the test's expectations.
 *
 * @param bucketSpan the job's bucket span, used to space the record timestamps
 * @param job        the job to post data to
 * @throws IOException if building a JSON record fails
 */
private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
    long now = Instant.now().getEpochSecond();
    // Start 50 bucket spans in the past so the data ends "now".
    long timestamp = now - 50 * bucketSpan.seconds();
    while (timestamp < now) {
        for (int i = 1; i < 256; i++) {
            List<String> data = new ArrayList<>();
            for (int j = 1; j < 100; j++) {
                Map<String, Object> record = new HashMap<>();
                record.put("time", timestamp);
                record.put("value", 10.0);
                record.put("clientIP", String.format(Locale.ROOT, "192.168.%d.%d", i, j));
                data.add(createJsonRecord(record));
            }
            postData(job.getId(), data.stream().collect(Collectors.joining()));
            timestamp += bucketSpan.seconds();
        }
    }
    flushJob(job.getId(), false);
}
private static Map<String, Object> createRecord(long timestamp, double value) {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);