[ML] implement '_all' for stopping datafeeds (elastic/x-pack-elasticsearch#995)

Add a '_all' functionality for stopping ML datafeeds.

For cluster shutdown due to maintenance and major upgrades we recommend the user to stop all datafeeds and jobs. This change adds the ability to stop all datafeeds at once, where previously it was required to iterate over all feeds and do an explicit stop. This is part two of elastic/x-pack-elasticsearch#795; part one can be found in elastic/x-pack-elasticsearch#962.

relates elastic/x-pack-elasticsearch#795

Original commit: elastic/x-pack-elasticsearch@ed1eff83d5
This commit is contained in:
Hendrik Muhs 2017-04-11 13:39:22 +02:00 committed by GitHub
parent 01a8f5a0b3
commit 7ef9a16f45
7 changed files with 293 additions and 117 deletions

View File

@ -19,7 +19,6 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
@ -36,7 +35,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -68,7 +66,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobAction.Response, CloseJobAction.RequestBuilder> { public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobAction.Response, CloseJobAction.RequestBuilder> {
@ -465,9 +462,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
// so wait for that to happen here. // so wait for that to happen here.
void waitForJobClosed(Request request, Map<String, Long> jobIdToPersistentTaskId, Response response, void waitForJobClosed(Request request, Map<String, Long> jobIdToPersistentTaskId, Response response,
ActionListener<Response> listener) { ActionListener<Response> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
request.timeout, logger, threadPool.getThreadContext());
waitForPersistentTaskStatus(stateObserver, persistentTasksCustomMetaData -> {
for (Map.Entry<String, Long> entry : jobIdToPersistentTaskId.entrySet()) { for (Map.Entry<String, Long> entry : jobIdToPersistentTaskId.entrySet()) {
long persistentTaskId = entry.getValue(); long persistentTaskId = entry.getValue();
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
@ -475,7 +470,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
} }
} }
return true; return true;
}, new ActionListener<Boolean>() { }, request.timeout, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean result) { public void onResponse(Boolean result) {
Set<String> jobIds = jobIdToPersistentTaskId.keySet(); Set<String> jobIds = jobIdToPersistentTaskId.keySet();
@ -501,34 +496,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
} }
}); });
} }
void waitForPersistentTaskStatus(ClusterStateObserver stateObserver,
Predicate<PersistentTasksCustomMetaData> predicate,
ActionListener<Boolean> listener) {
if (predicate.test(stateObserver.setAndGetObservedState().metaData()
.custom(PersistentTasksCustomMetaData.TYPE))) {
listener.onResponse(true);
} else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(true);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("timed out after " + timeout));
}
}, clusterState -> predicate
.test(clusterState.metaData()
.custom(PersistentTasksCustomMetaData.TYPE)));
}
}
} }
static List<String> resolveAndValidateJobId(String jobId, ClusterState state) { static List<String> resolveAndValidateJobId(String jobId, ClusterState state) {

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.action; package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -27,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -36,16 +38,25 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class StopDatafeedAction public class StopDatafeedAction
extends Action<StopDatafeedAction.Request, StopDatafeedAction.Response, StopDatafeedAction.RequestBuilder> { extends Action<StopDatafeedAction.Request, StopDatafeedAction.Response, StopDatafeedAction.RequestBuilder> {
@ -94,21 +105,31 @@ public class StopDatafeedAction
} }
private String datafeedId; private String datafeedId;
private String[] resolvedDatafeedIds;
private TimeValue stopTimeout = DEFAULT_TIMEOUT; private TimeValue stopTimeout = DEFAULT_TIMEOUT;
private boolean force = false; private boolean force = false;
public Request(String jobId) { public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.resolvedDatafeedIds = new String[] { datafeedId };
setActions(StartDatafeedAction.NAME); setActions(StartDatafeedAction.NAME);
} }
Request() { Request() {
} }
public String getDatafeedId() { private String getDatafeedId() {
return datafeedId; return datafeedId;
} }
private String[] getResolvedDatafeedIds() {
return resolvedDatafeedIds;
}
private void setResolvedDatafeedIds(String[] resolvedDatafeedIds) {
this.resolvedDatafeedIds = resolvedDatafeedIds;
}
public TimeValue getStopTimeout() { public TimeValue getStopTimeout() {
return stopTimeout; return stopTimeout;
} }
@ -127,8 +148,13 @@ public class StopDatafeedAction
@Override @Override
public boolean match(Task task) { public boolean match(Task task) {
String expectedDescription = "datafeed-" + datafeedId; for (String id : resolvedDatafeedIds) {
return task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription()); String expectedDescription = "datafeed-" + id;
if (task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription())){
return true;
}
}
return false;
} }
@Override @Override
@ -140,6 +166,7 @@ public class StopDatafeedAction
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
datafeedId = in.readString(); datafeedId = in.readString();
resolvedDatafeedIds = in.readStringArray();
stopTimeout = new TimeValue(in); stopTimeout = new TimeValue(in);
force = in.readBoolean(); force = in.readBoolean();
} }
@ -148,6 +175,7 @@ public class StopDatafeedAction
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(datafeedId); out.writeString(datafeedId);
out.writeStringArray(resolvedDatafeedIds);
stopTimeout.writeTo(out); stopTimeout.writeTo(out);
out.writeBoolean(force); out.writeBoolean(force);
} }
@ -241,33 +269,99 @@ public class StopDatafeedAction
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE); MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<String> resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks);
if (resolvedDatafeeds.isEmpty()) {
listener.onResponse(new Response(true));
return;
}
request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()]));
if (request.force) { if (request.force) {
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(resolvedDatafeeds.size());
for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) { if (datafeedTask != null) {
forceStopTask(datafeedTask.getId(), listener); persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
if (counter.incrementAndGet() == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
listener.onResponse(new Response(true));
}
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
failures.set(slot - 1, e);
if (slot == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
});
} else { } else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
"datafeed's task could not be found."; "datafeed's task could not be found.";
logger.warn(msg); logger.warn(msg);
listener.onFailure(new RuntimeException(msg)); final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg));
if (slot == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
} }
} else { } else {
PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(request.getDatafeedId(), mlMetadata, tasks); Set<String> executorNodes = new HashSet<>();
request.setNodes(datafeedTask.getExecutorNode()); Map<String, Long> datafeedIdToPersistentTaskId = new HashMap<>();
for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks);
executorNodes.add(datafeedTask.getExecutorNode());
datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId());
}
ActionListener<Response> finalListener = ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForDatafeedStopped(datafeedTask.getId(), request, r, listener), listener::onFailure); ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure);
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
super.doExecute(task, request, finalListener); super.doExecute(task, request, finalListener);
} }
} }
/**
 * Reports the aggregated outcome of all force-stop sub-requests to the caller.
 * Succeeds only when no sub-request failed; otherwise fails the listener with a
 * single exception that summarises every failure message and carries the first
 * caught exception as its cause.
 *
 * @param datafeedId the (possibly "_all") datafeed id from the original request, used in the error message
 * @param listener   the listener to notify exactly once
 * @param failures   one slot per resolved datafeed; null slots mean that datafeed stopped cleanly
 */
private void sendResponseOrFailure(String datafeedId, ActionListener<Response> listener,
                                   AtomicArray<Exception> failures) {
    List<Exception> caughtExceptions = failures.asList();
    if (caughtExceptions.isEmpty()) {
        listener.onResponse(new Response(true));
        return;
    }

    // Summarise every failure message, but propagate only the first exception as
    // the cause so the caller receives a single actionable error.
    String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size()
            + "] failures, rethrowing first, all Exceptions: ["
            + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
            + "]";

    ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
    listener.onFailure(e);
}
// Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed
// This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state,
// so wait for that to happen here. // so wait for that to happen here.
void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) { void waitForDatafeedStopped(Map<String, Long> datafeedIdToPersistentTaskId, Request request, Response response, ActionListener<Response> listener) {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getStopTimeout(), persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
new WaitForPersistentTaskStatusListener<StartDatafeedAction.Request>() { for (Map.Entry<String, Long> entry : datafeedIdToPersistentTaskId.entrySet()) {
long persistentTaskId = entry.getValue();
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false;
}
}
return true;
}, request.getTimeout(), new ActionListener<Boolean>() {
@Override @Override
public void onResponse(PersistentTask<StartDatafeedAction.Request> task) { public void onResponse(Boolean result) {
listener.onResponse(response); listener.onResponse(response);
} }
@ -278,24 +372,26 @@ public class StopDatafeedAction
}); });
} }
private void forceStopTask(long persistentTaskId, ActionListener<Response> listener) {
persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
listener.onResponse(new Response(true));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
@Override @Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures, protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) { List<FailedNodeException> failedNodeExceptions) {
return TransportJobTaskAction.selectFirst(tasks, taskOperationFailures, failedNodeExceptions); // number of resolved data feeds should be equal to the number of
// tasks, otherwise something went wrong
if (request.getResolvedDatafeedIds().length != tasks.size()) {
if (taskOperationFailures.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(failedNodeExceptions.get(0));
} else {
throw new IllegalStateException(
"Expected [" + request.getResolvedDatafeedIds().length
+ "] number of tasks but " + "got [" + tasks.size() + "]");
}
}
return new Response(tasks.stream().allMatch(Response::isStopped));
} }
@Override @Override
@ -315,6 +411,36 @@ public class StopDatafeedAction
} }
} }
/**
 * Expands the requested datafeed id into the concrete list of datafeed ids to stop.
 * A regular id is returned as a singleton list; for {@code _all} every configured
 * datafeed that is not already in the {@code STOPPED} state is returned, so that
 * stopping "_all" is idempotent.
 *
 * @param datafeedId the requested id, either a concrete datafeed id or {@code _all}
 * @param mlMetadata cluster ML metadata holding the datafeed configurations
 * @param tasks      persistent tasks metadata used to look up each datafeed's state
 * @return the datafeed ids that should be stopped; may be empty for {@code _all}
 */
static List<String> resolve(String datafeedId, MlMetadata mlMetadata,
                            PersistentTasksCustomMetaData tasks) {
    if (Job.ALL.equals(datafeedId) == false) {
        // Not a wildcard request: stop exactly the requested datafeed.
        return Collections.singletonList(datafeedId);
    }

    if (mlMetadata.getDatafeeds().isEmpty()) {
        return Collections.emptyList();
    }

    List<String> matchedDatafeeds = new ArrayList<>();
    for (Map.Entry<String, DatafeedConfig> datafeedEntry : mlMetadata.getDatafeeds().entrySet()) {
        // The map key is the datafeed id; use it directly instead of the
        // (non-compiling) datafeedEntry.getValue().get().getId() chain.
        String resolvedDatafeedId = datafeedEntry.getKey();

        DatafeedState datafeedState = MlMetadata.getDatafeedState(resolvedDatafeedId, tasks);
        if (datafeedState == DatafeedState.STOPPED) {
            // Already stopped: skip so "_all" never fails on quiescent feeds.
            continue;
        }
        matchedDatafeeds.add(resolvedDatafeedId);
    }
    return matchedDatafeeds;
}
static PersistentTask<?> validateAndReturnDatafeedTask(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) { static PersistentTask<?> validateAndReturnDatafeedTask(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) { if (datafeed == null) {

View File

@ -143,6 +143,32 @@ public class PersistentTasksService extends AbstractComponent {
} }
} }
/**
 * Waits until the cluster-state {@link PersistentTasksCustomMetaData} satisfies the
 * given predicate, then notifies the listener with {@code true}.
 * The predicate is tested once against the currently observed state; if it does not
 * match, a single cluster-state-change wait is registered with the same predicate.
 *
 * @param predicate condition on the persistent tasks metadata (may be handed null metadata
 *                  if none is present in the state — predicate must tolerate that)
 * @param timeout   maximum time to wait for a matching state; null means wait indefinitely
 * @param listener  notified with {@code true} on match, or with a failure on node close/timeout
 */
public void waitForPersistentTasksStatus(Predicate<PersistentTasksCustomMetaData> predicate,
@Nullable TimeValue timeout, ActionListener<Boolean> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout,
logger, threadPool.getThreadContext());
// Fast path: the observed state may already satisfy the predicate.
if (predicate.test(stateObserver.setAndGetObservedState().metaData().custom(PersistentTasksCustomMetaData.TYPE))) {
listener.onResponse(true);
} else {
// Otherwise wait for the next state that matches; the observer re-evaluates the
// predicate on each cluster-state update until match, close, or timeout.
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(true);
}
@Override
public void onClusterServiceClose() {
// Local node shut down while waiting — surface as NodeClosedException.
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("timed out after " + timeout));
}
}, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)));
}
}
public interface WaitForPersistentTaskStatusListener<Request extends PersistentTaskRequest> public interface WaitForPersistentTaskStatusListener<Request extends PersistentTaskRequest>
extends ActionListener<PersistentTask<Request>> { extends ActionListener<PersistentTask<Request>> {
default void onTimeout(TimeValue timeout) { default void onTimeout(TimeValue timeout) {

View File

@ -10,17 +10,22 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.MlMetadata.Builder;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
@ -88,4 +93,38 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped")); assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped"));
} }
// Verifies that resolve("_all", ...) returns only datafeeds that are not STOPPED:
// datafeed_1 (STARTED) and datafeed_3 (STARTED) match, datafeed_2 (STOPPED) is skipped.
public void testResolveAll() {
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
Builder mlMetadataBuilder = new MlMetadata.Builder();
// datafeed_1: persistent task present and STARTED -> expected in the result.
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put(1L, task);
Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date());
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
// datafeed_2: persistent task present but STOPPED -> must be excluded by resolve().
task = new PersistentTask<PersistentTaskRequest>(2L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED);
taskMap.put(2L, task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
// datafeed_3: persistent task present and STARTED -> expected in the result.
task = new PersistentTask<PersistentTaskRequest>(3L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_3", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put(3L, task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, taskMap);
MlMetadata mlMetadata = mlMetadataBuilder.build();
// Order matters: resolve() iterates the metadata's datafeed map in its entry order.
assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks));
}
} }

View File

@ -50,21 +50,30 @@ public class MlRestTestStateCleaner {
return; return;
} }
try {
int statusCode = adminClient.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop")
.getStatusLine().getStatusCode();
if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when stopping datafeeds");
}
} catch (Exception e1) {
logger.warn("failed to stop all datafeeds. Forcing stop", e1);
try {
int statusCode = adminClient
.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop?force=true")
.getStatusLine().getStatusCode();
if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when stopping datafeeds");
}
} catch (Exception e2) {
logger.warn("Force-closing all data feeds failed", e2);
}
throw new RuntimeException(
"Had to resort to force-stopping datafeeds, something went wrong?", e1);
}
for (Map<String, Object> datafeed : datafeeds) { for (Map<String, Object> datafeed : datafeeds) {
String datafeedId = (String) datafeed.get("datafeed_id"); String datafeedId = (String) datafeed.get("datafeed_id");
try {
int statusCode = adminClient.performRequest("POST",
"/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode();
if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId);
}
} catch (Exception e) {
if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) {
logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
} else {
logger.warn("failed to stop datafeed [" + datafeedId + "]", e);
}
}
int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode(); int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode();
if (statusCode != 200) { if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId); logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId);

View File

@ -249,28 +249,28 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception { public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception {
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) {
String datafeedId = datafeed.getId();
try { try {
logger.info("Closing datafeed [{}]", datafeedId); logger.info("Closing all datafeeds (using _all)");
StopDatafeedAction.Response stopResponse = StopDatafeedAction.Response stopResponse = client
client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); .execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request("_all"))
.get();
assertTrue(stopResponse.isStopped()); assertTrue(stopResponse.isStopped());
} catch (ExecutionException e1) { } catch (ExecutionException e1) {
if (e1.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) {
logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e1);
} else {
try { try {
StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); StopDatafeedAction.Request request = new StopDatafeedAction.Request("_all");
request.setForce(true); request.setForce(true);
StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get(); StopDatafeedAction.Response stopResponse = client
.execute(StopDatafeedAction.INSTANCE, request).get();
assertTrue(stopResponse.isStopped()); assertTrue(stopResponse.isStopped());
} catch (Exception e2) { } catch (ExecutionException e2) {
logger.warn("Force-stopping datafeed [" + datafeedId + "] failed.", e2); logger.warn("Force-stopping datafeed with _all failed.", e2);
}
throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e1);
} }
throw new RuntimeException(
"Had to resort to force-stopping datafeed, something went wrong?", e1);
} }
for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) {
String datafeedId = datafeed.getId();
assertBusy(() -> { assertBusy(() -> {
try { try {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);

View File

@ -50,21 +50,30 @@ public class MlRestTestStateCleaner {
return; return;
} }
try {
int statusCode = adminClient.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop")
.getStatusLine().getStatusCode();
if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when stopping datafeeds");
}
} catch (Exception e1) {
logger.warn("failed to stop all datafeeds. Forcing stop", e1);
try {
int statusCode = adminClient
.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop?force=true")
.getStatusLine().getStatusCode();
if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when stopping datafeeds");
}
} catch (Exception e2) {
logger.warn("Force-closing all data feeds failed", e2);
}
throw new RuntimeException(
"Had to resort to force-stopping datafeeds, something went wrong?", e1);
}
for (Map<String, Object> datafeed : datafeeds) { for (Map<String, Object> datafeed : datafeeds) {
String datafeedId = (String) datafeed.get("datafeed_id"); String datafeedId = (String) datafeed.get("datafeed_id");
try {
int statusCode = adminClient.performRequest("POST",
"/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode();
if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId);
}
} catch (Exception e) {
if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) {
logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
} else {
logger.warn("failed to stop datafeed [" + datafeedId + "]", e);
}
}
int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode(); int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode();
if (statusCode != 200) { if (statusCode != 200) {
logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId); logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId);