From c7fd1aacffee7ac65f9f2f348add8a764b1446bf Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 7 Apr 2017 14:51:13 +0200 Subject: [PATCH] [ML] implement _all for closing jobs (elastic/x-pack-elasticsearch#962) Add '_all' functionality for closing ML jobs. For cluster shutdown due to maintenance and major upgrades we recommend the user to stop all datafeeds and jobs. This change adds the ability to close all jobs at once where previously it was required to iterate over all jobs and do an explicit close. This is part one of elastic/x-pack-elasticsearch#795, part two can be found in elastic/x-pack-elasticsearch#995. relates elastic/x-pack-elasticsearch#795 Original commit: elastic/x-pack-elasticsearch@9b251ed7e1686ba53c49d423ccd4e2c3122d5015 --- .../xpack/ml/action/CloseJobAction.java | 270 +++++++++++++++--- .../ml/action/FinalizeJobExecutionAction.java | 31 +- .../ml/action/TransportJobTaskAction.java | 62 +++- .../xpack/ml/utils/MlStrings.java | 4 +- .../ml/action/CloseJobActionRequestTests.java | 38 +++ .../integration/MlRestTestStateCleaner.java | 45 +-- .../xpack/ml/support/BaseMlIntegTestCase.java | 40 ++- .../xpack/ml/utils/MlStringsTests.java | 1 + .../integration/MlRestTestStateCleaner.java | 45 +-- 9 files changed, 409 insertions(+), 127 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index ddad90d4ff4..49e58a6235f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -5,15 +5,19 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import 
org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -23,11 +27,13 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -36,18 +42,27 @@ import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import 
org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class CloseJobAction extends Action { @@ -245,19 +260,34 @@ public class CloseJobAction extends Action listener) { + /* + * Closing of multiple jobs: + * + * 1. Resolve and validate jobs first: if any job does not meet the + * criteria (e.g. open datafeed), fail immediately, do not close any + * job + * + * 2. Internally a task request is created for every job, so there + * are n inner tasks for 1 user request + * + * 3. 
Collect n inner task results or failures and send 1 outer + * result/failure + */ + ClusterState currentState = clusterService.state(); + List resolvedJobs = resolveAndValidateJobId(request.getJobId(), currentState); + + if (resolvedJobs.isEmpty()) { + listener.onResponse(new Response(true)); + return; + } + + request.setResolvedJobIds(resolvedJobs.toArray(new String[0])); + if (request.isForce()) { - PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - String jobId = request.getJobId(); - PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); - if (jobTask == null) { - throw new ElasticsearchStatusException("cannot force close job, because job [" + jobId + "] is not open", - RestStatus.CONFLICT); - } - forceCloseJob(jobTask.getId(), jobId, listener); + forceCloseJob(currentState, request, listener); } else { - PersistentTask jobTask = validateAndReturnJobTask(request.getJobId(), currentState); - normalCloseJob(task, jobTask.getId(), request, listener); + normalCloseJob(currentState, task, request, listener); } } @@ -268,46 +298,139 @@ public class CloseJobAction extends Action tasks, + List taskOperationFailures, + List failedNodeExceptions) { + + // number of resolved jobs should be equal to the number of tasks, + // otherwise something went wrong + if (request.getResolvedJobIds().length != tasks.size()) { + if (taskOperationFailures.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(failedNodeExceptions.get(0)); + } else { + throw new IllegalStateException( + "Expected [" + request.getResolvedJobIds().length + + "] number of tasks but " + "got [" + tasks.size() + "]"); + } + } + + return new Response(tasks.stream().allMatch(Response::isClosed)); + } + @Override protected Response 
readTaskResponse(StreamInput in) throws IOException { return new Response(in); } - private void forceCloseJob(long persistentTaskId, String jobId, ActionListener listener) { - auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); - persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new Response(true)); - } + private void forceCloseJob(ClusterState currentState, Request request, + ActionListener listener) { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + String[] jobIds = request.getResolvedJobIds(); + final int numberOfJobs = jobIds.length; + final AtomicInteger counter = new AtomicInteger(); + final AtomicArray failures = new AtomicArray<>(jobIds.length); + + for (String jobId : jobIds) { + auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); + PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); + persistentTasksService.cancelPersistentTask(jobTask.getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTask task) { + if (counter.incrementAndGet() == numberOfJobs) { + sendResponseOrFailure(request.getJobId(), listener, failures); + } + } + + @Override + public void onFailure(Exception e) { + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, e); + if (slot == numberOfJobs) { + sendResponseOrFailure(request.getJobId(), listener, failures); + } + } + + private void sendResponseOrFailure(String jobId, + ActionListener listener, + AtomicArray failures) { + List catchedExceptions = failures.asList(); + if (catchedExceptions.size() == 0) { + listener.onResponse(new Response(true)); + return; + } + + String msg = "Failed to force close job [" + jobId + "] with [" + + catchedExceptions.size() + + "] failures, rethrowing last, all Exceptions: [" + + catchedExceptions.stream().map(Exception::getMessage) + .collect(Collectors.joining(", ")) + + "]"; + + 
ElasticsearchException e = new ElasticsearchException(msg, + catchedExceptions.get(0)); + listener.onFailure(e); + } + }); + } } - private void normalCloseJob(Task task, long persistentTaskId, Request request, ActionListener listener) { - auditor.info(request.getJobId(), Messages.JOB_AUDIT_CLOSING); + private void normalCloseJob(ClusterState currentState, Task task, Request request, + ActionListener listener) { + Map jobIdToPersistentTaskId = new HashMap<>(); + + for (String jobId : request.getResolvedJobIds()) { + auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); + PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); + jobIdToPersistentTaskId.put(jobId, jobTask.getId()); + } + ActionListener finalListener = - ActionListener.wrap(r -> waitForJobClosed(persistentTaskId, request, r, listener), listener::onFailure); + ActionListener.wrap( + r -> waitForJobClosed(request, jobIdToPersistentTaskId, + r, listener), + listener::onFailure); super.doExecute(task, request, finalListener); } // Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed // This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. 
- void waitForJobClosed(long persistentTaskId, Request request, Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.timeout, - new WaitForPersistentTaskStatusListener() { + void waitForJobClosed(Request request, Map jobIdToPersistentTaskId, Response response, + ActionListener listener) { + ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, + request.timeout, logger, threadPool.getThreadContext()); + waitForPersistentTaskStatus(stateObserver, persistentTasksCustomMetaData -> { + for (Map.Entry entry : jobIdToPersistentTaskId.entrySet()) { + long persistentTaskId = entry.getValue(); + if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { + return false; + } + } + return true; + }, new ActionListener() { @Override - public void onResponse(PersistentTask task) { - logger.debug("finalizing job [{}]", request.getJobId()); - FinalizeJobExecutionAction.Request finalizeRequest = - new FinalizeJobExecutionAction.Request(request.getJobId()); + public void onResponse(Boolean result) { + Set jobIds = jobIdToPersistentTaskId.keySet(); + FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request( + jobIds.toArray(new String[jobIds.size()])); client.execute(FinalizeJobExecutionAction.INSTANCE, finalizeRequest, - ActionListener.wrap(r-> listener.onResponse(response), listener::onFailure)); + new ActionListener() { + @Override + public void onResponse(FinalizeJobExecutionAction.Response r) { + listener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } @Override @@ -317,6 +440,74 @@ public class CloseJobAction extends Action predicate, + ActionListener listener) { + if (predicate.test(stateObserver.setAndGetObservedState().metaData() + .custom(PersistentTasksCustomMetaData.TYPE))) { + listener.onResponse(true); + } else { + 
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + listener.onResponse(true); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new IllegalStateException("timed out after " + timeout)); + } + }, clusterState -> predicate + .test(clusterState.metaData() + .custom(PersistentTasksCustomMetaData.TYPE))); + } + } + } + + static List resolveAndValidateJobId(String jobId, ClusterState state) { + MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); + PersistentTasksCustomMetaData tasks = state.getMetaData() + .custom(PersistentTasksCustomMetaData.TYPE); + + if (!Job.ALL.equals(jobId)) { + validateAndReturnJobTask(jobId, state); + return Collections.singletonList(jobId); + } + + if (mlMetadata.getJobs().isEmpty()) { + return Collections.emptyList(); + } + + List matchedJobs = new ArrayList<>(); + + for (Map.Entry jobEntry : mlMetadata.getJobs().entrySet()) { + String resolvedJobId = jobEntry.getKey(); + Job job = jobEntry.getValue(); + + if (job.isDeleted()) { + continue; + } + + PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); + if (jobTask == null) { + continue; + } + + if (MlMetadata.getJobState(resolvedJobId, tasks) == JobState.CLOSED) { + continue; + } + + validateAndReturnJobTask(resolvedJobId, state); + + matchedJobs.add(resolvedJobId); + } + + return matchedJobs; } static PersistentTask validateAndReturnJobTask(String jobId, ClusterState state) { @@ -326,7 +517,8 @@ public class CloseJobAction extends Action jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask == null) { throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT); @@ -334,10 +526,12 @@ public class CloseJobAction extends Action datafeed = 
mlMetadata.getDatafeedByJobId(jobId); if (datafeed.isPresent()) { - DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks); + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), + tasks); if (datafeedState != DatafeedState.STOPPED) { - throw new ElasticsearchStatusException("cannot close job [{}], datafeed hasn't been stopped", - RestStatus.CONFLICT, jobId); + throw new ElasticsearchStatusException( + "cannot close job [{}], datafeed hasn't been stopped", RestStatus.CONFLICT, + jobId); } } return jobTask; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java index c547191fcec..cc7a574d0de 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java @@ -55,29 +55,29 @@ public class FinalizeJobExecutionAction extends Action { - private String jobId; + private String[] jobIds; - public Request(String jobId) { - this.jobId = jobId; + public Request(String[] jobIds) { + this.jobIds = jobIds; } Request() { } - public String getJobId() { - return jobId; + public String[] getJobIds() { + return jobIds; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - jobId = in.readString(); + jobIds = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(jobId); + out.writeStringArray(jobIds); } @Override @@ -140,18 +140,21 @@ public class FinalizeJobExecutionAction extends Action listener) throws Exception { - String jobId = request.getJobId(); - String source = "finalize_job_execution [" + jobId + "]"; - logger.debug("finalizing job [{}]", request.getJobId()); + String jobIdString = String.join(",", request.getJobIds()); + String source = 
"finalize_job_execution [" + jobIdString + "]"; + logger.debug("finalizing jobs [{}]", jobIdString); clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(new Date()); MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - mlMetadataBuilder.putJob(jobBuilder.build(), true); + Date finishedTime = new Date(); + for (String jobId : request.getJobIds()) { + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(finishedTime); + mlMetadataBuilder.putJob(jobBuilder.build(), true); + } ClusterState.Builder builder = ClusterState.builder(currentState); return builder.metaData(new MetaData.Builder(currentState.metaData()) .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) @@ -166,7 +169,7 @@ public class FinalizeJobExecutionAction extends Action, +public abstract class TransportJobTaskAction, Response extends BaseTasksResponse & Writeable> extends TransportTasksAction { protected final AutodetectProcessManager processManager; @@ -55,27 +57,36 @@ public abstract class TransportJobTaskAction listener) { - String jobId = request.getJobId(); + ClusterState state = clusterService.state(); // We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the // node running the job task. 
- ClusterState state = clusterService.state(); - JobManager.getJobOrThrowIfUnknown(state, jobId); - PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); - if (jobTask == null || jobTask.isAssigned() == false) { - String message = "Cannot perform requested action because job [" + jobId + "] is not open"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - } else { - request.setNodes(jobTask.getExecutorNode()); - super.doExecute(task, request, listener); + Set executorNodes = new HashSet<>(); + + for (String resolvedJobId : request.getResolvedJobIds()) { + JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData() + .custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata + .getJobTask(resolvedJobId, tasks); + + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot perform requested action because job [" + resolvedJobId + + "] is not open"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + } else { + executorNodes.add(jobTask.getExecutorNode()); + } } + + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + super.doExecute(task, request, listener); } @Override protected final void taskOperation(Request request, OperationTask task, ActionListener listener) { ClusterState state = clusterService.state(); PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks); + JobState jobState = MlMetadata.getJobState(task.getJobId(), tasks); if (jobState == JobState.OPENED) { innerTaskOperation(request, task, listener, state); } else { @@ -110,7 +121,8 @@ public abstract class 
TransportJobTaskAction 1) { - throw new IllegalStateException("Expected one node level response, but got [" + tasks.size() + "]"); + throw new IllegalStateException( + "Expected one node level response, but got [" + tasks.size() + "]"); } return tasks.get(0); } @@ -124,33 +136,53 @@ public abstract class TransportJobTaskAction> extends BaseTasksRequest { String jobId; + String[] resolvedJobIds; JobTaskRequest() { } JobTaskRequest(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); + + // the default implementation just returns 1 jobId + this.resolvedJobIds = new String[] { jobId }; } public String getJobId() { return jobId; } + protected String[] getResolvedJobIds() { + return resolvedJobIds; + } + + protected void setResolvedJobIds(String[] resolvedJobIds) { + this.resolvedJobIds = resolvedJobIds; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); jobId = in.readString(); + resolvedJobIds = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(jobId); + out.writeStringArray(resolvedJobIds); } @Override public boolean match(Task task) { - return OpenJobAction.JobTask.match(task, jobId); + for (String id : resolvedJobIds) { + if (OpenJobAction.JobTask.match(task, id)) { + return true; + } + } + + return false; } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/MlStrings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/MlStrings.java index 503d81aedc6..ad6bea7c4c7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/MlStrings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/MlStrings.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.utils; +import org.elasticsearch.xpack.ml.job.config.Job; + import java.util.regex.Pattern; /** @@ -56,6 +58,6 @@ public final class MlStrings { } public static boolean isValidId(String id) { - return id != 
null && VALID_ID_CHAR_PATTERN.matcher(id).matches(); + return id != null && VALID_ID_CHAR_PATTERN.matcher(id).matches() && !Job.ALL.equals(id); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index 321d1c27907..24e6bed6e91 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -84,6 +85,43 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa CloseJobAction.validateAndReturnJobTask("job_id", cs2); } + public void testResolve() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), + false); + mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_1", "job_id_1", + Collections.singletonList("*"))); + + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()), + false); + mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_2", "job_id_2", + Collections.singletonList("*"))); + + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()), + false); + mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3", + Collections.singletonList("*"))); + + Map> tasks = new HashMap<>(); + PersistentTask jobTask = createJobTask(1L, "job_id_1", null, JobState.OPENED); + tasks.put(1L, jobTask); + + jobTask = 
createJobTask(2L, "job_id_2", null, JobState.CLOSED); + tasks.put(2L, jobTask); + + jobTask = createJobTask(3L, "job_id_3", null, JobState.FAILED); + tasks.put(3L, jobTask); + + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(1L, tasks))) + .build(); + + assertEquals(Arrays.asList("job_id_1", "job_id_3"), + CloseJobAction.resolveAndValidateJobId("_all", cs1)); + } + public static PersistentTask createTask(long id, String datafeedId, long startTime, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index a085c629fd5..7232cd76b02 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -15,6 +15,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/* + * NOTE: a copy of this file resides in :x-pack-elasticsearch:qa:smoke-test-ml-with-security + * + * Therefore any changes here have to be transferred there, too. + * (Or eventually fix it by introducing a common test infrastructure package) + */ + public class MlRestTestStateCleaner { private final Logger logger; @@ -75,27 +82,27 @@ public class MlRestTestStateCleaner { return; } + try { + int statusCode = adminClient + .performRequest("POST", "/_xpack/ml/anomaly_detectors/_all/_close") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when closing all jobs"); + } + } catch (Exception e1) { + logger.warn("failed to close all jobs. 
Forcing closed", e1); + try { + adminClient.performRequest("POST", + "/_xpack/ml/anomaly_detectors/_all/_close?force=true"); + } catch (Exception e2) { + logger.warn("Force-closing all jobs failed", e2); + } + throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", + e1); + } + for (Map jobConfig : jobConfigs) { String jobId = (String) jobConfig.get("job_id"); - try { - int statusCode = adminClient.performRequest("POST", - "/_xpack/ml/anomaly_detectors/" + jobId + "/_close").getStatusLine().getStatusCode(); - if (statusCode != 200) { - logger.error("Got status code " + statusCode + " when closing job " + jobId); - } - } catch (Exception e1) { - if (e1.getMessage().contains("because job [" + jobId + "] is not open")) { - logger.debug("job [" + jobId + "] has already been closed", e1); - } else { - logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1); - try { - adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true"); - } catch (Exception e2) { - logger.warn("Force-closing job [" + jobId + "] failed", e2); - } - throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1); - } - } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode(); if (statusCode != 200) { logger.error("Got status code " + statusCode + " when deleting job " + jobId); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 6b6e061b1cd..6daf2bdcd5a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -45,7 +45,6 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; import 
java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -290,32 +289,31 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static void deleteAllJobs(Logger logger, Client client) throws Exception { MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); - for (Map.Entry entry : mlMetadata.getJobs().entrySet()) { - String jobId = entry.getKey(); + + try { + CloseJobAction.Request closeRequest = new CloseJobAction.Request(Job.ALL); + closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L)); + logger.info("Closing jobs using [{}]", Job.ALL); + CloseJobAction.Response response = client.execute(CloseJobAction.INSTANCE, closeRequest) + .get(); + assertTrue(response.isClosed()); + } catch (Exception e1) { try { - CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId); + CloseJobAction.Request closeRequest = new CloseJobAction.Request(Job.ALL); + closeRequest.setForce(true); closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L)); - logger.info("Closing job [{}]", jobId); CloseJobAction.Response response = client.execute(CloseJobAction.INSTANCE, closeRequest).get(); assertTrue(response.isClosed()); - } catch (Exception e1) { - if (e1.getMessage().contains("because job [" + jobId + "] is not open")) { - logger.debug("job [" + jobId + "] has already been closed", e1); - } else { - try { - CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId); - closeRequest.setForce(true); - closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L)); - CloseJobAction.Response response = - client.execute(CloseJobAction.INSTANCE, closeRequest).get(); - assertTrue(response.isClosed()); - } catch (Exception e2) { - logger.warn("Force-closing datafeed [" + jobId + "] failed.", e2); - } - throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1); - } + } catch 
(Exception e2) { + logger.warn("Force-closing jobs failed.", e2); } + throw new RuntimeException("Had to resort to force-closing job, something went wrong?", + e1); + } + + for (Map.Entry entry : mlMetadata.getJobs().entrySet()) { + String jobId = entry.getKey(); assertBusy(() -> { GetJobsStatsAction.Response statsResponse = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java index 74f59c35d94..096caf84541 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/MlStringsTests.java @@ -29,5 +29,6 @@ public class MlStringsTests extends ESTestCase { assertThat(MlStrings.isValidId("_-.a1"), is(false)); assertThat(MlStrings.isValidId("A"), is(false)); assertThat(MlStrings.isValidId("!afafd"), is(false)); + assertThat(MlStrings.isValidId("_all"), is(false)); } } diff --git a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index a085c629fd5..f15f407f97b 100644 --- a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -15,6 +15,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/* + * NOTE: a copy of this file resides in :x-pack-elasticsearch:plugin + * + * Therefore any changes here have to be transferred there, too. 
+ * (Or eventually fix it by introducing a common test infrastructure package) + */ + public class MlRestTestStateCleaner { private final Logger logger; @@ -75,27 +82,27 @@ public class MlRestTestStateCleaner { return; } + try { + int statusCode = adminClient + .performRequest("POST", "/_xpack/ml/anomaly_detectors/_all/_close") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when closing all jobs"); + } + } catch (Exception e1) { + logger.warn("failed to close all jobs. Forcing closed", e1); + try { + adminClient.performRequest("POST", + "/_xpack/ml/anomaly_detectors/_all/_close?force=true"); + } catch (Exception e2) { + logger.warn("Force-closing all jobs failed", e2); + } + throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?", + e1); + } + for (Map jobConfig : jobConfigs) { String jobId = (String) jobConfig.get("job_id"); - try { - int statusCode = adminClient.performRequest("POST", - "/_xpack/ml/anomaly_detectors/" + jobId + "/_close").getStatusLine().getStatusCode(); - if (statusCode != 200) { - logger.error("Got status code " + statusCode + " when closing job " + jobId); - } - } catch (Exception e1) { - if (e1.getMessage().contains("because job [" + jobId + "] is not open")) { - logger.debug("job [" + jobId + "] has already been closed", e1); - } else { - logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1); - try { - adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true"); - } catch (Exception e2) { - logger.warn("Force-closing job [" + jobId + "] failed", e2); - } - throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1); - } - } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode(); if (statusCode != 200) { logger.error("Got status code " + statusCode + " when deleting job " + jobId);