improve local gateway allocation by caching the started shards when possible and not asking the nodes for it

This commit is contained in:
kimchy 2011-04-16 22:33:45 +03:00
parent 658594fa70
commit e8306ac2c8
3 changed files with 164 additions and 143 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.*; import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -54,19 +55,19 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final Node node; private final Node node;
private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData; private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
private final TimeValue listTimeout; private final TimeValue listTimeout;
private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<ShardId, Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
@Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node, @Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings); super(settings);
this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; this.listShardStoreMetaData = transportNodesListShardStoreMetaData;
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
} }
@ -119,7 +120,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
continue; continue;
} }
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard); Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
long lastSizeMatched = 0; long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null; DiscoveryNode lastDiscoNodeMatched = null;
@ -248,12 +249,32 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
return changed; return changed;
} }
private ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId()); Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
Set<String> nodesIds;
if (shardStores == null) { if (shardStores == null) {
shardStores = ConcurrentCollections.newConcurrentMap(); shardStores = Maps.newHashMap();
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet(); cachedStores.put(shard.shardId(), shardStores);
if (logger.isDebugEnabled()) { nodesIds = nodes.dataNodes().keySet();
} else {
nodesIds = Sets.newHashSet();
// clean nodes that have failed
for (DiscoveryNode node : shardStores.keySet()) {
if (!nodes.nodeExists(node.id())) {
shardStores.remove(node);
}
}
for (DiscoveryNode node : nodes.dataNodes().values()) {
if (!shardStores.containsKey(node)) {
nodesIds.add(node.id());
}
}
}
if (!nodesIds.isEmpty()) {
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet();
if (logger.isTraceEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) { if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
@ -263,7 +284,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
} }
sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()); sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
} }
logger.debug(sb.toString()); logger.trace(sb.toString());
} }
} }
@ -272,46 +293,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData()); shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
} }
} }
cachedStores.put(shard.shardId(), shardStores);
} else {
// clean nodes that have failed
for (DiscoveryNode node : shardStores.keySet()) {
if (!nodes.nodeExists(node.id())) {
shardStores.remove(node);
}
}
// we have stored cached from before, see if the nodes changed, if they have, go fetch again
Set<String> fetchedNodes = Sets.newHashSet();
for (DiscoveryNode node : nodes.dataNodes().values()) {
if (!shardStores.containsKey(node)) {
fetchedNodes.add(node.id());
}
}
if (!fetchedNodes.isEmpty()) {
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet();
if (logger.isDebugEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
}
logger.debug(sb.toString());
}
}
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
}
}
}
} }
return shardStores; return shardStores;
} }
} }

View File

@ -28,9 +28,12 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.*; import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.iterator.TObjectLongIterator;
import org.elasticsearch.common.trove.map.hash.TObjectLongHashMap;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -53,7 +56,9 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
private final TransportNodesListShardStoreMetaData listShardStoreMetaData; private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<ShardId, Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, TObjectLongHashMap<DiscoveryNode>> cachedShardsState = ConcurrentCollections.newConcurrentMap();
private final TimeValue listTimeout; private final TimeValue listTimeout;
@ -72,12 +77,14 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.startedShards()) { for (ShardRouting shardRouting : allocation.startedShards()) {
cachedStores.remove(shardRouting.shardId()); cachedStores.remove(shardRouting.shardId());
cachedShardsState.remove(shardRouting.shardId());
} }
} }
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
ShardRouting failedShard = allocation.failedShard(); ShardRouting failedShard = allocation.failedShard();
cachedStores.remove(failedShard.shardId()); cachedStores.remove(failedShard.shardId());
cachedShardsState.remove(failedShard.shardId());
} }
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
@ -86,7 +93,6 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
RoutingNodes routingNodes = allocation.routingNodes(); RoutingNodes routingNodes = allocation.routingNodes();
// First, handle primaries, they must find a place to be allocated on here // First, handle primaries, they must find a place to be allocated on here
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = null;
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator(); Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) { while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next(); MutableShardRouting shard = unassignedIterator.next();
@ -100,44 +106,27 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
continue; continue;
} }
if (nodesState == null) { TObjectLongHashMap<DiscoveryNode> nodesState = buildShardStates(nodes, shard);
Set<String> nodesIds = Sets.newHashSet();
nodesIds.addAll(nodes.dataNodes().keySet());
nodesState = listGatewayStartedShards.list(nodesIds, null).actionGet();
if (nodesState.failures().length > 0) {
StringBuilder sb = new StringBuilder("failures when trying to list started shards on nodes:");
for (int i = 0; i < nodesState.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(nodesState.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(nodesState.failures()[i].getDetailedMessage());
}
logger.warn(sb.toString());
}
}
int numberOfAllocationsFound = 0; int numberOfAllocationsFound = 0;
long highestVersion = -1; long highestVersion = -1;
DiscoveryNode nodeWithHighestVersion = null; DiscoveryNode nodeWithHighestVersion = null;
for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeState : nodesState) { for (TObjectLongIterator<DiscoveryNode> it = nodesState.iterator(); it.hasNext();) {
if (nodeState.state() == null) { it.advance();
continue; DiscoveryNode node = it.key();
} long version = it.value();
// since we don't check in NO allocation, we need to double check here // since we don't check in NO allocation, we need to double check here
if (allocation.shouldIgnoreShardForNode(shard.shardId(), nodeState.node().id())) { if (allocation.shouldIgnoreShardForNode(shard.shardId(), node.id())) {
continue; continue;
} }
Long version = nodeState.state().shards().get(shard.shardId()); if (version != -1) {
if (version != null) {
numberOfAllocationsFound++; numberOfAllocationsFound++;
if (highestVersion == -1) { if (highestVersion == -1) {
nodeWithHighestVersion = nodeState.node(); nodeWithHighestVersion = node;
highestVersion = version; highestVersion = version;
} else { } else {
if (version > highestVersion) { if (version > highestVersion) {
nodeWithHighestVersion = nodeState.node(); nodeWithHighestVersion = node;
highestVersion = version; highestVersion = version;
} }
} }
@ -221,7 +210,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
continue; continue;
} }
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard); Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
long lastSizeMatched = 0; long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null; DiscoveryNode lastDiscoNodeMatched = null;
@ -303,12 +292,80 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
return changed; return changed;
} }
private ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { private TObjectLongHashMap<DiscoveryNode> buildShardStates(DiscoveryNodes nodes, MutableShardRouting shard) {
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId()); TObjectLongHashMap<DiscoveryNode> shardStates = cachedShardsState.get(shard.shardId());
Set<String> nodeIds;
if (shardStates == null) {
shardStates = new TObjectLongHashMap<DiscoveryNode>();
cachedShardsState.put(shard.shardId(), shardStates);
nodeIds = nodes.dataNodes().keySet();
} else {
// clean nodes that have failed
for (DiscoveryNode node : shardStates.keySet()) {
if (!nodes.nodeExists(node.id())) {
shardStates.remove(node);
}
}
nodeIds = Sets.newHashSet();
// we have stored cached from before, see if the nodes changed, if they have, go fetch again
for (DiscoveryNode node : nodes.dataNodes().values()) {
if (!shardStates.containsKey(node)) {
nodeIds.add(node.id());
}
}
}
if (nodeIds.isEmpty()) {
return shardStates;
}
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodes.dataNodes().keySet(), listTimeout).actionGet();
if (logger.isDebugEnabled()) {
if (response.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list shards on nodes:");
for (int i = 0; i < response.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(response.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(response.failures()[i].getDetailedMessage());
}
logger.debug(sb.toString());
}
}
for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeShardState : response) {
// -1 version means it does not exists, which is what the API returns, and what we expect to
shardStates.put(nodeShardState.node(), nodeShardState.version());
}
return shardStates;
}
private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
Set<String> nodesIds;
if (shardStores == null) { if (shardStores == null) {
shardStores = ConcurrentCollections.newConcurrentMap(); shardStores = Maps.newHashMap();
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet(); cachedStores.put(shard.shardId(), shardStores);
if (logger.isDebugEnabled()) { nodesIds = nodes.dataNodes().keySet();
} else {
nodesIds = Sets.newHashSet();
// clean nodes that have failed
for (DiscoveryNode node : shardStores.keySet()) {
if (!nodes.nodeExists(node.id())) {
shardStores.remove(node);
}
}
for (DiscoveryNode node : nodes.dataNodes().values()) {
if (!shardStores.containsKey(node)) {
nodesIds.add(node.id());
}
}
}
if (!nodesIds.isEmpty()) {
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet();
if (logger.isTraceEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) { if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
@ -318,7 +375,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
} }
sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()); sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
} }
logger.debug(sb.toString()); logger.trace(sb.toString());
} }
} }
@ -327,46 +384,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData()); shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
} }
} }
cachedStores.put(shard.shardId(), shardStores);
} else {
// clean nodes that have failed
for (DiscoveryNode node : shardStores.keySet()) {
if (!nodes.nodeExists(node.id())) {
shardStores.remove(node);
}
}
// we have stored cached from before, see if the nodes changed, if they have, go fetch again
Set<String> fetchedNodes = Sets.newHashSet();
for (DiscoveryNode node : nodes.dataNodes().values()) {
if (!shardStores.containsKey(node)) {
fetchedNodes.add(node.id());
}
}
if (!fetchedNodes.isEmpty()) {
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet();
if (logger.isTraceEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
}
logger.trace(sb.toString());
}
}
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
}
}
}
} }
return shardStores; return shardStores;
} }
} }

View File

@ -33,11 +33,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -57,8 +59,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
return this; return this;
} }
public ActionFuture<NodesLocalGatewayStartedShards> list(Set<String> nodesIds, @Nullable TimeValue timeout) { public ActionFuture<NodesLocalGatewayStartedShards> list(ShardId shardId, Set<String> nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(nodesIds).timeout(timeout)); return execute(new Request(shardId, nodesIds).timeout(timeout));
} }
@Override protected String executor() { @Override protected String executor() {
@ -86,7 +88,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
} }
@Override protected NodeRequest newNodeRequest(String nodeId, Request request) { @Override protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId); return new NodeRequest(request.shardId(), nodeId);
} }
@Override protected NodeLocalGatewayStartedShards newNodeResponse() { @Override protected NodeLocalGatewayStartedShards newNodeResponse() {
@ -109,7 +111,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
} }
@Override protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException { @Override protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException {
return new NodeLocalGatewayStartedShards(clusterService.localNode(), gateway.currentStartedShards()); for (Map.Entry<ShardId, Long> entry : gateway.currentStartedShards().shards().entrySet()) {
if (entry.getKey().equals(request.shardId)) {
assert entry.getValue() != null;
return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue());
}
}
return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1);
} }
@Override protected boolean accumulateExceptions() { @Override protected boolean accumulateExceptions() {
@ -118,11 +126,18 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
static class Request extends NodesOperationRequest { static class Request extends NodesOperationRequest {
private ShardId shardId;
public Request() { public Request() {
} }
public Request(Set<String> nodesIds) { public Request(ShardId shardId, Set<String> nodesIds) {
super(nodesIds.toArray(new String[nodesIds.size()])); super(nodesIds.toArray(new String[nodesIds.size()]));
this.shardId = shardId;
}
public ShardId shardId() {
return this.shardId;
} }
@Override public Request timeout(TimeValue timeout) { @Override public Request timeout(TimeValue timeout) {
@ -132,10 +147,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardId = ShardId.readShardId(in);
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
shardId.writeTo(out);
} }
} }
@ -176,53 +193,55 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
static class NodeRequest extends NodeOperationRequest { static class NodeRequest extends NodeOperationRequest {
ShardId shardId;
NodeRequest() { NodeRequest() {
} }
NodeRequest(String nodeId) { NodeRequest(ShardId shardId, String nodeId) {
super(nodeId); super(nodeId);
this.shardId = shardId;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardId = ShardId.readShardId(in);
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
shardId.writeTo(out);
} }
} }
public static class NodeLocalGatewayStartedShards extends NodeOperationResponse { public static class NodeLocalGatewayStartedShards extends NodeOperationResponse {
private LocalGatewayStartedShards state; private long version = -1;
NodeLocalGatewayStartedShards() { NodeLocalGatewayStartedShards() {
} }
public NodeLocalGatewayStartedShards(DiscoveryNode node, LocalGatewayStartedShards state) { public NodeLocalGatewayStartedShards(DiscoveryNode node, long version) {
super(node); super(node);
this.state = state; this.version = version;
} }
public LocalGatewayStartedShards state() { public boolean hasVersion() {
return state; return version != -1;
}
public long version() {
return this.version;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
if (in.readBoolean()) { version = in.readLong();
state = LocalGatewayStartedShards.Builder.readFrom(in);
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
if (state == null) { out.writeLong(version);
out.writeBoolean(false);
} else {
out.writeBoolean(true);
LocalGatewayStartedShards.Builder.writeTo(state, out);
}
} }
} }
} }