Core: Require all actions have a Task (#31627)

The TaskManager and TaskAwareRequest could return null when registering
a task according to their javadocs, but no implementations ever actually
did that. This commit removes that wording from the javadocs and ensures
null is no longer allowed.
This commit is contained in:
Ryan Ernst 2018-06-28 08:24:03 -07:00 committed by GitHub
parent 0522c6644d
commit f924835265
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 95 deletions

View File

@ -59,23 +59,19 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
* this method.
*/
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
return task;
}

View File

@ -45,9 +45,6 @@ public interface TaskAwareRequest {
/**
* Returns the task object that should be used to keep track of the processing of the request.
*
* A request can override this method and return null to avoid being tracked by the task
* manager.
*/
default Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new Task(id, type, action, getDescription(), parentTaskId, headers);

View File

@ -45,6 +45,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@ -91,8 +92,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
/**
* Registers a task without parent task
* <p>
* Returns the task manager tracked task or null if the task doesn't support the task manager
*/
public Task register(String type, String action, TaskAwareRequest request) {
Map<String, String> headers = new HashMap<>();
@ -110,9 +109,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
if (task == null) {
return null;
}
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());

View File

@ -58,17 +58,13 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel, null);
} else {
boolean success = false;
try {
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
boolean success = false;
try {
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
}
}

View File

@ -79,7 +79,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
private boolean enableTaskManager;
public NodeRequest() {
super();
@ -88,82 +87,63 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
public NodeRequest(NodesRequest request, String nodeId) {
super(nodeId);
requestName = request.requestName;
enableTaskManager = request.enableTaskManager;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}
@Override
public String getDescription() {
return "CancellableNodeRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodeRequest[" + requestName + "]";
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (enableTaskManager) {
return super.createTask(id, type, action, parentTaskId, headers);
} else {
return null;
}
return super.createTask(id, type, action, parentTaskId, headers);
}
}
public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
private String requestName;
private boolean enableTaskManager;
NodesRequest() {
super();
}
public NodesRequest(String requestName, String... nodesIds) {
this(requestName, true, nodesIds);
}
public NodesRequest(String requestName, boolean enableTaskManager, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
this.enableTaskManager = enableTaskManager;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}
@Override
public String getDescription() {
return "CancellableNodesRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodesRequest[" + requestName + "]";
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (enableTaskManager) {
return super.createTask(id, type, action, parentTaskId, headers);
} else {
return null;
}
return super.createTask(id, type, action, parentTaskId, headers);
}
}
@ -400,7 +380,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription());
}
// Make sure that the main task on coordinating node is the task that was returned to us by execute()
@ -455,27 +435,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(0, responses.failureCount());
}
public void testTaskManagementOptOut() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
// Starting actions that disable task manager
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false));
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*");
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(0, response.getTasks().size());
// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
public void testTasksDescriptions() throws Exception {
long minimalStartTime = System.currentTimeMillis();
setupTestNodes(Settings.EMPTY);
@ -502,7 +461,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription());
assertThat(entry.getValue().get(0).getStartTime(), greaterThanOrEqualTo(minimalStartTime));
assertThat(entry.getValue().get(0).getRunningTimeNanos(), greaterThanOrEqualTo(minimalDurationNanos));
}

View File

@ -50,17 +50,15 @@ public class MockTaskManager extends TaskManager {
@Override
public Task register(String type, String action, TaskAwareRequest request) {
Task task = super.register(type, action, request);
if (task != null) {
for (MockTaskManagerListener listener : listeners) {
try {
listener.onTaskRegistered(task);
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to notify task manager listener about registering the task with id {}",
task.getId()),
e);
}
for (MockTaskManagerListener listener : listeners) {
try {
listener.onTaskRegistered(task);
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to notify task manager listener about registering the task with id {}",
task.getId()),
e);
}
}
return task;

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.monitoring.action;
import java.util.concurrent.ExecutorService;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilter;
@ -28,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
@ -51,6 +51,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.Version.CURRENT;
import static org.hamcrest.Matchers.containsString;
@ -98,7 +99,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
filters = mock(ActionFilters.class);
when(transportService.getTaskManager()).thenReturn(taskManager);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(null);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(mock(Task.class));
when(filters.filters()).thenReturn(new ActionFilter[0]);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executor);