Task Manager should be able to support non-transport tasks (#23619)

Currently the task manager is tied to the transport and can only create tasks based on TransportRequests. This commit enables task manager to support tasks created by non-transport services such as the persistent tasks service.
This commit is contained in:
Igor Motov 2017-03-17 19:29:18 -04:00 committed by GitHub
parent f30f18285c
commit 1bd66136d7
4 changed files with 67 additions and 32 deletions

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
/**
* An interface for a request that can be used to register a task manager task
*/
public interface TaskAwareRequest {
/**
* Set a reference to task that caused this task to be run.
*/
default void setParentTask(String parentTaskNode, long parentTaskId) {
setParentTask(new TaskId(parentTaskNode, parentTaskId));
}
/**
* Set a reference to task that created this request.
*/
void setParentTask(TaskId taskId);
/**
* Get a reference to the task that created this request. Implementers should default to
* {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
*/
TaskId getParentTask();
/**
* Returns the task object that should be used to keep track of the processing of the request.
*
* A request can override this method and return null to avoid being tracked by the task
* manager.
*/
default Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new Task(id, type, action, getDescription(), parentTaskId);
}
/**
* Returns optional description of the request to be displayed by the task manager
*/
default String getDescription() {
return "";
}
}

View File

@ -35,18 +35,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -83,7 +79,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
* <p>
* Returns the task manager tracked task or null if the task doesn't support the task manager
*/
public Task register(String type, String action, TransportRequest request) {
public Task register(String type, String action, TaskAwareRequest request) {
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask());
if (task == null) {
return null;

View File

@ -21,12 +21,12 @@ package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
public abstract class TransportRequest extends TransportMessage {
public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
public static class Empty extends TransportRequest {
public static final Empty INSTANCE = new Empty();
}
@ -39,16 +39,10 @@ public abstract class TransportRequest extends TransportMessage {
public TransportRequest() {
}
/**
* Set a reference to task that caused this task to be run.
*/
public void setParentTask(String parentTaskNode, long parentTaskId) {
setParentTask(new TaskId(parentTaskNode, parentTaskId));
}
/**
* Set a reference to task that created this request.
*/
@Override
public void setParentTask(TaskId taskId) {
this.parentTaskId = taskId;
}
@ -56,26 +50,11 @@ public abstract class TransportRequest extends TransportMessage {
/**
* Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
*/
@Override
public TaskId getParentTask() {
return parentTaskId;
}
/**
* Returns the task object that should be used to keep track of the processing of the request.
*
* A request can override this method and return null to avoid being tracked by the task manager.
*/
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new Task(id, type, action, getDescription(), parentTaskId);
}
/**
* Returns optional description of the request to be displayed by the task manager
*/
public String getDescription() {
return "";
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -25,8 +25,8 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportRequest;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
@ -46,7 +46,7 @@ public class MockTaskManager extends TaskManager {
}
@Override
public Task register(String type, String action, TransportRequest request) {
public Task register(String type, String action, TaskAwareRequest request) {
Task task = super.register(type, action, request);
if (task != null) {
for (MockTaskManagerListener listener : listeners) {