list remote nodest storage information with a timeout

This commit is contained in:
kimchy 2010-08-23 18:00:37 +03:00
parent 1461da5b49
commit 5900e01a0d
4 changed files with 42 additions and 10 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
@ -38,6 +39,8 @@ public abstract class NodesOperationRequest implements ActionRequest {
private boolean listenerThreaded = false;
private TimeValue timeout;
protected NodesOperationRequest() {
}
@ -64,6 +67,15 @@ public abstract class NodesOperationRequest implements ActionRequest {
return this;
}
public TimeValue timeout() {
return this.timeout;
}
public NodesOperationRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
@Override public ActionRequestValidationException validate() {
return null;
}
@ -73,6 +85,9 @@ public abstract class NodesOperationRequest implements ActionRequest {
for (int i = 0; i < nodesIds.length; i++) {
nodesIds[i] = in.readUTF();
}
if (in.readBoolean()) {
timeout = TimeValue.readTimeValue(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -84,5 +99,11 @@ public abstract class NodesOperationRequest implements ActionRequest {
out.writeUTF(nodeId);
}
}
if (timeout == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
timeout.writeTo(out);
}
}
}

View File

@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class TransportNodesOperationAction<Request extends NodesOperationRequest, Response extends NodesOperationResponse, NodeRequest extends NodeOperationRequest, NodeResponse extends NodeOperationResponse> extends BaseAction<Request, Response> {
@ -124,6 +124,10 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
});
return;
}
TransportRequestOptions transportRequestOptions = TransportRequestOptions.options();
if (request.timeout() != null) {
transportRequestOptions.withTimeout(request.timeout());
}
for (final String nodeId : nodesIds) {
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
@ -151,7 +155,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
onFailure(nodeId, new NoSuchNodeException(nodeId));
} else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
transportService.sendRequest(node, transportNodeAction(), nodeRequest, new BaseTransportResponseHandler<NodeResponse>() {
transportService.sendRequest(node, transportNodeAction(), nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
@Override public NodeResponse newInstance() {
return newNodeResponse();
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService;
@ -52,11 +53,15 @@ public class BlobReuseExistingNodeAllocation extends AbstractComponent implement
private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData;
private final TimeValue listTimeout;
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings);
this.indicesService = indicesService;
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueMillis(500));
}
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
@ -78,7 +83,7 @@ public class BlobReuseExistingNodeAllocation extends AbstractComponent implement
continue;
}
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet()).actionGet();
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
if (logger.isDebugEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.indices.store;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
@ -33,6 +32,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
@ -40,6 +40,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -58,12 +59,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
this.indicesService = indicesService;
}
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds) {
return execute(new Request(shardId, onlyUnallocated, nodesIds));
}
public void list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds, ActionListener<NodesStoreFilesMetaData> listener) {
execute(new Request(shardId, onlyUnallocated, nodesIds), listener);
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(shardId, onlyUnallocated, nodesIds).timeout(timeout));
}
@Override protected String transportAction() {
@ -155,6 +152,11 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
this.unallocated = unallocated;
}
@Override public Request timeout(TimeValue timeout) {
super.timeout(timeout);
return this;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);