diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index 93c0223285c..e2a61bb9091 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.*; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -54,19 +55,19 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { private final Node node; - private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData; + private final TransportNodesListShardStoreMetaData listShardStoreMetaData; private final TimeValue listTimeout; private final ConcurrentMap cachedCommitPoints = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap> cachedStores = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap> cachedStores = ConcurrentCollections.newConcurrentMap(); @Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node, TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { super(settings); this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency - this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; + this.listShardStoreMetaData = transportNodesListShardStoreMetaData; this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); } @@ -119,7 +120,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { continue; } - ConcurrentMap shardStores = buildShardStores(nodes, shard); + Map shardStores = buildShardStores(nodes, shard); long lastSizeMatched = 0; DiscoveryNode lastDiscoNodeMatched = null; @@ -248,12 +249,32 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { return changed; } - private ConcurrentMap buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { - ConcurrentMap shardStores = cachedStores.get(shard.shardId()); + private Map buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { + Map shardStores = cachedStores.get(shard.shardId()); + Set nodesIds; if (shardStores == null) { - shardStores = ConcurrentCollections.newConcurrentMap(); - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet(); - if (logger.isDebugEnabled()) { + shardStores = Maps.newHashMap(); + cachedStores.put(shard.shardId(), shardStores); + nodesIds = nodes.dataNodes().keySet(); + } else { + nodesIds = Sets.newHashSet(); + // clean nodes that have failed + for (DiscoveryNode node : shardStores.keySet()) { + if (!nodes.nodeExists(node.id())) { + shardStores.remove(node); + } + } + + for (DiscoveryNode node : nodes.dataNodes().values()) { + if (!shardStores.containsKey(node)) { + nodesIds.add(node.id()); + } + } + } + + if (!nodesIds.isEmpty()) { + TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet(); + if (logger.isTraceEnabled()) { if (nodesStoreFilesMetaData.failures().length > 0) { StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { @@ -263,7 +284,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { } sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()); } - logger.debug(sb.toString()); + logger.trace(sb.toString()); } } @@ -272,46 +293,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData()); } } - cachedStores.put(shard.shardId(), shardStores); - } else { - // clean nodes that have failed - for (DiscoveryNode node : shardStores.keySet()) { - if (!nodes.nodeExists(node.id())) { - shardStores.remove(node); - } - } - - // we have stored cached from before, see if the nodes changed, if they have, go fetch again - Set fetchedNodes = Sets.newHashSet(); - for (DiscoveryNode node : nodes.dataNodes().values()) { - if (!shardStores.containsKey(node)) { - fetchedNodes.add(node.id()); - } - } - - if (!fetchedNodes.isEmpty()) { - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet(); - if (logger.isDebugEnabled()) { - if (nodesStoreFilesMetaData.failures().length > 0) { - StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); - for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { - Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]); - if (cause instanceof ConnectTransportException) { - continue; - } - sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()); - } - logger.debug(sb.toString()); - } - } - - for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) { - if (nodeStoreFilesMetaData.storeFilesMetaData() != null) { - shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData()); - } - } - } } + return shardStores; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index 2ec7c987bd3..6b01cc6cd33 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -28,9 +28,12 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.*; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.trove.iterator.TObjectLongIterator; +import org.elasticsearch.common.trove.map.hash.TObjectLongHashMap; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -53,7 +56,9 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { private final TransportNodesListShardStoreMetaData listShardStoreMetaData; - private final ConcurrentMap> cachedStores = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap> cachedStores = ConcurrentCollections.newConcurrentMap(); + + private final ConcurrentMap> cachedShardsState = ConcurrentCollections.newConcurrentMap(); private final TimeValue listTimeout; @@ -72,12 +77,14 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { for (ShardRouting shardRouting : allocation.startedShards()) { cachedStores.remove(shardRouting.shardId()); + cachedShardsState.remove(shardRouting.shardId()); } } @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { ShardRouting failedShard = allocation.failedShard(); cachedStores.remove(failedShard.shardId()); + cachedShardsState.remove(failedShard.shardId()); } @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { @@ -86,7 +93,6 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { RoutingNodes routingNodes = allocation.routingNodes(); // First, handle primaries, they must find a place to be allocated on here - TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = null; Iterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); @@ -100,44 +106,27 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { continue; } - if (nodesState == null) { - Set nodesIds = Sets.newHashSet(); - nodesIds.addAll(nodes.dataNodes().keySet()); - nodesState = listGatewayStartedShards.list(nodesIds, null).actionGet(); - - if (nodesState.failures().length > 0) { - StringBuilder sb = new StringBuilder("failures when trying to list started shards on nodes:"); - for (int i = 0; i < nodesState.failures().length; i++) { - Throwable cause = ExceptionsHelper.unwrapCause(nodesState.failures()[i]); - if (cause instanceof ConnectTransportException) { - continue; - } - sb.append("\n -> ").append(nodesState.failures()[i].getDetailedMessage()); - } - logger.warn(sb.toString()); - } - } + TObjectLongHashMap nodesState = buildShardStates(nodes, shard); int numberOfAllocationsFound = 0; long highestVersion = -1; DiscoveryNode nodeWithHighestVersion = null; - for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeState : nodesState) { - if (nodeState.state() == null) { - continue; - } + for (TObjectLongIterator it = nodesState.iterator(); it.hasNext();) { + it.advance(); + DiscoveryNode node = it.key(); + long version = it.value(); // since we don't check in NO allocation, we need to double check here - if (allocation.shouldIgnoreShardForNode(shard.shardId(), nodeState.node().id())) { + if (allocation.shouldIgnoreShardForNode(shard.shardId(), node.id())) { continue; } - Long version = nodeState.state().shards().get(shard.shardId()); - if (version != null) { + if (version != -1) { numberOfAllocationsFound++; if (highestVersion == -1) { - nodeWithHighestVersion = nodeState.node(); + nodeWithHighestVersion = node; highestVersion = version; } else { if (version > highestVersion) { - nodeWithHighestVersion = nodeState.node(); + nodeWithHighestVersion = node; highestVersion = version; } } @@ -221,7 +210,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { continue; } - ConcurrentMap shardStores = buildShardStores(nodes, shard); + Map shardStores = buildShardStores(nodes, shard); long lastSizeMatched = 0; DiscoveryNode lastDiscoNodeMatched = null; @@ -303,12 +292,80 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { return changed; } - private ConcurrentMap buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { - ConcurrentMap shardStores = cachedStores.get(shard.shardId()); + private TObjectLongHashMap buildShardStates(DiscoveryNodes nodes, MutableShardRouting shard) { + TObjectLongHashMap shardStates = cachedShardsState.get(shard.shardId()); + Set nodeIds; + if (shardStates == null) { + shardStates = new TObjectLongHashMap(); + cachedShardsState.put(shard.shardId(), shardStates); + nodeIds = nodes.dataNodes().keySet(); + } else { + // clean nodes that have failed + for (DiscoveryNode node : shardStates.keySet()) { + if (!nodes.nodeExists(node.id())) { + shardStates.remove(node); + } + } + nodeIds = Sets.newHashSet(); + // we have stored cached from before, see if the nodes changed, if they have, go fetch again + for (DiscoveryNode node : nodes.dataNodes().values()) { + if (!shardStates.containsKey(node)) { + nodeIds.add(node.id()); + } + } + } + if (nodeIds.isEmpty()) { + return shardStates; + } + + TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodes.dataNodes().keySet(), listTimeout).actionGet(); + if (logger.isDebugEnabled()) { + if (response.failures().length > 0) { + StringBuilder sb = new StringBuilder(shard + ": failures when trying to list shards on nodes:"); + for (int i = 0; i < response.failures().length; i++) { + Throwable cause = ExceptionsHelper.unwrapCause(response.failures()[i]); + if (cause instanceof ConnectTransportException) { + continue; + } + sb.append("\n -> ").append(response.failures()[i].getDetailedMessage()); + } + logger.debug(sb.toString()); + } + } + + for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeShardState : response) { + // -1 version means it does not exists, which is what the API returns, and what we expect to + shardStates.put(nodeShardState.node(), nodeShardState.version()); + } + return shardStates; + } + + private Map buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { + Map shardStores = cachedStores.get(shard.shardId()); + Set nodesIds; if (shardStores == null) { - shardStores = ConcurrentCollections.newConcurrentMap(); - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet(); - if (logger.isDebugEnabled()) { + shardStores = Maps.newHashMap(); + cachedStores.put(shard.shardId(), shardStores); + nodesIds = nodes.dataNodes().keySet(); + } else { + nodesIds = Sets.newHashSet(); + // clean nodes that have failed + for (DiscoveryNode node : shardStores.keySet()) { + if (!nodes.nodeExists(node.id())) { + shardStores.remove(node); + } + } + + for (DiscoveryNode node : nodes.dataNodes().values()) { + if (!shardStores.containsKey(node)) { + nodesIds.add(node.id()); + } + } + } + + if (!nodesIds.isEmpty()) { + TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet(); + if (logger.isTraceEnabled()) { if (nodesStoreFilesMetaData.failures().length > 0) { StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { @@ -318,7 +375,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()); } - logger.debug(sb.toString()); + logger.trace(sb.toString()); } } @@ -327,46 +384,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData()); } } - cachedStores.put(shard.shardId(), shardStores); - } else { - // clean nodes that have failed - for (DiscoveryNode node : shardStores.keySet()) { - if (!nodes.nodeExists(node.id())) { - shardStores.remove(node); - } - } - - // we have stored cached from before, see if the nodes changed, if they have, go fetch again - Set fetchedNodes = Sets.newHashSet(); - for (DiscoveryNode node : nodes.dataNodes().values()) { - if (!shardStores.containsKey(node)) { - fetchedNodes.add(node.id()); - } - } - - if (!fetchedNodes.isEmpty()) { - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet(); - if (logger.isTraceEnabled()) { - if (nodesStoreFilesMetaData.failures().length > 0) { - StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); - for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { - Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]); - if (cause instanceof ConnectTransportException) { - continue; - } - sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()); - } - logger.trace(sb.toString()); - } - } - - for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) { - if (nodeStoreFilesMetaData.storeFilesMetaData() != null) { - shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData()); - } - } - } } + return shardStores; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java index 3ecfdaf4990..84d236aafce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java @@ -33,11 +33,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -57,8 +59,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat return this; } - public ActionFuture list(Set nodesIds, @Nullable TimeValue timeout) { - return execute(new Request(nodesIds).timeout(timeout)); + public ActionFuture list(ShardId shardId, Set nodesIds, @Nullable TimeValue timeout) { + return execute(new Request(shardId, nodesIds).timeout(timeout)); } @Override protected String executor() { @@ -86,7 +88,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat } @Override protected NodeRequest newNodeRequest(String nodeId, Request request) { - return new NodeRequest(nodeId); + return new NodeRequest(request.shardId(), nodeId); } @Override protected NodeLocalGatewayStartedShards newNodeResponse() { @@ -109,7 +111,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat } @Override protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticSearchException { - return new NodeLocalGatewayStartedShards(clusterService.localNode(), gateway.currentStartedShards()); + for (Map.Entry entry : gateway.currentStartedShards().shards().entrySet()) { + if (entry.getKey().equals(request.shardId)) { + assert entry.getValue() != null; + return new NodeLocalGatewayStartedShards(clusterService.localNode(), entry.getValue()); + } + } + return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1); } @Override protected boolean accumulateExceptions() { @@ -118,11 +126,18 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat static class Request extends NodesOperationRequest { + private ShardId shardId; + public Request() { } - public Request(Set nodesIds) { + public Request(ShardId shardId, Set nodesIds) { super(nodesIds.toArray(new String[nodesIds.size()])); + this.shardId = shardId; + } + + public ShardId shardId() { + return this.shardId; } @Override public Request timeout(TimeValue timeout) { @@ -132,10 +147,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + shardId = ShardId.readShardId(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + shardId.writeTo(out); } } @@ -176,53 +193,55 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat static class NodeRequest extends NodeOperationRequest { + ShardId shardId; + NodeRequest() { } - NodeRequest(String nodeId) { + NodeRequest(ShardId shardId, String nodeId) { super(nodeId); + this.shardId = shardId; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + shardId = ShardId.readShardId(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + shardId.writeTo(out); } } public static class NodeLocalGatewayStartedShards extends NodeOperationResponse { - private LocalGatewayStartedShards state; + private long version = -1; NodeLocalGatewayStartedShards() { } - public NodeLocalGatewayStartedShards(DiscoveryNode node, LocalGatewayStartedShards state) { + public NodeLocalGatewayStartedShards(DiscoveryNode node, long version) { super(node); - this.state = state; + this.version = version; } - public LocalGatewayStartedShards state() { - return state; + public boolean hasVersion() { + return version != -1; + } + + public long version() { + return this.version; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - if (in.readBoolean()) { - state = LocalGatewayStartedShards.Builder.readFrom(in); - } + version = in.readLong(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (state == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - LocalGatewayStartedShards.Builder.writeTo(state, out); - } + out.writeLong(version); } } }