diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 0389ceffbc3..2d3707e98cf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; +import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; @@ -254,6 +255,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl UpdateProcessAction.INSTANCE, DeleteExpiredDataAction.INSTANCE, ForecastJobAction.INSTANCE, + DeleteForecastAction.INSTANCE, GetCalendarsAction.INSTANCE, PutCalendarAction.INSTANCE, DeleteCalendarAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteForecastAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteForecastAction.java new file mode 100644 index 00000000000..b9f981ae980 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteForecastAction.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; + +public class DeleteForecastAction extends Action { + + public static final DeleteForecastAction INSTANCE = new DeleteForecastAction(); + public static final String NAME = "cluster:admin/xpack/ml/job/forecast/delete"; + + private DeleteForecastAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest { + + private String jobId; + private String forecastId; + private boolean allowNoForecasts = true; + + public Request() { + } + + public Request(String jobId, String forecastId) { + this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); + this.forecastId = ExceptionsHelper.requireNonNull(forecastId, ForecastRequestStats.FORECAST_ID.getPreferredName()); + } + + public String getJobId() { + return jobId; + } + + public String getForecastId() { + return forecastId; + } + + public boolean isAllowNoForecasts() { + return allowNoForecasts; + } + + public void setAllowNoForecasts(boolean allowNoForecasts) { + this.allowNoForecasts = allowNoForecasts; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + jobId = in.readString(); + forecastId = in.readString(); + allowNoForecasts = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobId); + out.writeString(forecastId); + out.writeBoolean(allowNoForecasts); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, DeleteForecastAction action) { + super(client, action, new Request()); + } + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 7411115bda3..3c571c9d605 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -161,7 +161,9 @@ public final class Messages { public static final String REST_JOB_NOT_CLOSED_REVERT = "Can only revert to a model snapshot when the job is closed."; public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]"; public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''."; - + public static final String REST_NO_SUCH_FORECAST = "No forecast(s) [{0}] exists for job [{1}]"; + public static final String REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE = + "Forecast(s) [{0}] for job [{1}] needs to be either FAILED or FINISHED to be deleted"; public static final String FIELD_CANNOT_BE_NULL = "Field [{0}] cannot be null"; private Messages() { diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index a702973fcb0..abfed3fd878 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -91,7 +91,9 @@ integTestRunner { 'ml/validate/Test invalid job config', 'ml/validate/Test job config is invalid because model snapshot id set', 'ml/validate/Test job config that is invalid only because of the job ID', - 'ml/validate_detector/Test invalid detector' + 'ml/validate_detector/Test invalid detector', + 'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts', + 'ml/delete_forecast/Test delete forecast on missing forecast' ].join(',') } diff --git a/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/CloseJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/CloseJobsIT.java similarity index 100% rename from x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/CloseJobsIT.java rename to x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/CloseJobsIT.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java index 2f3ea6c83a5..2d8c6a4128b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java @@ -7,7 +7,10 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -276,6 +279,104 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase { } + public void testDelete() throws Exception { + Detector.Builder detector = new Detector.Builder("mean", "value"); + + TimeValue bucketSpan = TimeValue.timeValueHours(1); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + + Job.Builder job = new Job.Builder("forecast-it-test-delete"); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long now = Instant.now().getEpochSecond(); + long timestamp = now - 50 * bucketSpan.seconds(); + List data = new ArrayList<>(); + while (timestamp < now) { + data.add(createJsonRecord(createRecord(timestamp, 10.0))); + data.add(createJsonRecord(createRecord(timestamp, 30.0))); + timestamp += bucketSpan.seconds(); + } + + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), false); + String forecastIdDefaultDurationDefaultExpiry = forecast(job.getId(), null, null); + String forecastIdDuration1HourNoExpiry = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO); + waitForecastToFinish(job.getId(), forecastIdDefaultDurationDefaultExpiry); + waitForecastToFinish(job.getId(), forecastIdDuration1HourNoExpiry); + closeJob(job.getId()); + + { + ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastIdDefaultDurationDefaultExpiry); + assertNotNull(forecastStats); + ForecastRequestStats otherStats = getForecastStats(job.getId(), forecastIdDuration1HourNoExpiry); + assertNotNull(otherStats); + } + + { + DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), + forecastIdDefaultDurationDefaultExpiry + "," + forecastIdDuration1HourNoExpiry); + AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + + { + ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastIdDefaultDurationDefaultExpiry); + assertNull(forecastStats); + ForecastRequestStats otherStats = getForecastStats(job.getId(), forecastIdDuration1HourNoExpiry); + assertNull(otherStats); + } + + { + DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), "forecast-does-not-exist"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> client().execute(DeleteForecastAction.INSTANCE, request).actionGet()); + assertThat(e.getMessage(), + equalTo("No forecast(s) [forecast-does-not-exist] exists for job [forecast-it-test-delete]")); + } + + { + DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), MetaData.ALL); + AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + + { + Job.Builder otherJob = new Job.Builder("forecasts-delete-with-all-and-allow-no-forecasts"); + otherJob.setAnalysisConfig(analysisConfig); + otherJob.setDataDescription(dataDescription); + + registerJob(otherJob); + putJob(otherJob); + DeleteForecastAction.Request request = new DeleteForecastAction.Request(otherJob.getId(), MetaData.ALL); + AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + } + + { + Job.Builder otherJob = new Job.Builder("forecasts-delete-with-all-and-not-allow-no-forecasts"); + otherJob.setAnalysisConfig(analysisConfig); + otherJob.setDataDescription(dataDescription); + + registerJob(otherJob); + putJob(otherJob); + + DeleteForecastAction.Request request = new DeleteForecastAction.Request(otherJob.getId(), MetaData.ALL); + request.setAllowNoForecasts(false); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> client().execute(DeleteForecastAction.INSTANCE, request).actionGet()); + assertThat(e.getMessage(), + equalTo("No forecast(s) [_all] exists for job [forecasts-delete-with-all-and-not-allow-no-forecasts]")); + } + } + private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException { long now = Instant.now().getEpochSecond(); long timestamp = now - 15 * bucketSpan.seconds(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index af7cb4242f1..343833c0806 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -62,6 +62,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; +import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; @@ -114,6 +115,7 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarEventAction; import org.elasticsearch.xpack.ml.action.TransportDeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportDeleteExpiredDataAction; import org.elasticsearch.xpack.ml.action.TransportDeleteFilterAction; +import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction; import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction; @@ -200,6 +202,7 @@ import org.elasticsearch.xpack.ml.rest.filter.RestGetFiltersAction; import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction; import org.elasticsearch.xpack.ml.rest.filter.RestUpdateFilterAction; import org.elasticsearch.xpack.ml.rest.job.RestCloseJobAction; +import org.elasticsearch.xpack.ml.rest.job.RestDeleteForecastAction; import org.elasticsearch.xpack.ml.rest.job.RestDeleteJobAction; import org.elasticsearch.xpack.ml.rest.job.RestFlushJobAction; import org.elasticsearch.xpack.ml.rest.job.RestForecastJobAction; @@ -489,6 +492,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu new RestDeleteModelSnapshotAction(settings, restController), new RestDeleteExpiredDataAction(settings, restController), new RestForecastJobAction(settings, restController), + new RestDeleteForecastAction(settings, restController), new RestGetCalendarsAction(settings, restController), new RestPutCalendarAction(settings, restController), new RestDeleteCalendarAction(settings, restController), @@ -545,6 +549,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu new ActionHandler<>(UpdateProcessAction.INSTANCE, TransportUpdateProcessAction.class), new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, TransportDeleteExpiredDataAction.class), new ActionHandler<>(ForecastJobAction.INSTANCE, TransportForecastJobAction.class), + new ActionHandler<>(DeleteForecastAction.INSTANCE, TransportDeleteForecastAction.class), new ActionHandler<>(GetCalendarsAction.INSTANCE, TransportGetCalendarsAction.class), new ActionHandler<>(PutCalendarAction.INSTANCE, TransportPutCalendarAction.class), new ActionHandler<>(DeleteCalendarAction.INSTANCE, TransportDeleteCalendarAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteForecastAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteForecastAction.java new file mode 100644 index 00000000000..cb2a4b47ba6 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteForecastAction.java @@ -0,0 +1,219 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.results.Forecast; +import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; +import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats.ForecastRequestStatus; +import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + + +public class TransportDeleteForecastAction extends HandledTransportAction { + + private final Client client; + private static final int MAX_FORECAST_TO_SEARCH = 10_000; + + private static final Set DELETABLE_STATUSES = + EnumSet.of(ForecastRequestStatus.FINISHED, ForecastRequestStatus.FAILED); + + @Inject + public TransportDeleteForecastAction(Settings settings, TransportService transportService, ActionFilters actionFilters, Client client) { + super(settings, DeleteForecastAction.NAME, transportService, actionFilters, DeleteForecastAction.Request::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, DeleteForecastAction.Request request, ActionListener listener) { + final String jobId = request.getJobId(); + final String forecastsExpression = request.getForecastId(); + ActionListener forecastStatsHandler = ActionListener.wrap( + searchResponse -> deleteForecasts(searchResponse, request, listener), + e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e))); + + SearchSourceBuilder source = new SearchSourceBuilder(); + + BoolQueryBuilder builder = QueryBuilders.boolQuery(); + BoolQueryBuilder innerBool = QueryBuilders.boolQuery().must( + QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE)); + + if (MetaData.ALL.equals(request.getForecastId()) == false) { + Set forcastIds = new HashSet<>(Arrays.asList(Strings.tokenizeToStringArray(forecastsExpression, ","))); + innerBool.must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(), forcastIds)); + } + + source.query(builder.filter(innerBool)); + + SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + searchRequest.source(source); + + executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, forecastStatsHandler); + } + + private void deleteForecasts(SearchResponse searchResponse, + DeleteForecastAction.Request request, + ActionListener listener) { + final String jobId = request.getJobId(); + Set forecastsToDelete; + try { + forecastsToDelete = parseForecastsFromSearch(searchResponse); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + if (forecastsToDelete.isEmpty()) { + if (MetaData.ALL.equals(request.getForecastId()) && + request.isAllowNoForecasts()) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + listener.onFailure( + new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_FORECAST, request.getForecastId(), jobId))); + } + return; + } + List badStatusForecasts = forecastsToDelete.stream() + .filter((f) -> !DELETABLE_STATUSES.contains(f.getStatus())) + .map(ForecastRequestStats::getForecastId).collect(Collectors.toList()); + if (badStatusForecasts.size() > 0) { + listener.onFailure( + ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.REST_CANNOT_DELETE_FORECAST_IN_CURRENT_STATE, badStatusForecasts, jobId))); + return; + } + + final List forecastIds = forecastsToDelete.stream().map(ForecastRequestStats::getForecastId).collect(Collectors.toList()); + DeleteByQueryRequest deleteByQueryRequest = buildDeleteByQuery(jobId, forecastIds); + + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( + response -> { + if (response.isTimedOut()) { + listener.onFailure( + new TimeoutException("Delete request timed out. Successfully deleted " + + response.getDeleted() + " forecast documents from job [" + jobId + "]")); + return; + } + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + listener.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())); + return; + } + logger.info("Deleted forecast(s) [{}] from job [{}]", forecastIds, jobId); + listener.onResponse(new AcknowledgedResponse(true)); + }, + listener::onFailure)); + } + + private static Tuple getStatusAndReason(final BulkByScrollResponse response) { + RestStatus status = RestStatus.OK; + Throwable reason = new Exception("Unknown error"); + //Getting the max RestStatus is sort of arbitrary, would the user care about 5xx over 4xx? + //Unsure of a better way to return an appropriate and possibly actionable cause to the user. + for (BulkItemResponse.Failure failure : response.getBulkFailures()) { + if (failure.getStatus().getStatus() > status.getStatus()) { + status = failure.getStatus(); + reason = failure.getCause(); + } + } + + for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) { + RestStatus failureStatus = org.elasticsearch.ExceptionsHelper.status(failure.getReason()); + if (failureStatus.getStatus() > status.getStatus()) { + status = failureStatus; + reason = failure.getReason(); + } + } + return new Tuple<>(status, reason); + } + + private static Set parseForecastsFromSearch(SearchResponse searchResponse) throws IOException { + SearchHits hits = searchResponse.getHits(); + List allStats = new ArrayList<>(hits.getHits().length); + for (SearchHit hit : hits) { + try (InputStream stream = hit.getSourceRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) { + allStats.add(ForecastRequestStats.STRICT_PARSER.apply(parser, null)); + } + } + return new HashSet<>(allStats); + } + + private DeleteByQueryRequest buildDeleteByQuery(String jobId, List forecastsToDelete) { + SearchRequest searchRequest = new SearchRequest(); + // We need to create the DeleteByQueryRequest before we modify the SearchRequest + // because the constructor of the former wipes the latter + DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest) + .setAbortOnVersionConflict(false) //since these documents are not updated, a conflict just means it was deleted previously + .setSize(MAX_FORECAST_TO_SEARCH) + .setSlices(5); + + searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + BoolQueryBuilder innerBoolQuery = QueryBuilders.boolQuery(); + innerBoolQuery + .must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), + ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)) + .must(QueryBuilders.termsQuery(Forecast.FORECAST_ID.getPreferredName(), + forecastsToDelete)); + + QueryBuilder query = QueryBuilders.boolQuery().filter(innerBoolQuery); + searchRequest.source(new SearchSourceBuilder().query(query)); + return request; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteForecastAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteForecastAction.java new file mode 100644 index 00000000000..e42a73204eb --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteForecastAction.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.job; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.Forecast; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.io.IOException; + +public class RestDeleteForecastAction extends BaseRestHandler { + + public RestDeleteForecastAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.DELETE, + MachineLearning.BASE_PATH + + "anomaly_detectors/{" + Job.ID.getPreferredName() + + "}/_forecast/{" + Forecast.FORECAST_ID.getPreferredName() + "}", + this); + } + + @Override + public String getName() { + return "xpack_ml_delete_forecast_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String jobId = restRequest.param(Job.ID.getPreferredName()); + String forecastId = restRequest.param(Forecast.FORECAST_ID.getPreferredName(), MetaData.ALL); + final DeleteForecastAction.Request request = new DeleteForecastAction.Request(jobId, forecastId); + request.timeout(restRequest.paramAsTime("timeout", request.timeout())); + request.setAllowNoForecasts(restRequest.paramAsBoolean("allow_no_forecasts", request.isAllowNoForecasts())); + return channel -> client.execute(DeleteForecastAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_forecast.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_forecast.json new file mode 100644 index 00000000000..78040351d35 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_forecast.json @@ -0,0 +1,38 @@ +{ + "xpack.ml.delete_forecast": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-delete-forecast.html", + "methods": [ "DELETE" ], + "url": { + "path": "/_xpack/ml/anomaly_detectors/{job_id}/_forecast/{forecast_id}", + "paths": [ + "/_xpack/ml/anomaly_detectors/{job_id}/_forecast", + "/_xpack/ml/anomaly_detectors/{job_id}/_forecast/{forecast_id}" + ], + "parts": { + "job_id": { + "type": "string", + "required": true, + "description": "The ID of the job from which to delete forecasts" + }, + "forecast_id": { + "type": "string", + "required": false, + "description": "The ID of the forecast to delete, can be comma delimited list. Leaving blank implies `_all`" + } + }, + "params": { + "allow_no_forecasts": { + "type": "boolean", + "required": false, + "description": "Whether to ignore if `_all` matches no forecasts" + }, + "timeout": { + "type": "time", + "requred": false, + "description": "Controls the time to wait until the forecast(s) are deleted. Default to 30 seconds" + } + } + }, + "body": null + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_forecast.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_forecast.yml new file mode 100644 index 00000000000..667f80410e0 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_forecast.yml @@ -0,0 +1,143 @@ +setup: + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.ml.put_job: + job_id: delete-forecast-job + body: > + { + "description":"A forecast job", + "analysis_config" : { + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}], + "bucket_span" : "1s" + }, + "data_description" : { + "format":"xcontent" + } + } + +--- +"Test delete forecast on missing forecast": + - do: + catch: /resource_not_found_exception/ + xpack.ml.delete_forecast: + job_id: delete-forecast-job + forecast_id: this-is-a-bad-forecast + +--- +"Test delete forecast": + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + Content-Type: application/json + index: + index: .ml-anomalies-shared + type: doc + id: "delete-forecast-job_model_forecast_someforecastid_1486591200000_1800_0_961_0" + body: + { + "job_id": "delete-forecast-job", + "forecast_id": "someforecastid", + "result_type": "model_forecast", + "bucket_span": 1800, + "detector_index": 0, + "timestamp": 1486591200000, + "model_feature": "'arithmetic mean value by person'", + "forecast_lower": 5440.502250736747, + "forecast_upper": 6294.296972680027, + "forecast_prediction": 5867.399611708387 + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + Content-Type: application/json + index: + index: .ml-anomalies-shared + type: doc + id: "delete-forecast-job_model_forecast_someforecastid_1486591300000_1800_0_961_0" + body: + { + "job_id": "delete-forecast-job", + "forecast_id": "someforecastid", + "result_type": "model_forecast", + "bucket_span": 1800, + "detector_index": 0, + "timestamp": 1486591300000, + "model_feature": "'arithmetic mean value by person'", + "forecast_lower": 5440.502250736747, + "forecast_upper": 6294.296972680027, + "forecast_prediction": 5867.399611708387 + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + Content-Type: application/json + index: + index: .ml-anomalies-shared + type: doc + id: "delete-forecast-job_model_forecast_request_stats_someforecastid" + body: + { + "job_id": "delete-forecast-job", + "result_type": "model_forecast_request_stats", + "forecast_id": "someforecastid", + "processed_record_count": 48, + "forecast_messages": [], + "timestamp": 1486575000000, + "forecast_start_timestamp": 1486575000000, + "forecast_end_timestamp": 1486661400000, + "forecast_create_timestamp": 1535721789000, + "forecast_expiry_timestamp": 1536931389000, + "forecast_progress": 1, + "processing_time_ms": 3, + "forecast_memory_bytes": 7034, + "forecast_status": "finished" + } + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + indices.refresh: + index: .ml-anomalies-delete-forecast-job + - do: + xpack.ml.delete_forecast: + job_id: delete-forecast-job + forecast_id: someforecastid + - match: { acknowledged: true } + - do: + catch: missing + get: + id: delete-forecast-job_model_forecast_request_stats_someforecastid + index: .ml-anomalies-shared + type: doc + - do: + catch: missing + get: + id: delete-forecast-job_model_forecast_someforecastid_1486591300000_1800_0_961_0 + index: .ml-anomalies-shared + type: doc + - do: + catch: missing + get: + id: delete-forecast-job_model_forecast_someforecastid_1486591200000_1800_0_961_0 + index: .ml-anomalies-shared + type: doc + +--- +"Test delete on _all forecasts not allow no forecasts": + - do: + catch: /resource_not_found_exception/ + xpack.ml.delete_forecast: + job_id: delete-forecast-job + forecast_id: _all + allow_no_forecasts: false + +--- +"Test delete on _all forecasts": + - do: + xpack.ml.delete_forecast: + job_id: delete-forecast-job + forecast_id: _all + allow_no_forecasts: true + - match: { acknowledged: true }