diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationRequest.java index cca607a3ada..a4b38d7e29c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationRequest.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; @@ -38,6 +39,8 @@ public abstract class NodesOperationRequest implements ActionRequest { private boolean listenerThreaded = false; + private TimeValue timeout; + protected NodesOperationRequest() { } @@ -64,6 +67,15 @@ public abstract class NodesOperationRequest implements ActionRequest { return this; } + public TimeValue timeout() { + return this.timeout; + } + + public NodesOperationRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + @Override public ActionRequestValidationException validate() { return null; } @@ -73,6 +85,9 @@ public abstract class NodesOperationRequest implements ActionRequest { for (int i = 0; i < nodesIds.length; i++) { nodesIds[i] = in.readUTF(); } + if (in.readBoolean()) { + timeout = TimeValue.readTimeValue(in); + } } @Override public void writeTo(StreamOutput out) throws IOException { @@ -84,5 +99,11 @@ public abstract class NodesOperationRequest implements ActionRequest { out.writeUTF(nodeId); } } + if (timeout == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + timeout.writeTo(out); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java index 731b6406309..359887cc458 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java @@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class TransportNodesOperationAction extends BaseAction { @@ -124,6 +124,10 @@ public abstract class TransportNodesOperationAction() { + transportService.sendRequest(node, transportNodeAction(), nodeRequest, transportRequestOptions, new BaseTransportResponseHandler() { @Override public NodeResponse newInstance() { return newNodeResponse(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index fe4d44092dc..ce37aa09eac 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.gateway.CommitPoint; import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import org.elasticsearch.index.service.InternalIndexService; @@ -52,11 +53,15 @@ public class BlobReuseExistingNodeAllocation extends AbstractComponent implement private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData; + private final TimeValue listTimeout; + @Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService, TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { super(settings); this.indicesService = indicesService; this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; + + this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueMillis(500)); } @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @@ -78,7 +83,7 @@ public class BlobReuseExistingNodeAllocation extends AbstractComponent implement continue; } - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet()).actionGet(); + TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet(); if (logger.isDebugEnabled()) { if (nodesStoreFilesMetaData.failures().length > 0) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index b6032a33e89..5d4f19ff997 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.store; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.cluster.ClusterName; @@ -33,6 +32,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; @@ -40,6 +40,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Set; @@ -58,12 +59,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio this.indicesService = indicesService; } - public ActionFuture list(ShardId shardId, boolean onlyUnallocated, Set nodesIds) { - return execute(new Request(shardId, onlyUnallocated, nodesIds)); - } - - public void list(ShardId shardId, boolean onlyUnallocated, Set nodesIds, ActionListener listener) { - execute(new Request(shardId, onlyUnallocated, nodesIds), listener); + public ActionFuture list(ShardId shardId, boolean onlyUnallocated, Set nodesIds, @Nullable TimeValue timeout) { + return execute(new Request(shardId, onlyUnallocated, nodesIds).timeout(timeout)); } @Override protected String transportAction() { @@ -155,6 +152,11 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio this.unallocated = unallocated; } + @Override public Request timeout(TimeValue timeout) { + super.timeout(timeout); + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in);