diff --git a/server/src/main/java/org/elasticsearch/action/support/TransportAction.java b/server/src/main/java/org/elasticsearch/action/support/TransportAction.java index 9db5bfd84b5..e861a9f8d3f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -59,23 +59,19 @@ public abstract class TransportAction() { - @Override - public void onResponse(Response response) { - taskManager.unregister(task); - listener.onResponse(response); - } + execute(task, request, new ActionListener() { + @Override + public void onResponse(Response response) { + taskManager.unregister(task); + listener.onResponse(response); + } - @Override - public void onFailure(Exception e) { - taskManager.unregister(task); - listener.onFailure(e); - } - }); - } + @Override + public void onFailure(Exception e) { + taskManager.unregister(task); + listener.onFailure(e); + } + }); return task; } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java index 86ba59ebcc8..0b9dfba3e83 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java @@ -45,9 +45,6 @@ public interface TaskAwareRequest { /** * Returns the task object that should be used to keep track of the processing of the request. - * - * A request can override this method and return null to avoid being tracked by the task - * manager. */ default Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new Task(id, type, action, getDescription(), parentTaskId, headers); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 80427b19723..73af26cfc70 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -91,8 +92,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie /** * Registers a task without parent task - *

- * Returns the task manager tracked task or null if the task doesn't support the task manager */ public Task register(String type, String action, TaskAwareRequest request) { Map headers = new HashMap<>(); @@ -110,9 +109,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie } } Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); - if (task == null) { - return null; - } + Objects.requireNonNull(task); assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId"; if (logger.isTraceEnabled()) { logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription()); diff --git a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 4e09daf9ccf..7887dd2c7ca 100644 --- a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -58,17 +58,13 @@ public class RequestHandlerRegistry { public void processMessageReceived(Request request, TransportChannel channel) throws Exception { final Task task = taskManager.register(channel.getChannelType(), action, request); - if (task == null) { - handler.messageReceived(request, channel, null); - } else { - boolean success = false; - try { - handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task); - success = true; - } finally { - if (success == false) { - taskManager.unregister(task); - } + boolean success = false; + try { + handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task); + success = true; + } finally { + if (success == false) { + taskManager.unregister(task); } } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index 9175bc69bf6..edc79db7942 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -79,7 +79,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase { public static class NodeRequest extends BaseNodeRequest { protected String requestName; - private boolean enableTaskManager; public NodeRequest() { super(); @@ -88,82 +87,63 @@ public class TransportTasksActionTests extends TaskManagerTestCase { public NodeRequest(NodesRequest request, String nodeId) { super(nodeId); requestName = request.requestName; - enableTaskManager = request.enableTaskManager; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); requestName = in.readString(); - enableTaskManager = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(requestName); - out.writeBoolean(enableTaskManager); } @Override public String getDescription() { - return "CancellableNodeRequest[" + requestName + ", " + enableTaskManager + "]"; + return "CancellableNodeRequest[" + requestName + "]"; } @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskId, headers); - } else { - return null; - } + return super.createTask(id, type, action, parentTaskId, headers); } } public static class NodesRequest extends BaseNodesRequest { private String requestName; - private boolean enableTaskManager; NodesRequest() { super(); } public NodesRequest(String requestName, String... nodesIds) { - this(requestName, true, nodesIds); - } - - public NodesRequest(String requestName, boolean enableTaskManager, String... nodesIds) { super(nodesIds); this.requestName = requestName; - this.enableTaskManager = enableTaskManager; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); requestName = in.readString(); - enableTaskManager = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(requestName); - out.writeBoolean(enableTaskManager); } @Override public String getDescription() { - return "CancellableNodesRequest[" + requestName + ", " + enableTaskManager + "]"; + return "CancellableNodesRequest[" + requestName + "]"; } @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskId, headers); - } else { - return null; - } + return super.createTask(id, type, action, parentTaskId, headers); } } @@ -400,7 +380,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { assertEquals(1, entry.getValue().size()); - assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription()); + assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription()); } // Make sure that the main task on coordinating node is the task that was returned to us by execute() @@ -455,27 +435,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase { assertEquals(0, responses.failureCount()); } - public void testTaskManagementOptOut() throws Exception { - setupTestNodes(Settings.EMPTY); - connectNodes(testNodes); - CountDownLatch checkLatch = new CountDownLatch(1); - // Starting actions that disable task manager - ActionFuture future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false)); - - TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; - - // Get the parent task - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setActions("testAction*"); - ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); - assertEquals(0, response.getTasks().size()); - - // Release all tasks and wait for response - checkLatch.countDown(); - NodesResponse responses = future.get(); - assertEquals(0, responses.failureCount()); - } - public void testTasksDescriptions() throws Exception { long minimalStartTime = System.currentTimeMillis(); setupTestNodes(Settings.EMPTY); @@ -502,7 +461,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { assertEquals(1, entry.getValue().size()); - assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription()); + assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription()); assertThat(entry.getValue().get(0).getStartTime(), greaterThanOrEqualTo(minimalStartTime)); assertThat(entry.getValue().get(0).getRunningTimeNanos(), greaterThanOrEqualTo(minimalDurationNanos)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index 41cdaefe035..0133a8be0c0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -50,17 +50,15 @@ public class MockTaskManager extends TaskManager { @Override public Task register(String type, String action, TaskAwareRequest request) { Task task = super.register(type, action, request); - if (task != null) { - for (MockTaskManagerListener listener : listeners) { - try { - listener.onTaskRegistered(task); - } catch (Exception e) { - logger.warn( - (Supplier) () -> new ParameterizedMessage( - "failed to notify task manager listener about registering the task with id {}", - task.getId()), - e); - } + for (MockTaskManagerListener listener : listeners) { + try { + listener.onTaskRegistered(task); + } catch (Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "failed to notify task manager listener about registering the task with id {}", + task.getId()), + e); } } return task; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java index 61e99d2dc85..55393b1860b 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.monitoring.action; -import java.util.concurrent.ExecutorService; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilter; @@ -28,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; @@ -51,6 +51,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import static org.elasticsearch.Version.CURRENT; import static org.hamcrest.Matchers.containsString; @@ -98,7 +99,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { filters = mock(ActionFilters.class); when(transportService.getTaskManager()).thenReturn(taskManager); - when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(null); + when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(mock(Task.class)); when(filters.filters()).thenReturn(new ActionFilter[0]); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executor);