Move parentTaskId into TransportRequest

Now everything can have a parent!
This commit is contained in:
Nik Everett 2016-04-19 17:39:03 -04:00
parent 61f0b665b8
commit 2b56a42b69
4 changed files with 44 additions and 71 deletions

View File

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
/**
* Base class for action requests that can have associated child tasks
*/
public abstract class ChildTaskActionRequest<Request extends ActionRequest<Request>> extends ActionRequest<Request> {
private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
protected ChildTaskActionRequest() {
}
public void setParentTask(String parentTaskNode, long parentTaskId) {
this.parentTaskId = new TaskId(parentTaskNode, parentTaskId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
parentTaskId = TaskId.readFromStream(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
parentTaskId.writeTo(out);
}
@Override
public final Task createTask(long id, String type, String action) {
return createTask(id, type, action, parentTaskId);
}
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new Task(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.support.master; package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.ChildTaskActionRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -30,7 +29,7 @@ import java.io.IOException;
/** /**
* A based request for master based operation. * A based request for master based operation.
*/ */
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ChildTaskActionRequest<Request> { public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest<Request> {
public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30); public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);

View File

@ -19,10 +19,10 @@
package org.elasticsearch.action.support.replication; package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ChildTaskActionRequest;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -40,7 +40,8 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/** /**
* *
*/ */
public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends ChildTaskActionRequest<Request> implements IndicesRequest { public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends ActionRequest<Request>
implements IndicesRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);

View File

@ -19,19 +19,42 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
/** /**
*/ */
public abstract class TransportRequest extends TransportMessage { public abstract class TransportRequest extends TransportMessage {
public static class Empty extends TransportRequest { public static class Empty extends TransportRequest {
public static final Empty INSTANCE = new Empty(); public static final Empty INSTANCE = new Empty();
} }
/**
* Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
*/
private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
public TransportRequest() { public TransportRequest() {
} }
/**
* Set a reference to task that caused this task to be run.
*/
public void setParentTask(String parentTaskNode, long parentTaskId) {
setParentTask(new TaskId(parentTaskNode, parentTaskId));
}
/**
* Set a reference to task that caused this task to be run.
*/
public void setParentTask(TaskId taskId) {
this.parentTaskId = taskId;
}
/** /**
* Returns the task object that should be used to keep track of the processing of the request. * Returns the task object that should be used to keep track of the processing of the request.
* *
@ -41,10 +64,26 @@ public abstract class TransportRequest extends TransportMessage {
return new Task(id, type, action, getDescription()); return new Task(id, type, action, getDescription());
} }
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new Task(id, type, action, getDescription(), parentTaskId);
}
/** /**
* Returns optional description of the request to be displayed by the task manager * Returns optional description of the request to be displayed by the task manager
*/ */
public String getDescription() { public String getDescription() {
return ""; return "";
} }
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
parentTaskId = TaskId.readFromStream(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
parentTaskId.writeTo(out);
}
} }