diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java index c1482ca784d..47b056a2650 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java @@ -52,6 +52,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponseHandler; @@ -279,6 +280,32 @@ public class CancellableTasksIT extends ESIntegTestCase { ensureAllBansRemoved(); } + public void testCancelOrphanedTasks() throws Exception { + final String nodeWithRootTask = internalCluster().startDataOnlyNode(); + Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3)); + client(nodeWithRootTask).execute(TransportTestAction.ACTION, rootRequest); + allowPartialRequest(rootRequest); + try { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithRootTask)); + assertBusy(() -> { + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + for (CancellableTask task : transportService.getTaskManager().getCancellableTasks().values()) { + if (task.getAction().equals(TransportTestAction.ACTION.name())) { + final TaskInfo taskInfo = task.taskInfo(transportService.getLocalNode().getId(), false); + assertTrue(taskInfo.toString(), task.isCancelled()); + assertNotNull(taskInfo.toString(), task.getReasonCancelled()); + assertThat(taskInfo.toString(), task.getReasonCancelled(), equalTo("channel was closed")); + } + } + } + }, 30, TimeUnit.SECONDS); + } finally { + allowEntireRequest(rootRequest); + ensureAllBansRemoved(); + } + } + static TaskId getRootTaskId(TestRequest request) throws Exception { SetOnce taskId = new SetOnce<>(); assertBusy(() -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index c95c4df9b40..273d245ec57 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -19,38 +19,20 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel; -import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.StepListener; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.function.Consumer; @@ -62,14 +44,10 @@ import java.util.function.Consumer; */ public class TransportCancelTasksAction extends TransportTasksAction { - public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban"; - @Inject public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) { super(CancelTasksAction.NAME, clusterService, transportService, actionFilters, CancelTasksRequest::new, CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT); - transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new, - new BanParentRequestHandler()); } @Override @@ -108,172 +86,8 @@ public class TransportCancelTasksAction extends TransportTasksAction listener) { String nodeId = clusterService.localNode().getId(); - cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(), + taskManager.cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(), ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false))); } - - void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener listener) { - final TaskId taskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId(); - if (task.shouldCancelChildrenOnCancellation()) { - logger.trace("cancelling task [{}] and its descendants", taskId); - StepListener completedListener = new StepListener<>(); - GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3); - Collection childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> { - logger.trace("child tasks of parent [{}] are completed", taskId); - groupedListener.onResponse(null); - }); - taskManager.cancel(task, reason, () -> { - logger.trace("task [{}] is cancelled", taskId); - groupedListener.onResponse(null); - }); - StepListener banOnNodesListener = new StepListener<>(); - setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener); - banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure); - // If we start unbanning when the last child task completed and that child task executed with a specific user, then unban - // requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context. - final Runnable removeBansRunnable = transportService.getThreadPool().getThreadContext() - .preserveContext(() -> removeBanOnNodes(task, childrenNodes)); - // We remove bans after all child tasks are completed although in theory we can do it on a per-node basis. - completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run()); - // if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are - // completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes. - if (waitForCompletion) { - completedListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); - } else { - banOnNodesListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); - } - } else { - logger.trace("task [{}] doesn't have any children that should be cancelled", taskId); - if (waitForCompletion) { - taskManager.cancel(task, reason, () -> listener.onResponse(null)); - } else { - taskManager.cancel(task, reason, () -> {}); - listener.onResponse(null); - } - } - } - - private void setBanOnNodes(String reason, boolean waitForCompletion, CancellableTask task, - Collection childNodes, ActionListener listener) { - if (childNodes.isEmpty()) { - listener.onResponse(null); - return; - } - final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); - logger.trace("cancelling child tasks of [{}] on child nodes {}", taskId, childNodes); - GroupedActionListener groupedListener = - new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size()); - final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(taskId, reason, waitForCompletion); - for (DiscoveryNode node : childNodes) { - transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest, - new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty response) { - logger.trace("sent ban for tasks with the parent [{}] to the node [{}]", taskId, node); - groupedListener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false; - logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", taskId, node); - groupedListener.onFailure(exp); - } - }); - } - } - - private void removeBanOnNodes(CancellableTask task, Collection childNodes) { - final BanParentTaskRequest request = - BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId())); - for (DiscoveryNode node : childNodes) { - logger.trace("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); - transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false; - logger.info("failed to remove the parent ban for task {} on node {}", request.parentTaskId, node); - } - }); - } - } - - private static class BanParentTaskRequest extends TransportRequest { - - private final TaskId parentTaskId; - private final boolean ban; - private final boolean waitForCompletion; - private final String reason; - - static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) { - return new BanParentTaskRequest(parentTaskId, reason, waitForCompletion); - } - - static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) { - return new BanParentTaskRequest(parentTaskId); - } - - private BanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) { - this.parentTaskId = parentTaskId; - this.ban = true; - this.reason = reason; - this.waitForCompletion = waitForCompletion; - } - - private BanParentTaskRequest(TaskId parentTaskId) { - this.parentTaskId = parentTaskId; - this.ban = false; - this.reason = null; - this.waitForCompletion = false; - } - - private BanParentTaskRequest(StreamInput in) throws IOException { - super(in); - parentTaskId = TaskId.readFromStream(in); - ban = in.readBoolean(); - reason = ban ? in.readString() : null; - if (in.getVersion().onOrAfter(Version.V_7_8_0)) { - waitForCompletion = in.readBoolean(); - } else { - waitForCompletion = false; - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - parentTaskId.writeTo(out); - out.writeBoolean(ban); - if (ban) { - out.writeString(reason); - } - if (out.getVersion().onOrAfter(Version.V_7_8_0)) { - out.writeBoolean(waitForCompletion); - } - } - } - - class BanParentRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception { - if (request.ban) { - logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId, - clusterService.localNode().getId(), request.reason); - final List childTasks = taskManager.setBan(request.parentTaskId, request.reason); - final GroupedActionListener listener = new GroupedActionListener<>(ActionListener.map( - new ChannelActionListener<>(channel, BAN_PARENT_ACTION_NAME, request), r -> TransportResponse.Empty.INSTANCE), - childTasks.size() + 1); - for (CancellableTask childTask : childTasks) { - cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener); - } - listener.onResponse(null); - } else { - logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId, - clusterService.localNode().getId()); - taskManager.removeBan(request.parentTaskId); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - } - } + diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 60df04a4eaf..1debc06beae 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -157,6 +157,7 @@ import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancellationService; import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -735,6 +736,7 @@ public class Node implements Closeable { // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class)); + transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService)); transportService.start(); assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java new file mode 100644 index 00000000000..6ab511d3de3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tasks; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +public class TaskCancellationService { + public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban"; + private static final Logger logger = LogManager.getLogger(TaskCancellationService.class); + private final TransportService transportService; + private final TaskManager taskManager; + + public TaskCancellationService(TransportService transportService) { + this.transportService = transportService; + this.taskManager = transportService.getTaskManager(); + transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new, + new BanParentRequestHandler()); + } + + private String localNodeId() { + return transportService.getLocalNode().getId(); + } + + void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener listener) { + final TaskId taskId = task.taskInfo(localNodeId(), false).getTaskId(); + if (task.shouldCancelChildrenOnCancellation()) { + logger.trace("cancelling task [{}] and its descendants", taskId); + StepListener completedListener = new StepListener<>(); + GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3); + Collection childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> { + logger.trace("child tasks of parent [{}] are completed", taskId); + groupedListener.onResponse(null); + }); + taskManager.cancel(task, reason, () -> { + logger.trace("task [{}] is cancelled", taskId); + groupedListener.onResponse(null); + }); + StepListener banOnNodesListener = new StepListener<>(); + setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener); + banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure); + // If we start unbanning when the last child task completed and that child task executed with a specific user, then unban + // requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context. + final Runnable removeBansRunnable = transportService.getThreadPool().getThreadContext() + .preserveContext(() -> removeBanOnNodes(task, childrenNodes)); + // We remove bans after all child tasks are completed although in theory we can do it on a per-node basis. + completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run()); + // if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are + // completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes. + if (waitForCompletion) { + completedListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); + } else { + banOnNodesListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); + } + } else { + logger.trace("task [{}] doesn't have any children that should be cancelled", taskId); + if (waitForCompletion) { + taskManager.cancel(task, reason, () -> listener.onResponse(null)); + } else { + taskManager.cancel(task, reason, () -> {}); + listener.onResponse(null); + } + } + } + + private void setBanOnNodes(String reason, boolean waitForCompletion, CancellableTask task, + Collection childNodes, ActionListener listener) { + if (childNodes.isEmpty()) { + listener.onResponse(null); + return; + } + final TaskId taskId = new TaskId(localNodeId(), task.getId()); + logger.trace("cancelling child tasks of [{}] on child nodes {}", taskId, childNodes); + GroupedActionListener groupedListener = + new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size()); + final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(taskId, reason, waitForCompletion); + for (DiscoveryNode node : childNodes) { + transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest, + new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty response) { + logger.trace("sent ban for tasks with the parent [{}] to the node [{}]", taskId, node); + groupedListener.onResponse(null); + } + + @Override + public void handleException(TransportException exp) { + assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false; + logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", taskId, node); + groupedListener.onFailure(exp); + } + }); + } + } + + private void removeBanOnNodes(CancellableTask task, Collection childNodes) { + final BanParentTaskRequest request = + BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(localNodeId(), task.getId())); + for (DiscoveryNode node : childNodes) { + logger.trace("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); + transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false; + logger.info("failed to remove the parent ban for task {} on node {}", request.parentTaskId, node); + } + }); + } + } + + private static class BanParentTaskRequest extends TransportRequest { + + private final TaskId parentTaskId; + private final boolean ban; + private final boolean waitForCompletion; + private final String reason; + + static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) { + return new BanParentTaskRequest(parentTaskId, reason, waitForCompletion); + } + + static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) { + return new BanParentTaskRequest(parentTaskId); + } + + private BanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) { + this.parentTaskId = parentTaskId; + this.ban = true; + this.reason = reason; + this.waitForCompletion = waitForCompletion; + } + + private BanParentTaskRequest(TaskId parentTaskId) { + this.parentTaskId = parentTaskId; + this.ban = false; + this.reason = null; + this.waitForCompletion = false; + } + + private BanParentTaskRequest(StreamInput in) throws IOException { + super(in); + parentTaskId = TaskId.readFromStream(in); + ban = in.readBoolean(); + reason = ban ? in.readString() : null; + if (in.getVersion().onOrAfter(Version.V_7_8_0)) { + waitForCompletion = in.readBoolean(); + } else { + waitForCompletion = false; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + parentTaskId.writeTo(out); + out.writeBoolean(ban); + if (ban) { + out.writeString(reason); + } + if (out.getVersion().onOrAfter(Version.V_7_8_0)) { + out.writeBoolean(waitForCompletion); + } + } + } + + private class BanParentRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception { + if (request.ban) { + logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId, + localNodeId(), request.reason); + final List childTasks = taskManager.setBan(request.parentTaskId, request.reason); + final GroupedActionListener listener = new GroupedActionListener<>(ActionListener.map( + new ChannelActionListener<>(channel, BAN_PARENT_ACTION_NAME, request), r -> TransportResponse.Empty.INSTANCE), + childTasks.size() + 1); + for (CancellableTask childTask : childTasks) { + cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener); + } + listener.onResponse(null); + } else { + logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId, localNodeId()); + taskManager.removeBan(request.parentTaskId); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 24ba3f3976c..256cd3f14ce 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -24,6 +24,8 @@ import com.carrotsearch.hppc.ObjectIntMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; @@ -38,10 +40,12 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TcpChannel; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +58,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -88,6 +94,8 @@ public class TaskManager implements ClusterStateApplier { private volatile DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES; private final ByteSizeValue maxHeaderSize; + private final Map channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); + private final SetOnce cancellationService = new SetOnce<>(); public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { this.threadPool = threadPool; @@ -100,6 +108,10 @@ public class TaskManager implements ClusterStateApplier { this.taskResultsService = taskResultsService; } + public void setTaskCancellationService(TaskCancellationService taskCancellationService) { + this.cancellationService.set(taskCancellationService); + } + /** * Registers a task without parent task */ @@ -404,17 +416,6 @@ public class TaskManager implements ClusterStateApplier { } } } - // Cancel cancellable tasks for the nodes that are gone - for (Map.Entry taskEntry : cancellableTasks.entrySet()) { - CancellableTaskHolder holder = taskEntry.getValue(); - CancellableTask task = holder.getTask(); - TaskId parentTaskId = task.getParentTaskId(); - if (parentTaskId.isSet() && lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId()) == false) { - if (task.cancelOnParentLeaving()) { - holder.cancel("Coordinating node [" + parentTaskId.getNodeId() + "] left the cluster"); - } - } - } } } @@ -569,4 +570,98 @@ public class TaskManager implements ClusterStateApplier { } } + /** + * Start tracking a cancellable task with its tcp channel, so if the channel gets closed we can get a set of + * pending tasks associated that channel and cancel them as these results won't be retrieved by the parent task. + * + * @return a releasable that should be called when this pending task is completed + */ + public Releasable startTrackingCancellableChannelTask(TcpChannel channel, CancellableTask task) { + assert cancellableTasks.containsKey(task.getId()) : "task [" + task.getId() + "] is not registered yet"; + final ChannelPendingTaskTracker tracker = channelPendingTaskTrackers.compute(channel, (k, curr) -> { + if (curr == null) { + curr = new ChannelPendingTaskTracker(); + } + curr.addTask(task); + return curr; + }); + if (tracker.registered.compareAndSet(false, true)) { + channel.addCloseListener(ActionListener.wrap( + r -> { + final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel); + assert removedTracker == tracker; + cancelTasksOnChannelClosed(tracker.drainTasks()); + }, + e -> { + assert false : new AssertionError("must not be here", e); + })); + } + return () -> tracker.removeTask(task); + } + + // for testing + final int numberOfChannelPendingTaskTrackers() { + return channelPendingTaskTrackers.size(); + } + + private static class ChannelPendingTaskTracker { + final AtomicBoolean registered = new AtomicBoolean(); + final Semaphore permits = Assertions.ENABLED ? new Semaphore(Integer.MAX_VALUE) : null; + final Set pendingTasks = ConcurrentCollections.newConcurrentSet(); + + void addTask(CancellableTask task) { + assert permits.tryAcquire() : "tracker was drained"; + final boolean added = pendingTasks.add(task); + assert added : "task " + task.getId() + " is in the pending list already"; + assert releasePermit(); + } + + boolean acquireAllPermits() { + permits.acquireUninterruptibly(Integer.MAX_VALUE); + return true; + } + + boolean releasePermit() { + permits.release(); + return true; + } + + Set drainTasks() { + assert acquireAllPermits(); // do not release permits so we can't add tasks to this tracker after draining + return Collections.unmodifiableSet(pendingTasks); + } + + void removeTask(CancellableTask task) { + final boolean removed = pendingTasks.remove(task); + assert removed : "task " + task.getId() + " is not in the pending list"; + } + } + + private void cancelTasksOnChannelClosed(Set tasks) { + if (tasks.isEmpty() == false) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("failed to cancel tasks on channel closed", e); + } + + @Override + protected void doRun() { + for (CancellableTask task : tasks) { + cancelTaskAndDescendants(task, "channel was closed", false, ActionListener.wrap(() -> {})); + } + } + }); + } + } + + public void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener listener) { + final TaskCancellationService service = cancellationService.get(); + if (service != null) { + service.cancelTaskAndDescendants(task, reason, waitForCompletion, listener); + } else { + assert false : "TaskCancellationService is not initialized"; + throw new IllegalStateException("TaskCancellationService is not initialized"); + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 2730a52437b..0c1922d3543 100644 --- a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -21,6 +21,9 @@ package org.elasticsearch.transport; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; @@ -58,14 +61,18 @@ public class RequestHandlerRegistry { public void processMessageReceived(Request request, TransportChannel channel) throws Exception { final Task task = taskManager.register(channel.getChannelType(), action, request); - boolean success = false; + Releasable unregisterTask = () -> taskManager.unregister(task); try { - handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task); - success = true; - } finally { - if (success == false) { - taskManager.unregister(task); + if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) { + final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel(); + final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task); + unregisterTask = Releasables.wrap(unregisterTask, stopTracking); } + final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask); + handler.messageReceived(request, taskTransportChannel, task); + unregisterTask = null; + } finally { + Releasables.close(unregisterTask); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java index aa659906019..142fae1a996 100644 --- a/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java @@ -20,22 +20,18 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.common.lease.Releasable; import java.io.IOException; public class TaskTransportChannel implements TransportChannel { - private final Task task; - - private final TaskManager taskManager; private final TransportChannel channel; + private final Releasable onTaskFinished; - TaskTransportChannel(TaskManager taskManager, Task task, TransportChannel channel) { + TaskTransportChannel(TransportChannel channel, Releasable onTaskFinished) { this.channel = channel; - this.task = task; - this.taskManager = taskManager; + this.onTaskFinished = onTaskFinished; } @Override @@ -50,14 +46,20 @@ public class TaskTransportChannel implements TransportChannel { @Override public void sendResponse(TransportResponse response) throws IOException { - endTask(); - channel.sendResponse(response); + try { + onTaskFinished.close(); + } finally { + channel.sendResponse(response); + } } @Override public void sendResponse(Exception exception) throws IOException { - endTask(); - channel.sendResponse(exception); + try { + onTaskFinished.close(); + } finally { + channel.sendResponse(exception); + } } @Override @@ -68,8 +70,4 @@ public class TaskTransportChannel implements TransportChannel { public TransportChannel getChannel() { return channel; } - - private void endTask() { - taskManager.unregister(task); - } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 87a2df9dc63..5b17e863605 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -377,7 +377,6 @@ public class CancellableTasksTests extends TaskManagerTestCase { setupTestNodes(Settings.EMPTY); connectNodes(testNodes); CountDownLatch responseLatch = new CountDownLatch(1); - boolean simulateBanBeforeLeaving = randomBoolean(); final AtomicReference responseReference = new AtomicReference<>(); final AtomicReference throwableReference = new AtomicReference<>(); int blockedNodesCount = randomIntBetween(0, nodesCount - 1); @@ -410,40 +409,51 @@ public class CancellableTasksTests extends TaskManagerTestCase { assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size())); // Simulate the coordinating node leaving the cluster - DiscoveryNode[] discoveryNodes = new DiscoveryNode[testNodes.length - 1]; - for (int i = 1; i < testNodes.length; i++) { - discoveryNodes[i - 1] = testNodes[i].discoveryNode(); - } - DiscoveryNode master = discoveryNodes[0]; - for (int i = 1; i < testNodes.length; i++) { - // Notify only nodes that should remain in the cluster - setState(testNodes[i].clusterService, ClusterStateCreationUtils.state(testNodes[i].discoveryNode(), master, discoveryNodes)); - } - - if (simulateBanBeforeLeaving) { - logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster"); - // Simulate issuing cancel request on the node that is about to leave the cluster - CancelTasksRequest request = new CancelTasksRequest(); - request.setReason("Testing Cancellation"); - request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); - // And send the cancellation request to a random node - CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request); - logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster"); - // This node still thinks that's part of the cluster, so cancelling should look successful - if (response.getTasks().size() == 0) { - logger.error("!!!!"); + if (randomBoolean()) { + DiscoveryNode[] discoveryNodes = new DiscoveryNode[testNodes.length - 1]; + for (int i = 1; i < testNodes.length; i++) { + discoveryNodes[i - 1] = testNodes[i].discoveryNode(); + } + DiscoveryNode master = discoveryNodes[0]; + for (int i = 1; i < testNodes.length; i++) { + // Notify only nodes that should remain in the cluster + setState(testNodes[i].clusterService, + ClusterStateCreationUtils.state(testNodes[i].discoveryNode(), master, discoveryNodes)); + } + if (randomBoolean()) { + logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster"); + // Simulate issuing cancel request on the node that is about to leave the cluster + CancelTasksRequest request = new CancelTasksRequest(); + request.setReason("Testing Cancellation"); + request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); + // And send the cancellation request to a random node + CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request); + logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster"); + // This node still thinks that's part of the cluster, so cancelling should look successful + if (response.getTasks().size() == 0) { + logger.error("!!!!"); + } + assertThat(response.getTasks().size(), lessThanOrEqualTo(1)); + assertThat(response.getTaskFailures().size(), lessThanOrEqualTo(1)); + assertThat(response.getTaskFailures().size() + response.getTasks().size(), lessThanOrEqualTo(1)); } - assertThat(response.getTasks().size(), lessThanOrEqualTo(1)); - assertThat(response.getTaskFailures().size(), lessThanOrEqualTo(1)); - assertThat(response.getTaskFailures().size() + response.getTasks().size(), lessThanOrEqualTo(1)); } for (int i = 1; i < testNodes.length; i++) { assertEquals("No bans on the node " + i, 0, testNodes[i].transportService.getTaskManager().getBanCount()); } - // Close the first node - testNodes[0].close(); + if (randomBoolean()) { + testNodes[0].close(); + } else { + for (TestNode blockOnNode : blockOnNodes) { + if (randomBoolean()) { + testNodes[0].transportService.disconnectFromNode(blockOnNode.discoveryNode()); + } else { + testNodes[0].transportService.getConnection(blockOnNode.discoveryNode()).close(); + } + } + } assertBusy(() -> { // Make sure that tasks are no longer running @@ -455,7 +465,6 @@ public class CancellableTasksTests extends TaskManagerTestCase { // Wait for clean up responseLatch.await(); - } public void testNonExistingTaskCancellation() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index f138fe45687..2bdd609b3b2 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.tasks.TaskCancellationService; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.tasks.MockTaskManager; @@ -91,8 +92,11 @@ public abstract class TaskManagerTestCase extends ESTestCase { @After public final void shutdownTestNodes() throws Exception { - for (TestNode testNode : testNodes) { - testNode.close(); + if (testNodes != null) { + for (TestNode testNode : testNodes) { + testNode.close(); + } + testNodes = null; } ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); threadPool = null; @@ -182,6 +186,7 @@ public abstract class TaskManagerTestCase extends ESTestCase { } } }; + transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService)); transportService.start(); clusterService = createClusterService(threadPool, discoveryNode.get()); clusterService.addStateApplier(transportService.getTaskManager()); diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java new file mode 100644 index 00000000000..51bc2315535 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java @@ -0,0 +1,203 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tasks; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.TransportTasksActionTests; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.FakeTcpChannel; +import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Phaser; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class TaskManagerTests extends ESTestCase { + private ThreadPool threadPool; + + @Before + public void setupThreadPool() { + threadPool = new TestThreadPool(TransportTasksActionTests.class.getSimpleName()); + } + + @After + public void terminateThreadPool() { + terminate(threadPool); + } + + /** + * Makes sure that tasks that attempt to store themselves on completion retry if + * they don't succeed at first. + */ + public void testResultsServiceRetryTotalTime() { + Iterator times = TaskResultsService.STORE_BACKOFF_POLICY.iterator(); + long total = 0; + while (times.hasNext()) { + total += times.next().millis(); + } + assertEquals(600000L, total); + } + + public void testTrackingChannelTask() throws Exception { + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + Set cancelledTasks = new HashSet<>(); + taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) { + @Override + void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener listener) { + assertThat(reason, equalTo("channel was closed")); + assertFalse(waitForCompletion); + assertTrue("task [" + task + "] was cancelled already", cancelledTasks.add(task)); + } + }); + Map> pendingTasks = new HashMap<>(); + Set expectedCancelledTasks = new HashSet<>(); + FakeTcpChannel[] channels = new FakeTcpChannel[randomIntBetween(1, 10)]; + List stopTrackingTasks = new ArrayList<>(); + for (int i = 0; i < channels.length; i++) { + channels[i] = new SingleThreadedTcpChannel(); + } + int iterations = randomIntBetween(1, 200); + for (int i = 0; i < iterations; i++) { + final List subset = randomSubsetOf(stopTrackingTasks); + stopTrackingTasks.removeAll(subset); + Releasables.close(subset); + final FakeTcpChannel channel = randomFrom(channels); + final Task task = taskManager.register("transport", "test", new CancellableRequest(Integer.toString(i))); + if (channel.isOpen() && randomBoolean()) { + channel.close(); + expectedCancelledTasks.addAll(pendingTasks.getOrDefault(channel, Collections.emptySet())); + } + final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(channel, (CancellableTask) task); + if (channel.isOpen()) { + pendingTasks.computeIfAbsent(channel, k -> new HashSet<>()).add(task); + stopTrackingTasks.add(() -> { + stopTracking.close(); + pendingTasks.get(channel).remove(task); + }); + } else { + expectedCancelledTasks.add(task); + } + } + assertBusy(() -> assertThat(cancelledTasks, equalTo(expectedCancelledTasks))); + for (FakeTcpChannel channel : channels) { + channel.close(); + } + assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0)); + } + + public void testTrackingTaskAndCloseChannelConcurrently() throws Exception { + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + Set cancelledTasks = ConcurrentCollections.newConcurrentSet(); + taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) { + @Override + void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener listener) { + assertTrue("task [" + task + "] was cancelled already", cancelledTasks.add(task)); + } + }); + Set expectedCancelledTasks = ConcurrentCollections.newConcurrentSet(); + FakeTcpChannel[] channels = new FakeTcpChannel[randomIntBetween(2, 20)]; + for (int i = 0; i < channels.length; i++) { + channels[i] = new FakeTcpChannel(); + } + Thread[] threads = new Thread[randomIntBetween(2, 8)]; + Phaser phaser = new Phaser(threads.length); + for (int t = 0; t < threads.length; t++) { + String threadName = "thread-" + t; + threads[t] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + int iterations = randomIntBetween(100, 1000); + for (int i = 0; i < iterations; i++) { + final FakeTcpChannel channel = randomFrom(channels); + final Task task = taskManager.register("transport", "test", new CancellableRequest(threadName + ":" + i)); + expectedCancelledTasks.add(task); + taskManager.startTrackingCancellableChannelTask(channel, (CancellableTask) task); + if (randomInt(100) < 5) { + randomFrom(channels).close(); + } + } + }); + threads[t].start(); + } + for (FakeTcpChannel channel : channels) { + channel.close(); + } + for (Thread thread : threads) { + thread.join(); + } + assertBusy(() -> assertThat(cancelledTasks, equalTo(expectedCancelledTasks))); + assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0)); + } + + static class CancellableRequest extends TransportRequest { + private final String requestId; + + CancellableRequest(String requestId) { + this.requestId = requestId; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "request-" + requestId, parentTaskId, headers) { + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + + @Override + public String toString() { + return getDescription(); + } + }; + } + } + + static class SingleThreadedTcpChannel extends FakeTcpChannel { + private boolean registeredListener = false; + + @Override + public void addCloseListener(ActionListener listener) { + if (isOpen()) { + assertFalse("listener was registered already", registeredListener); + registeredListener = true; + } + super.addCloseListener(listener); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskResultsServiceTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskResultsServiceTests.java deleted file mode 100644 index 7c896cb2159..00000000000 --- a/server/src/test/java/org/elasticsearch/tasks/TaskResultsServiceTests.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.tasks; - -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESTestCase; - -import java.util.Iterator; - -/** - * Makes sure that tasks that attempt to store themselves on completion retry if - * they don't succeed at first. - */ -public class TaskResultsServiceTests extends ESTestCase { - public void testRetryTotalTime() { - Iterator times = TaskResultsService.STORE_BACKOFF_POLICY.iterator(); - long total = 0; - while (times.hasNext()) { - total += times.next().millis(); - } - assertEquals(600000L, total); - } -} \ No newline at end of file