Add task status
Implements a simple task status for subclasses of ReplicationRequest to show how you can use the status.
parent f3ab5d2f4c
commit a2f07679fd
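
Before the diff itself, a quick usage sketch (not part of this commit's changes): fetching the new status through the list-tasks API. It mirrors the calls exercised in TasksIT further down; the client and logger variables are assumed to exist.

    // Detailed listing is what triggers Task.getStatus() on each node.
    ListTasksResponse tasks = client.admin().cluster().prepareListTasks()
            .setActions("indices:data/write/index*")
            .setDetailed(true)
            .get();
    for (TaskInfo info : tasks.getTasks()) {
        Task.Status status = info.getStatus();   // e.g. ReplicationTask.Status exposing a "phase"
        if (status != null) {
            logger.info("task [{}] status [{}]", info.getId(), status);
        }
    }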
@@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.node.tasks.list;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;

@@ -111,7 +112,7 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {

        if (getNodeFailures() != null && getNodeFailures().size() > 0) {
            builder.startArray("node_failures");
            for (FailedNodeException ex : getNodeFailures()){
            for (FailedNodeException ex : getNodeFailures()) {
                builder.value(ex);
            }
            builder.endArray();

@@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;

import java.io.IOException;

@@ -48,20 +49,23 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {

    private final String description;

    private final Task.Status status;

    private final String parentNode;

    private final long parentId;

    public TaskInfo(DiscoveryNode node, long id, String type, String action, String description) {
        this(node, id, type, action, description, null, -1L);
    public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status) {
        this(node, id, type, action, description, status, null, -1L);
    }

    public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, String parentNode, long parentId) {
    public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status, String parentNode, long parentId) {
        this.node = node;
        this.id = id;
        this.type = type;
        this.action = action;
        this.description = description;
        this.status = status;
        this.parentNode = parentNode;
        this.parentId = parentId;
    }
@@ -72,6 +76,11 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {
        type = in.readString();
        action = in.readString();
        description = in.readOptionalString();
        if (in.readBoolean()) {
            status = in.readTaskStatus();
        } else {
            status = null;
        }
        parentNode = in.readOptionalString();
        parentId = in.readLong();
    }

@@ -96,6 +105,14 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {
        return description;
    }

    /**
     * The status of the running task. Only available if TaskInfos were built
     * with the detailed flag.
     */
    public Task.Status getStatus() {
        return status;
    }

    public String getParentNode() {
        return parentNode;
    }

@@ -116,6 +133,12 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {
        out.writeString(type);
        out.writeString(action);
        out.writeOptionalString(description);
        if (status != null) {
            out.writeBoolean(true);
            out.writeTaskStatus(status);
        } else {
            out.writeBoolean(false);
        }
        out.writeOptionalString(parentNode);
        out.writeLong(parentId);
    }

@@ -127,6 +150,9 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {
        builder.field("id", id);
        builder.field("type", type);
        builder.field("action", action);
        if (status != null) {
            builder.field("status", status, params);
        }
        if (description != null) {
            builder.field("description", description);
        }

@@ -44,6 +44,20 @@ public abstract class ChildTaskActionRequest<Request extends ActionRequest<Reque
        this.parentTaskId = parentTaskId;
    }

    /**
     * The node that owns the parent task.
     */
    public String getParentTaskNode() {
        return parentTaskNode;
    }

    /**
     * The task id of the parent task on the parent node.
     */
    public long getParentTaskId() {
        return parentTaskId;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
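
A hedged sketch of how these two fields are meant to be populated: the parent action stamps its own node id and task id onto the child request before sending it, which is exactly what ReroutePhase does later in this commit via setParentTask. The childRequest, parentTask and localNode variables are assumptions.

    // Link the child request to the task that spawned it (sketch):
    childRequest.setParentTask(localNode.getId(), parentTask.getId());
    assert childRequest.getParentTaskNode().equals(localNode.getId());
    assert childRequest.getParentTaskId() == parentTask.getId();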
@@ -19,7 +19,6 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;

@@ -30,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@@ -195,6 +195,11 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
        out.writeVLong(routedBasedOnClusterVersion);
    }

    @Override
    public Task createTask(long id, String type, String action) {
        return new ReplicationTask(id, type, action, this::getDescription, getParentTaskNode(), getParentTaskId());
    }

    /**
     * Sets the target shard id for the request. The shard id is set when a
     * index/delete request is resolved by the transport action
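
createTask is the only hook a request needs in order to surface a richer task, as the override above shows. A hedged sketch of what another request type might do; ExampleTask is a hypothetical name, its constructor signature matches the one ReplicationTask uses below, and the existing ReplicationTask.Status is reused purely to keep the sketch short (a real task would define and register its own Status).

    // Hypothetical task for some other ChildTaskActionRequest subclass (sketch):
    class ExampleTask extends Task {
        private volatile String step = "queued";

        ExampleTask(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
            super(id, type, action, description, parentNode, parentId);
        }

        void setStep(String step) {
            this.step = step;
        }

        @Override
        public Status getStatus() {
            return new ReplicationTask.Status(step);   // reuses an already registered Status type
        }
    }

    // ...and in the request class:
    // @Override
    // public Task createTask(long id, String type, String action) {
    //     return new ExampleTask(id, type, action, this::getDescription, getParentTaskNode(), getParentTaskId());
    // }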
@@ -0,0 +1,97 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.support.replication;

import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;

import java.io.IOException;

import static java.util.Objects.requireNonNull;

/**
 * Task that tracks replication actions.
 */
public class ReplicationTask extends Task {
    private volatile String phase = "starting";

    public ReplicationTask(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
        super(id, type, action, description, parentNode, parentId);
    }

    /**
     * Set the current phase of the task.
     */
    public void setPhase(String phase) {
        this.phase = phase;
    }

    /**
     * Get the current phase of the task.
     */
    public String getPhase() {
        return phase;
    }

    @Override
    public Status getStatus() {
        return new Status(phase);
    }

    public static class Status implements Task.Status {
        public static final Status PROTOTYPE = new Status("prototype");

        private final String phase;

        public Status(String phase) {
            this.phase = requireNonNull(phase, "Phase cannot be null");
        }

        public Status(StreamInput in) throws IOException {
            phase = in.readString();
        }

        @Override
        public String getWriteableName() {
            return "replication";
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject();
            builder.field("phase", phase);
            builder.endObject();
            return builder;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeString(phase);
        }

        @Override
        public Status readFrom(StreamInput in) throws IOException {
            return new Status(in);
        }
    }
}
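
The Status above only travels across the wire because its prototype gets registered, which this commit does in TransportService further down. A hedged round-trip sketch; BytesStreamOutput/StreamInput.wrap usage is an assumption about the stream utilities in this codebase, the rest mirrors the commit.

    NamedWriteableRegistry registry = new NamedWriteableRegistry();
    registry.registerPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE);

    BytesStreamOutput out = new BytesStreamOutput();
    out.writeTaskStatus(new ReplicationTask.Status("replicating"));

    StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes()), registry);
    Task.Status status = in.readTaskStatus();   // a ReplicationTask.Status whose phase is "replicating"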
@ -142,7 +142,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
new ReroutePhase(task, request, listener).run();
|
||||
new ReroutePhase((ReplicationTask) task, request, listener).run();
|
||||
}
|
||||
|
||||
protected abstract Response newResponseInstance();
|
||||
|
@ -283,14 +283,24 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> {
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
new PrimaryPhase(request, channel).run();
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(Request request, TransportChannel channel, Task task) throws Exception {
|
||||
new PrimaryPhase((ReplicationTask) task, request, channel).run();
|
||||
}
|
||||
}
|
||||
|
||||
class ReplicaOperationTransportHandler implements TransportRequestHandler<ReplicaRequest> {
|
||||
@Override
|
||||
public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
|
||||
new AsyncReplicaAction(request, channel).run();
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ReplicaRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
new AsyncReplicaAction(request, channel, (ReplicationTask) task).run();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,13 +319,18 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
private final class AsyncReplicaAction extends AbstractRunnable {
|
||||
private final ReplicaRequest request;
|
||||
private final TransportChannel channel;
|
||||
/**
|
||||
* The task on the node with the replica shard.
|
||||
*/
|
||||
private final ReplicationTask task;
|
||||
// important: we pass null as a timeout as failing a replica is
|
||||
// something we want to avoid at all costs
|
||||
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
|
||||
|
||||
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
|
||||
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel, ReplicationTask task) {
|
||||
this.request = request;
|
||||
this.channel = channel;
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -385,6 +400,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
setPhase(task, "replica");
|
||||
assert request.shardId() != null : "request shardId must be set";
|
||||
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
|
||||
shardOperationOnReplica(request);
|
||||
|
@ -392,6 +408,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
|
||||
}
|
||||
}
|
||||
setPhase(task, "finished");
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
|
@ -417,15 +434,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
final class ReroutePhase extends AbstractRunnable {
|
||||
private final ActionListener<Response> listener;
|
||||
private final Request request;
|
||||
private final ReplicationTask task;
|
||||
private final ClusterStateObserver observer;
|
||||
private final AtomicBoolean finished = new AtomicBoolean();
|
||||
|
||||
ReroutePhase(Task task, Request request, ActionListener<Response> listener) {
|
||||
ReroutePhase(ReplicationTask task, Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
if (task != null) {
|
||||
this.request.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
}
|
||||
this.listener = listener;
|
||||
this.task = task;
|
||||
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
|
||||
}
|
||||
|
||||
|
@ -436,6 +455,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
setPhase(task, "routing");
|
||||
final ClusterState state = observer.observedState();
|
||||
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
|
||||
if (blockException != null) {
|
||||
|
@ -467,6 +487,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
}
|
||||
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
|
||||
if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
|
||||
setPhase(task, "waiting_on_primary");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
|
||||
}
|
||||
|
@ -484,6 +505,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
|
||||
}
|
||||
setPhase(task, "rerouted");
|
||||
performAction(node, actionName, false);
|
||||
}
|
||||
}
|
||||
|
@ -540,6 +562,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
finishAsFailed(failure);
|
||||
return;
|
||||
}
|
||||
setPhase(task, "waiting_for_retry");
|
||||
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
|
@ -564,6 +587,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
void finishAsFailed(Throwable failure) {
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
setPhase(task, "failed");
|
||||
logger.trace("operation failed. action [{}], request [{}]", failure, actionName, request);
|
||||
listener.onFailure(failure);
|
||||
} else {
|
||||
|
@ -574,6 +598,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
void finishWithUnexpectedFailure(Throwable failure) {
|
||||
logger.warn("unexpected error during the primary phase for action [{}], request [{}]", failure, actionName, request);
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
setPhase(task, "failed");
|
||||
listener.onFailure(failure);
|
||||
} else {
|
||||
assert false : "finishWithUnexpectedFailure called but operation is already finished";
|
||||
|
@ -582,6 +607,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
void finishOnSuccess(Response response) {
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
setPhase(task, "finished");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("operation succeeded. action [{}],request [{}]", actionName, request);
|
||||
}
|
||||
|
@ -603,6 +629,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
* Note that as soon as we move to replication action, state responsibility is transferred to {@link ReplicationPhase}.
|
||||
*/
|
||||
class PrimaryPhase extends AbstractRunnable {
|
||||
private final ReplicationTask task;
|
||||
private final Request request;
|
||||
private final ShardId shardId;
|
||||
private final TransportChannel channel;
|
||||
|
@ -610,8 +637,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
private final AtomicBoolean finished = new AtomicBoolean();
|
||||
private IndexShardReference indexShardReference;
|
||||
|
||||
PrimaryPhase(Request request, TransportChannel channel) {
|
||||
PrimaryPhase(ReplicationTask task, Request request, TransportChannel channel) {
|
||||
this.state = clusterService.state();
|
||||
this.task = task;
|
||||
this.request = request;
|
||||
assert request.shardId() != null : "request shardId must be set prior to primary phase";
|
||||
this.shardId = request.shardId();
|
||||
|
@ -634,6 +662,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
setPhase(task, "primary");
|
||||
// request shardID was set in ReroutePhase
|
||||
final String writeConsistencyFailure = checkWriteConsistency(shardId);
|
||||
if (writeConsistencyFailure != null) {
|
||||
|
@ -648,7 +677,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
|
||||
}
|
||||
ReplicationPhase replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
|
||||
ReplicationPhase replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
|
||||
finishAndMoveToReplication(replicationPhase);
|
||||
} else {
|
||||
// delegate primary phase to relocation target
|
||||
|
@ -728,6 +757,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
*/
|
||||
void finishAsFailed(Throwable failure) {
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
setPhase(task, "failed");
|
||||
Releasables.close(indexShardReference);
|
||||
logger.trace("operation failed", failure);
|
||||
try {
|
||||
|
@ -770,7 +800,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
* relocating copies
|
||||
*/
|
||||
final class ReplicationPhase extends AbstractRunnable {
|
||||
|
||||
private final ReplicationTask task;
|
||||
private final ReplicaRequest replicaRequest;
|
||||
private final Response finalResponse;
|
||||
private final TransportChannel channel;
|
||||
|
@ -785,8 +815,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
private final int totalShards;
|
||||
private final IndexShardReference indexShardReference;
|
||||
|
||||
public ReplicationPhase(ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
|
||||
public ReplicationPhase(ReplicationTask task, ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
|
||||
TransportChannel channel, IndexShardReference indexShardReference) {
|
||||
this.task = task;
|
||||
this.replicaRequest = replicaRequest;
|
||||
this.channel = channel;
|
||||
this.finalResponse = finalResponse;
|
||||
|
@ -870,6 +901,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
*/
|
||||
@Override
|
||||
protected void doRun() {
|
||||
setPhase(task, "replicating");
|
||||
if (pending.get() == 0) {
|
||||
doFinish();
|
||||
return;
|
||||
|
@ -981,6 +1013,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
}
|
||||
|
||||
private void forceFinishAsFailed(Throwable t) {
|
||||
setPhase(task, "failed");
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
Releasables.close(indexShardReference);
|
||||
try {
|
||||
|
@ -994,6 +1027,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
private void doFinish() {
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
setPhase(task, "finished");
|
||||
Releasables.close(indexShardReference);
|
||||
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
|
||||
if (!shardReplicaFailures.isEmpty()) {
|
||||
|
@@ -1082,4 +1116,14 @@
        }
        indexShard.maybeFlush();
    }

    /**
     * Sets the current phase on the task if it isn't null. Pulled into its own
     * method because it's more convenient that way.
     */
    static void setPhase(ReplicationTask task, String phase) {
        if (task != null) {
            task.setPhase(phase);
        }
    }
}
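
For orientation, a hedged sketch of the pattern the hunks above follow: every runnable in this class reports its progress through the null-safe helper, so a registered ReplicationTask always shows a meaningful phase. ExamplePhase is a hypothetical nested runnable, not part of the commit.

    class ExamplePhase extends AbstractRunnable {
        private final ReplicationTask task;   // may be null if no task was registered for the request

        ExamplePhase(ReplicationTask task) {
            this.task = task;
        }

        @Override
        protected void doRun() throws Exception {
            setPhase(task, "example_work");
            // ... the actual work would happen here ...
            setPhase(task, "finished");
        }

        @Override
        public void onFailure(Throwable t) {
            setPhase(task, "failed");
        }
    }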
@@ -38,6 +38,7 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.tasks.Task;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

@@ -690,6 +691,13 @@ public abstract class StreamInput extends InputStream {
        return readNamedWriteable(ScoreFunctionBuilder.class);
    }

    /**
     * Reads a {@link Task.Status} from the current stream.
     */
    public Task.Status readTaskStatus() throws IOException {
        return readNamedWriteable(Task.Status.class);
    }

    /**
     * Reads a list of objects
     */

@@ -37,6 +37,7 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.tasks.Task;
import org.joda.time.ReadableInstant;

import java.io.EOFException;

@@ -660,6 +661,13 @@ public abstract class StreamOutput extends OutputStream {
        writeNamedWriteable(scoreFunctionBuilder);
    }

    /**
     * Writes a {@link Task.Status} to the current stream.
     */
    public void writeTaskStatus(Task.Status status) throws IOException {
        writeNamedWriteable(status);
    }

    /**
     * Writes the given {@link GeoPoint} to the stream
     */
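
readTaskStatus/writeTaskStatus always expect a value to be present, so optional fields wrap them in a boolean marker; this is the pattern TaskInfo uses earlier in the commit, pulled out here as a pair of hedged helper sketches.

    static void writeOptionalTaskStatus(StreamOutput out, Task.Status status) throws IOException {
        if (status == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            out.writeTaskStatus(status);
        }
    }

    static Task.Status readOptionalTaskStatus(StreamInput in) throws IOException {
        return in.readBoolean() ? in.readTaskStatus() : null;
    }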
@@ -23,6 +23,8 @@ package org.elasticsearch.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;

/**
 * Current task information

@@ -57,9 +59,24 @@ public class Task {
        this.parentId = parentId;
    }

    /**
     * Build a version of the task status you can throw over the wire and back
     * to the user.
     *
     * @param node
     *            the node this task is running on
     * @param detailed
     *            should the information include detailed, potentially slow to
     *            generate data?
     */
    public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) {
        return new TaskInfo(node, getId(), getType(), getAction(), detailed ? getDescription() : null, parentNode, parentId);
        String description = null;
        Task.Status status = null;
        if (detailed) {
            description = getDescription();
            status = getStatus();
        }
        return new TaskInfo(node, getId(), getType(), getAction(), description, status, parentNode, parentId);
    }

    /**

@@ -104,4 +121,15 @@ public class Task {
        return parentId;
    }

    /**
     * Build a status for this task or null if this task doesn't have status.
     * Since most tasks don't have status this defaults to returning null. While
     * this can never perform IO it might be a costly operation, requiring
     * collating lists of results, etc. So only use it if you need the value.
     */
    public Status getStatus() {
        return null;
    }

    public interface Status extends ToXContent, NamedWriteable<Status> {}
}
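
A short hedged illustration of the contract spelled out in the javadoc above: only a detailed taskInfo pays for building the description and status, and callers of getStatus must expect null from tasks that never override it. The node variable (a DiscoveryNode) is assumed.

    TaskInfo summary = task.taskInfo(node, false);    // description == null, status == null
    TaskInfo detailed = task.taskInfo(node, true);    // description from the provider; status only if the task overrides getStatus()

    assert summary.getStatus() == null;
    assert task.getStatus() == null || detailed.getStatus() != null;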
@ -20,11 +20,13 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
|
||||
import org.elasticsearch.action.support.replication.ReplicationTask;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
|
@ -41,6 +43,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
|||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -109,11 +112,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
volatile DiscoveryNode localNode = null;
|
||||
|
||||
public TransportService(Transport transport, ThreadPool threadPool) {
|
||||
this(EMPTY_SETTINGS, transport, threadPool);
|
||||
this(EMPTY_SETTINGS, transport, threadPool, new NamedWriteableRegistry());
|
||||
}
|
||||
|
||||
@Inject
|
||||
public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
|
||||
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings);
|
||||
this.transport = transport;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -122,6 +125,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
tracerLog = Loggers.getLogger(logger, ".tracer");
|
||||
adapter = createAdapter();
|
||||
taskManager = createTaskManager();
|
||||
namedWriteableRegistry.registerPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -333,6 +333,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
|
||||
protected void handleResponse(StreamInput buffer, LocalTransport sourceTransport, final TransportResponseHandler handler) {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, namedWriteableRegistry);
|
||||
final TransportResponse response = handler.newInstance();
|
||||
response.remoteAddress(sourceTransport.boundAddress.publishAddress());
|
||||
try {
|
||||
|
|
|
@ -192,6 +192,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
}
|
||||
|
||||
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final TransportResponse response = handler.newInstance();
|
||||
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
response.remoteAddress();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.action.admin.cluster.node.tasks;
|
||||
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
|
@ -25,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
|||
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
|
||||
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.percolate.PercolateAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -32,20 +34,27 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.test.tasks.MockTaskManagerListener;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.hamcrest.Matchers.emptyCollectionOf;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* Integration tests for task management API
|
||||
|
@ -218,6 +227,59 @@ public class TasksIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Very basic "is it plugged in" style test that indexes a document and
|
||||
* makes sure that you can fetch the status of the process. The goal here is
|
||||
* to verify that the large moving parts that make fetching task status work
|
||||
* fit together rather than to verify any particular status results from
|
||||
* indexing. For that, look at
|
||||
* {@link org.elasticsearch.action.support.replication.TransportReplicationActionTests}
|
||||
* . We intentionally don't use the task recording mechanism used in other
|
||||
* places in this test so we can make sure that the status fetching works
|
||||
* properly over the wire.
|
||||
*/
|
||||
public void testCanFetchIndexStatus() throws InterruptedException, ExecutionException, IOException {
|
||||
/*
|
||||
* We prevent any tasks from unregistering until the test is done so we
|
||||
* can fetch them. This will gum up the server if we leave it enabled
|
||||
* but we'll be quick so it'll be OK (TM).
|
||||
*/
|
||||
ReentrantLock taskFinishLock = new ReentrantLock();
|
||||
taskFinishLock.lock();
|
||||
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
|
||||
((MockTaskManager)clusterService.getTaskManager()).addListener(new MockTaskManagerListener() {
|
||||
@Override
|
||||
public void onTaskRegistered(Task task) {
|
||||
// Intentional noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTaskUnregistered(Task task) {
|
||||
/*
|
||||
* We can't block all tasks here or the task listing task
|
||||
* would never return.
|
||||
*/
|
||||
if (false == task.getAction().startsWith(IndexAction.NAME)) {
|
||||
return;
|
||||
}
|
||||
logger.debug("Blocking {} from being unregistered", task);
|
||||
taskFinishLock.lock();
|
||||
taskFinishLock.unlock();
|
||||
}
|
||||
});
|
||||
}
|
||||
ListenableActionFuture<?> indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
|
||||
ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*").setDetailed(true)
|
||||
.get();
|
||||
taskFinishLock.unlock();
|
||||
indexFuture.get();
|
||||
assertThat(tasks.getTasks(), not(emptyCollectionOf(TaskInfo.class)));
|
||||
for (TaskInfo task : tasks.getTasks()) {
|
||||
assertNotNull(task.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
|
||||
|
|
|
@ -58,7 +58,6 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -115,7 +114,7 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
public TestNode(String name, ThreadPool threadPool, Settings settings) {
|
||||
transportService = new TransportService(settings,
|
||||
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
|
||||
threadPool){
|
||||
threadPool, new NamedWriteableRegistry()) {
|
||||
@Override
|
||||
protected TaskManager createTaskManager() {
|
||||
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
|
@ -44,10 +46,10 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.shard.IndexShardNotStartedException;
|
||||
|
@ -64,6 +66,7 @@ import org.elasticsearch.transport.TransportChannel;
|
|||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -86,6 +89,7 @@ import static org.elasticsearch.action.support.replication.ClusterStateCreationU
|
|||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.Matchers.arrayWithSize;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
@ -142,27 +146,30 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
public void testBlocks() throws ExecutionException, InterruptedException {
|
||||
Request request = new Request();
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
ClusterBlocks.Builder block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
|
||||
assertPhase(task, "failed");
|
||||
|
||||
block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(null, new Request().timeout("5ms"), listener);
|
||||
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
|
||||
|
||||
assertPhase(task, "failed");
|
||||
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(null, new Request(), listener);
|
||||
reroutePhase = action.new ReroutePhase(task, new Request(), listener);
|
||||
reroutePhase.run();
|
||||
assertFalse("primary phase should wait on retryable block", listener.isDone());
|
||||
assertPhase(task, "waiting_for_retry");
|
||||
|
||||
block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
|
@ -181,20 +188,23 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
// no replicas in oder to skip the replication part
|
||||
clusterService.setState(state(index, true,
|
||||
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
|
||||
Request request = new Request(shardId).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
|
||||
assertPhase(task, "failed");
|
||||
|
||||
request = new Request(shardId);
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||
reroutePhase.run();
|
||||
assertFalse("unassigned primary didn't cause a retry", listener.isDone());
|
||||
assertPhase(task, "waiting_for_retry");
|
||||
|
||||
clusterService.setState(state(index, true, ShardRoutingState.STARTED));
|
||||
logger.debug("--> primary assigned state:\n{}", clusterService.state().prettyPrint());
|
||||
|
@ -267,9 +277,12 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
|
||||
assertPhase(task, "failed");
|
||||
request = new Request(new ShardId(index, "_na_", 10)).timeout("1ms");
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
|
@ -280,9 +293,9 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
public void testRoutePhaseExecutesRequest() {
|
||||
final String index = "test";
|
||||
final ShardId shardId = new ShardId(index, "_na_", 0);
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
clusterService.setState(stateWithActivePrimary(index, randomBoolean(), 3));
|
||||
|
||||
logger.debug("using state: \n{}", clusterService.state().prettyPrint());
|
||||
|
||||
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
|
||||
|
@ -290,7 +303,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
Request request = new Request(shardId);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
|
||||
reroutePhase.run();
|
||||
assertThat(request.shardId(), equalTo(shardId));
|
||||
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
|
||||
|
@ -299,8 +312,10 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
assertThat(capturedRequests.size(), equalTo(1));
|
||||
if (clusterService.state().nodes().localNodeId().equals(primaryNodeId)) {
|
||||
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
|
||||
assertPhase(task, "waiting_on_primary");
|
||||
} else {
|
||||
assertThat(capturedRequests.get(0).action, equalTo("testAction"));
|
||||
assertPhase(task, "rerouted");
|
||||
}
|
||||
assertIndexShardUninitialized();
|
||||
}
|
||||
|
@ -312,8 +327,9 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
clusterService.setState(state);
|
||||
Request request = new Request(shardId).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
AtomicBoolean movedToReplication = new AtomicBoolean();
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) {
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener)) {
|
||||
@Override
|
||||
void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) {
|
||||
super.finishAndMoveToReplication(replicationPhase);
|
||||
|
@ -335,6 +351,9 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
assertThat(requests, notNullValue());
|
||||
assertThat(requests.size(), equalTo(1));
|
||||
assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]"));
|
||||
assertPhase(task, "primary");
|
||||
} else {
|
||||
assertPhase(task, either(equalTo("finished")).or(equalTo("replicating")));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -348,8 +367,9 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
clusterService.setState(state);
|
||||
Request request = new Request(shardId).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
AtomicBoolean movedToReplication = new AtomicBoolean();
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) {
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener)) {
|
||||
@Override
|
||||
void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) {
|
||||
super.finishAndMoveToReplication(replicationPhase);
|
||||
|
@ -359,6 +379,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
primaryPhase.run();
|
||||
assertThat("request was not processed on primary relocation target", request.processedOnPrimary.get(), equalTo(true));
|
||||
assertThat(movedToReplication.get(), equalTo(true));
|
||||
assertPhase(task, "replicating");
|
||||
}
|
||||
|
||||
public void testAddedReplicaAfterPrimaryOperation() {
|
||||
|
@ -368,6 +389,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
clusterService.setState(stateWithActivePrimary(index, true, 0));
|
||||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED);
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
|
||||
@Override
|
||||
|
@ -382,9 +404,10 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
Request request = new Request(shardId);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithAddedReplicaAfterPrimaryOp.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithAddedReplicaAfterPrimaryOp.new PrimaryPhase(task, request, createTransportChannel(listener));
|
||||
primaryPhase.run();
|
||||
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
|
||||
assertPhase(task, "replicating");
|
||||
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequestsByTargetNode = transport.getCapturedRequestsByTargetNodeAndClear();
|
||||
for (ShardRouting replica : stateWithAddedReplicas.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards()) {
|
||||
List<CapturingTransport.CapturedRequest> requests = capturedRequestsByTargetNode.get(replica.currentNodeId());
|
||||
|
@ -415,11 +438,14 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
Request request = new Request(shardId);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
ReplicationTask task = maybeTask();
|
||||
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase(
|
||||
task, request, createTransportChannel(listener));
|
||||
primaryPhase.run();
|
||||
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
|
||||
ShardRouting relocatingReplicaShard = stateWithRelocatingReplica.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0);
|
||||
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequestsByTargetNode = transport.getCapturedRequestsByTargetNodeAndClear();
|
||||
assertPhase(task, "replicating");
|
||||
for (String node : new String[] {relocatingReplicaShard.currentNodeId(), relocatingReplicaShard.relocatingNodeId()}) {
|
||||
List<CapturingTransport.CapturedRequest> requests = capturedRequestsByTargetNode.get(node);
|
||||
assertThat(requests, notNullValue());
|
||||
|
@ -448,10 +474,13 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
Request request = new Request(shardId);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
ReplicationTask task = maybeTask();
|
||||
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase(
|
||||
task, request, createTransportChannel(listener));
|
||||
primaryPhase.run();
|
||||
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
|
||||
assertThat("replication phase should be skipped if index gets deleted after primary operation", transport.capturedRequestsByTargetNode().size(), equalTo(0));
|
||||
assertPhase(task, "finished");
|
||||
}
|
||||
|
||||
public void testWriteConsistency() throws ExecutionException, InterruptedException {
|
||||
|
@ -496,16 +525,18 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
ReplicationTask task = maybeTask();
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
|
||||
if (passesWriteConsistency) {
|
||||
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard().shardId()), nullValue());
|
||||
primaryPhase.run();
|
||||
assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get());
|
||||
assertTrue("operations should have been performed, consistency level is met", request.processedOnPrimary.get());
|
||||
if (assignedReplicas > 0) {
|
||||
assertIndexShardCounter(2);
|
||||
} else {
|
||||
assertIndexShardCounter(1);
|
||||
}
|
||||
assertPhase(task, either(equalTo("finished")).or(equalTo("replicating")));
|
||||
} else {
|
||||
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard().shardId()), notNullValue());
|
||||
primaryPhase.run();
|
||||
|
@ -517,10 +548,11 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates));
|
||||
listener = new PlainActionFuture<>();
|
||||
primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
|
||||
primaryPhase.run();
|
||||
assertTrue("once the consistency level met, operation should continue", request.processedOnPrimary.get());
|
||||
assertIndexShardCounter(2);
|
||||
assertPhase(task, "replicating");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -590,6 +622,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
final ShardId shardId = shardIt.shardId();
|
||||
final Request request = new Request(shardId);
|
||||
final PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
|
||||
|
||||
TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter();
|
||||
|
@ -599,15 +632,14 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
assertIndexShardCounter(2);
|
||||
// TODO: set a default timeout
|
||||
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
|
||||
action.new ReplicationPhase(request,
|
||||
new Response(),
|
||||
request.shardId(), createTransportChannel(listener), reference);
|
||||
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase = action.new ReplicationPhase(task,
|
||||
request, new Response(), request.shardId(), createTransportChannel(listener), reference);
|
||||
|
||||
assertThat(replicationPhase.totalShards(), equalTo(totalShards));
|
||||
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
|
||||
replicationPhase.run();
|
||||
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
|
||||
assertPhase(task, either(equalTo("finished")).or(equalTo("replicating")));
|
||||
|
||||
HashMap<String, Request> nodesSentTo = new HashMap<>();
|
||||
boolean executeOnReplica =
|
||||
|
@ -718,11 +750,11 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
final String index = "test";
|
||||
final ShardId shardId = new ShardId(index, "_na_", 0);
|
||||
// no replica, we only want to test on primary
|
||||
clusterService.setState(state(index, true,
|
||||
ShardRoutingState.STARTED));
|
||||
clusterService.setState(state(index, true, ShardRoutingState.STARTED));
|
||||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
Request request = new Request(shardId).timeout("100ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
/**
|
||||
* Execute an action that is stuck in shard operation until a latch is counted down.
|
||||
|
@ -732,7 +764,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
* However, this failure would only become apparent once listener.get is called. Seems a little implicit.
|
||||
* */
|
||||
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
|
||||
final TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
final TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -751,6 +783,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
// operation finished, counter back to 0
|
||||
assertIndexShardCounter(1);
|
||||
assertThat(transport.capturedRequests().length, equalTo(0));
|
||||
assertPhase(task, "finished");
|
||||
}
|
||||
|
||||
public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedException, ExecutionException, IOException {
|
||||
|
@ -764,7 +797,9 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
Request request = new Request(shardId).timeout("100ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
|
||||
ReplicationTask task = maybeTask();
|
||||
|
||||
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
|
||||
primaryPhase.run();
|
||||
assertIndexShardCounter(2);
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
|
@@ -772,10 +807,14 @@ public class TransportReplicationActionTests extends ESTestCase {
transport.handleResponse(transport.capturedRequests()[0].requestId, TransportResponse.Empty.INSTANCE);
transport.clear();
assertIndexShardCounter(1);
assertPhase(task, "finished");

request = new Request(shardId).timeout("100ms");
primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
task = maybeTask();
primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
primaryPhase.run();
assertIndexShardCounter(2);
assertPhase(task, "replicating");
CapturingTransport.CapturedRequest[] replicationRequests = transport.getCapturedRequestsAndClear();
assertThat(replicationRequests.length, equalTo(1));
// try with failure response
@@ -792,12 +831,14 @@ public class TransportReplicationActionTests extends ESTestCase {
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final ReplicationTask task = maybeTask();
Thread t = new Thread() {
@Override
public void run() {
try {
replicaOperationTransportHandler.messageReceived(new Request().setShardId(shardId), createTransportChannel(new PlainActionFuture<>()));
replicaOperationTransportHandler.messageReceived(new Request().setShardId(shardId), createTransportChannel(new PlainActionFuture<>()), task);
} catch (Exception e) {
logger.error("Failed", e);
}
}
};
@@ -807,13 +848,14 @@ public class TransportReplicationActionTests extends ESTestCase {
assertBusy(() -> assertIndexShardCounter(2));
((ActionWithDelay) action).countDownLatch.countDown();
t.join();
assertPhase(task, "finished");
// operation should have finished and counter decreased because no outstanding replica requests
assertIndexShardCounter(1);
// now check if this also works if operation throws exception
action = new ActionWithExceptions(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandlerForException = action.new ReplicaOperationTransportHandler();
try {
replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel(new PlainActionFuture<>()));
replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel(new PlainActionFuture<>()), task);
fail();
} catch (Throwable t2) {
}
@@ -829,12 +871,15 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
ReplicationTask task = maybeTask();

TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
primaryPhase.run();
// no replica request should have been sent yet
assertThat(transport.capturedRequests().length, equalTo(0));
// no matter if the operation is retried or not, counter must be back to 1
assertIndexShardCounter(1);
assertPhase(task, "failed");
}

private void assertIndexShardCounter(int expected) {
@@ -847,9 +892,9 @@ public class TransportReplicationActionTests extends ESTestCase {
private final AtomicReference<ShardRouting> indexShardRouting = new AtomicReference<>();

/*
* Returns testIndexShardOperationsCounter or initializes it if it was not already created in this test run.
* */
/**
* Returns testIndexShardOperationsCounter or initializes it if it was not already created in this test run.
*/
private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter() {
count.incrementAndGet();
return new TransportReplicationAction.IndexShardReference() {
@@ -872,6 +917,29 @@ public class TransportReplicationActionTests extends ESTestCase {
};
}

/**
* Sometimes build a ReplicationTask for tracking the phase of the
* TransportReplicationAction. Since TransportReplicationAction has to work
* if the task is null just as well as if it is supplied this returns null
* half the time.
*/
private ReplicationTask maybeTask() {
return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, 0) : null;
}

/**
* If the task is non-null this asserts that the phase matches.
*/
private void assertPhase(@Nullable ReplicationTask task, String phase) {
assertPhase(task, equalTo(phase));
}

private void assertPhase(@Nullable ReplicationTask task, Matcher<String> phaseMatcher) {
if (task != null) {
assertThat(task.getPhase(), phaseMatcher);
}
}

public static class Request extends ReplicationRequest<Request> {
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public AtomicInteger processedOnReplicas = new AtomicInteger();
@@ -959,9 +1027,9 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}

/*
* Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails.
* */
/**
* Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails.
*/
class ActionWithExceptions extends Action {

ActionWithExceptions(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException {
@@ -1027,9 +1095,9 @@ public class TransportReplicationActionTests extends ESTestCase {

}

/*
* Transport channel that is needed for replica operation testing.
* */
/**
* Transport channel that is needed for replica operation testing.
*/
public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener) {
return new TransportChannel() {
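The maybeTask() and assertPhase(...) helpers added in the hunks above capture the contract these tests rely on: the action under test must behave identically whether a ReplicationTask was supplied for the request or the task is null, so each test randomly gets a task and only asserts the phase when one exists. A minimal, self-contained sketch of that pattern follows; FakeTask and the method bodies are illustrative stand-ins, not Elasticsearch classes.

import java.util.Random;

public class MaybeTaskPatternSketch {
    // Stand-in for a task: just carries the current phase string.
    static class FakeTask {
        private volatile String phase = "init";
        void setPhase(String phase) { this.phase = phase; }
        String getPhase() { return phase; }
    }

    static final Random RANDOM = new Random();

    // Mirrors maybeTask(): return a task or null with equal probability.
    static FakeTask maybeTask() {
        return RANDOM.nextBoolean() ? new FakeTask() : null;
    }

    // Mirrors assertPhase(...): only check the phase when a task was supplied.
    static void assertPhase(FakeTask task, String expectedPhase) {
        if (task != null && !expectedPhase.equals(task.getPhase())) {
            throw new AssertionError("expected phase " + expectedPhase + " but was " + task.getPhase());
        }
    }

    public static void main(String[] args) {
        FakeTask task = maybeTask();
        if (task != null) {
            task.setPhase("finished"); // the code under test would move the phase itself
        }
        assertPhase(task, "finished"); // a no-op when no task was supplied
        System.out.println("works with and without a task");
    }
}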
@@ -32,13 +32,13 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
@@ -128,8 +128,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
CountDownLatch clusterStateLatch = new CountDownLatch(1);

@Inject
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
}

@Override @SuppressWarnings("unchecked")
@@ -23,9 +23,9 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
@@ -71,7 +71,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
return new TestResponse();
}
};
transportService = new TransportService(Settings.EMPTY, transport, threadPool);
transportService = new TransportService(Settings.EMPTY, transport, threadPool, new NamedWriteableRegistry());
transportService.start();
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);
@@ -104,7 +104,9 @@ public class ZenFaultDetectionTests extends ESTestCase {
}

protected MockTransportService build(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool, namedWriteableRegistry);
transportService.start();
return transportService;
}
@@ -35,7 +35,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -55,7 +54,6 @@ import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.After;
import org.junit.Before;
@@ -232,7 +230,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
}

protected MockTransportService buildTransportService(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool);
transportService.start();
return transportService;
}
@@ -41,8 +41,8 @@ public class TransportModuleTests extends ModuleTestCase {

static class FakeTransportService extends TransportService {
@Inject
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
}
}
}
@@ -29,7 +29,7 @@ public class SimpleLocalTransportTests extends AbstractSimpleTransportTestCase {

@Override
protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) {
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool);
MockTransportService transportService = MockTransportService.local(settings, version, threadPool);
transportService.start();
return transportService;
}
@@ -52,12 +52,14 @@ public class NettyScheduledPingTests extends ESTestCase {

Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE.getKey(), "5ms").put(TransportSettings.PORT.getKey(), 0).build();

final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, registryA);
serviceA.start();

final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, registryB);
serviceB.start();

DiscoveryNode nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), Version.CURRENT);
@@ -22,10 +22,8 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.ConnectTransportException;
@@ -41,7 +39,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
@Override
protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) {
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry), threadPool);
MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, version, threadPool);
transportService.start();
return transportService;
}
@@ -85,6 +85,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
@@ -93,7 +94,6 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -785,8 +785,8 @@ public class IndicesRequestTests extends ESIntegTestCase {
private final Map<String, List<TransportRequest>> requests = new HashMap<>();

@Inject
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
}

synchronized List<TransportRequest> consumeRequests(String action) {
@@ -20,11 +20,11 @@
package org.elasticsearch.discovery.ec2;

import com.amazonaws.services.ec2.model.Tag;

import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@@ -32,7 +32,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -67,9 +66,7 @@ public class Ec2DiscoveryTests extends ESTestCase {

@Before
public void createTransportService() {
transportService = new MockTransportService(
Settings.EMPTY,
new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool);
transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool);
}

protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
@@ -22,13 +22,11 @@ package org.elasticsearch.discovery.gce;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.gce.GceComputeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -94,9 +92,7 @@ public class GceDiscoveryTests extends ESTestCase {

@Before
public void createTransportService() {
transportService = new MockTransportService(
Settings.EMPTY,
new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool);
transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool);
}

@Before
@@ -19,11 +19,13 @@

package org.elasticsearch.test.transport;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
@@ -32,6 +34,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.plugins.Plugin;
@@ -46,6 +49,8 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;

import java.io.IOException;
import java.util.Arrays;
@@ -91,11 +96,25 @@ public class MockTransportService extends TransportService {
}
}

public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry);
return new MockTransportService(settings, transport, threadPool, namedWriteableRegistry);
}

public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
version, namedWriteableRegistry);
return new MockTransportService(Settings.EMPTY, transport, threadPool, namedWriteableRegistry);
}

private final Transport original;

@Inject
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, new LookupTestTransport(transport), threadPool);
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, new LookupTestTransport(transport), threadPool, namedWriteableRegistry);
this.original = transport;
}
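The local(...) and nettyFromThreadPool(...) factories added above construct the Transport and the MockTransportService around one shared NamedWriteableRegistry instance, presumably so that anything registered as a named writeable for a test can be resolved on both the transport and the service side. A minimal, self-contained sketch of that shared-registry idea follows; Registry, FakeTransport and FakeService are illustrative stand-ins, not Elasticsearch types.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SharedRegistrySketch {
    // Stand-in for a named writeable registry: maps a name to a reader.
    static class Registry {
        private final Map<String, Supplier<Object>> readers = new HashMap<>();
        void register(String name, Supplier<Object> reader) { readers.put(name, reader); }
        Object read(String name) {
            Supplier<Object> reader = readers.get(name);
            if (reader == null) {
                throw new IllegalStateException("unknown named writeable: " + name);
            }
            return reader.get();
        }
    }

    static class FakeTransport {
        final Registry registry;
        FakeTransport(Registry registry) { this.registry = registry; }
    }

    static class FakeService {
        final Registry registry;
        FakeService(Registry registry) { this.registry = registry; }
    }

    // Mirrors the factory methods above: build both layers around a single registry instance.
    static FakeService build() {
        Registry shared = new Registry();
        FakeTransport transport = new FakeTransport(shared);
        return new FakeService(transport.registry);
    }

    public static void main(String[] args) {
        FakeService service = build();
        // A reader registered once is visible to everything holding the shared registry.
        service.registry.register("replication-status", () -> "phase: finished");
        System.out.println(service.registry.read("replication-status"));
    }
}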