From 9f5d01ca4cc06bf7affdb5a656dc838c383e0c4c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 26 Nov 2013 11:56:37 +0100 Subject: [PATCH] Cut DiscoveryNodes over to ImmutableOpenMap. --- .../TransportNodesShutdownAction.java | 11 +-- .../TransportClientNodesService.java | 5 +- .../cluster/node/DiscoveryNodes.java | 85 +++++++++++-------- .../cluster/routing/RoutingNodes.java | 4 +- .../routing/allocation/AllocationService.java | 5 +- .../discovery/zen/ZenDiscovery.java | 22 ++--- .../zen/elect/ElectMasterService.java | 6 +- .../BlobReuseExistingGatewayAllocator.java | 20 +++-- .../gateway/local/LocalGateway.java | 9 +- .../gateway/local/LocalGatewayAllocator.java | 30 ++++--- .../TransportNodesListGatewayMetaState.java | 7 +- ...ransportNodesListGatewayStartedShards.java | 6 +- .../TransportNodesListShardStoreMetaData.java | 2 +- 13 files changed, 120 insertions(+), 92 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java index 45f2701cd85..260abf84c1d 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java @@ -19,7 +19,8 @@ package org.elasticsearch.action.admin.cluster.node.shutdown; -import com.google.common.collect.Sets; +import com.carrotsearch.hppc.ObjectOpenHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.action.ActionListener; @@ -38,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; -import java.util.Set; import java.util.concurrent.CountDownLatch; /** @@ -101,7 +101,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc if (disabled) { throw new ElasticSearchIllegalStateException("Shutdown is disabled"); } - final Set nodes = Sets.newHashSet(); + final ObjectOpenHashSet nodes = new ObjectOpenHashSet(); if (state.nodes().isAllNodes(request.nodesIds)) { logger.info("[cluster_shutdown]: requested, shutting down in [{}]", request.delay); nodes.addAll(state.nodes().dataNodes().values()); @@ -119,7 +119,8 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc clusterService.stop(); final CountDownLatch latch = new CountDownLatch(nodes.size()); - for (final DiscoveryNode node : nodes) { + for (ObjectCursor cursor : nodes) { + final DiscoveryNode node = cursor.value; if (node.id().equals(state.nodes().masterNodeId())) { // don't shutdown the master yet... latch.countDown(); @@ -219,7 +220,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc }); t.start(); } - listener.onResponse(new NodesShutdownResponse(clusterName, nodes.toArray(new DiscoveryNode[nodes.size()]))); + listener.onResponse(new NodesShutdownResponse(clusterName, nodes.toArray(DiscoveryNode.class))); } private class NodeShutdownRequestHandler extends BaseTransportRequestHandler { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index a3ed6da3947..e8aa4106151 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.transport; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -472,8 +473,8 @@ public class TransportClientNodesService extends AbstractComponent { if (!ignoreClusterName && !clusterName.equals(clusterStateResponse.getClusterName())) { logger.warn("node {} not part of the cluster {}, ignoring...", clusterStateResponse.getState().nodes().localNode(), clusterName); } - for (DiscoveryNode node : clusterStateResponse.getState().nodes().dataNodes().values()) { - newNodes.add(node); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().dataNodes().values()) { + newNodes.add(cursor.value); } } diff --git a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index b00bb4d290d..8e563638a20 100644 --- a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -19,26 +19,27 @@ package org.elasticsearch.cluster.node; +import com.carrotsearch.hppc.ObjectOpenHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.UnmodifiableIterator; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Maps.newHashMap; /** * This class holds all {@link DiscoveryNode} in the cluster and provides convinience methods to @@ -48,17 +49,17 @@ public class DiscoveryNodes implements Iterable { public static final DiscoveryNodes EMPTY_NODES = builder().build(); - private final ImmutableMap nodes; + private final ImmutableOpenMap nodes; - private final ImmutableMap dataNodes; + private final ImmutableOpenMap dataNodes; - private final ImmutableMap masterNodes; + private final ImmutableOpenMap masterNodes; private final String masterNodeId; private final String localNodeId; - private DiscoveryNodes(ImmutableMap nodes, ImmutableMap dataNodes, ImmutableMap masterNodes, String masterNodeId, String localNodeId) { + private DiscoveryNodes(ImmutableOpenMap nodes, ImmutableOpenMap dataNodes, ImmutableOpenMap masterNodes, String masterNodeId, String localNodeId) { this.nodes = nodes; this.dataNodes = dataNodes; this.masterNodes = masterNodes; @@ -68,7 +69,18 @@ public class DiscoveryNodes implements Iterable { @Override public UnmodifiableIterator iterator() { - return nodes.values().iterator(); + final Iterator> cursor = nodes.values().iterator(); + return new UnmodifiableIterator() { + @Override + public boolean hasNext() { + return cursor.hasNext(); + } + + @Override + public DiscoveryNode next() { + return cursor.next().value; + } + }; } /** @@ -113,7 +125,7 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered nodes arranged by their ids */ - public ImmutableMap nodes() { + public ImmutableOpenMap nodes() { return this.nodes; } @@ -122,7 +134,7 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered nodes arranged by their ids */ - public ImmutableMap getNodes() { + public ImmutableOpenMap getNodes() { return nodes(); } @@ -131,7 +143,7 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered data nodes arranged by their ids */ - public ImmutableMap dataNodes() { + public ImmutableOpenMap dataNodes() { return this.dataNodes; } @@ -140,7 +152,7 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered data nodes arranged by their ids */ - public ImmutableMap getDataNodes() { + public ImmutableOpenMap getDataNodes() { return dataNodes(); } @@ -149,7 +161,7 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered master nodes arranged by their ids */ - public ImmutableMap masterNodes() { + public ImmutableOpenMap masterNodes() { return this.masterNodes; } @@ -158,7 +170,7 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered master nodes arranged by their ids */ - public ImmutableMap getMasterNodes() { + public ImmutableOpenMap getMasterNodes() { return masterNodes(); } @@ -167,8 +179,10 @@ public class DiscoveryNodes implements Iterable { * * @return {@link Map} of the discovered master and data nodes arranged by their ids */ - public ImmutableMap masterAndDataNodes() { - return MapBuilder.newMapBuilder().putAll(dataNodes).putAll(masterNodes).immutableMap(); + public ImmutableOpenMap masterAndDataNodes() { + ImmutableOpenMap.Builder nodes = ImmutableOpenMap.builder(dataNodes); + nodes.putAll(masterNodes); + return nodes.build(); } /** @@ -270,7 +284,8 @@ public class DiscoveryNodes implements Iterable { * @return node identified by the given address or null if no such node exists */ public DiscoveryNode findByAddress(TransportAddress address) { - for (DiscoveryNode node : nodes.values()) { + for (ObjectCursor cursor : nodes.values()) { + DiscoveryNode node = cursor.value; if (node.address().equals(address)) { return node; } @@ -310,7 +325,7 @@ public class DiscoveryNodes implements Iterable { } return nodesIds; } else { - Set resolvedNodesIds = new HashSet(nodesIds.length); + ObjectOpenHashSet resolvedNodesIds = new ObjectOpenHashSet(nodesIds.length); for (String nodeId : nodesIds) { if (nodeId.equals("_local")) { String localNodeId = localNodeId(); @@ -342,15 +357,15 @@ public class DiscoveryNodes implements Iterable { String matchAttrValue = nodeId.substring(index + 1); if ("data".equals(matchAttrName)) { if (Booleans.parseBoolean(matchAttrValue, true)) { - resolvedNodesIds.addAll(dataNodes.keySet()); + resolvedNodesIds.addAll(dataNodes.keys()); } else { - resolvedNodesIds.removeAll(dataNodes.keySet()); + resolvedNodesIds.removeAll(dataNodes.keys()); } } else if ("master".equals(matchAttrName)) { if (Booleans.parseBoolean(matchAttrValue, true)) { - resolvedNodesIds.addAll(masterNodes.keySet()); + resolvedNodesIds.addAll(masterNodes.keys()); } else { - resolvedNodesIds.removeAll(masterNodes.keySet()); + resolvedNodesIds.removeAll(masterNodes.keys()); } } else { for (DiscoveryNode node : this) { @@ -366,7 +381,7 @@ public class DiscoveryNodes implements Iterable { } } } - return resolvedNodesIds.toArray(new String[resolvedNodesIds.size()]); + return resolvedNodesIds.toArray(String.class); } } @@ -553,18 +568,18 @@ public class DiscoveryNodes implements Iterable { public static class Builder { - private Map nodes = newHashMap(); + private final ImmutableOpenMap.Builder nodes; private String masterNodeId; private String localNodeId; public Builder() { - + nodes = ImmutableOpenMap.builder(); } public Builder(DiscoveryNodes nodes) { this.masterNodeId = nodes.masterNodeId(); this.localNodeId = nodes.localNodeId(); - this.nodes.putAll(nodes.nodes()); + this.nodes = ImmutableOpenMap.builder(nodes.nodes()); } public Builder put(DiscoveryNode node) { @@ -588,17 +603,17 @@ public class DiscoveryNodes implements Iterable { } public DiscoveryNodes build() { - ImmutableMap.Builder dataNodesBuilder = ImmutableMap.builder(); - ImmutableMap.Builder masterNodesBuilder = ImmutableMap.builder(); - for (Map.Entry nodeEntry : nodes.entrySet()) { - if (nodeEntry.getValue().dataNode()) { - dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); + ImmutableOpenMap.Builder dataNodesBuilder = ImmutableOpenMap.builder(); + ImmutableOpenMap.Builder masterNodesBuilder = ImmutableOpenMap.builder(); + for (ObjectObjectCursor nodeEntry : nodes) { + if (nodeEntry.value.dataNode()) { + dataNodesBuilder.put(nodeEntry.key, nodeEntry.value); } - if (nodeEntry.getValue().masterNode()) { - masterNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); + if (nodeEntry.value.masterNode()) { + masterNodesBuilder.put(nodeEntry.key, nodeEntry.value); } } - return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId); + return new DiscoveryNodes(nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId); } public static void writeTo(DiscoveryNodes nodes, StreamOutput out) throws IOException { diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 49b5c228fc5..c2d4e0cb78b 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -64,8 +64,8 @@ public class RoutingNodes implements Iterable { this.routingTable = clusterState.routingTable(); Map> nodesToShards = newHashMap(); // fill in the nodeToShards with the "live" nodes - for (DiscoveryNode node : clusterState.nodes().dataNodes().values()) { - nodesToShards.put(node.id(), new ArrayList()); + for (ObjectCursor cursor : clusterState.nodes().dataNodes().values()) { + nodesToShards.put(cursor.value.id(), new ArrayList()); } // fill in the inverse of node -> shards allocated diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index dc4ae280678..edaee306f80 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; @@ -169,7 +170,6 @@ public class AllocationService extends AbstractComponent { // shuffle the unassigned nodes, just so we won't have things like poison failed shards Collections.shuffle(routingNodes.unassigned()); RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo()); - Iterable dataNodes = allocation.nodes().dataNodes().values(); boolean changed = false; // first, clear from the shards any node id they used to belong to that is now dead changed |= deassociateDeadNodes(allocation); @@ -327,7 +327,8 @@ public class AllocationService extends AbstractComponent { * new nodes); */ private void applyNewNodes(RoutingAllocation allocation) { - for (DiscoveryNode node : allocation.nodes().dataNodes().values()) { + for (ObjectCursor cursor : allocation.nodes().dataNodes().values()) { + DiscoveryNode node = cursor.value; if (!allocation.routingNodes().nodesToShards().containsKey(node.id())) { RoutingNode routingNode = new RoutingNode(node.id(), node); allocation.routingNodes().nodesToShards().put(node.id(), routingNode); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 4604e54fb58..9176006d999 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -315,7 +315,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // update the fact that we are the master... latestDiscoNodes = builder.build(); ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build(); - return ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build(); + return ClusterState.builder(currentState).nodes(latestDiscoNodes).blocks(clusterBlocks).build(); } @Override @@ -483,34 +483,34 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return currentState; } - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()) + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(currentState.nodes()) // make sure the old master node, which has failed, is not part of the nodes we publish .remove(masterNode.id()) - .masterNodeId(null); + .masterNodeId(null).build(); - if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) { - return rejoin(ClusterState.builder(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")"); + if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) { + return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")"); } - final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // elect master + final DiscoveryNode electedMaster = electMaster.electMaster(discoveryNodes); // elect master if (localNode.equals(electedMaster)) { master = true; masterFD.stop("got elected as new master since master left (reason = " + reason + ")"); nodesFD.start(); - nodesBuilder.masterNodeId(localNode.id()); - latestDiscoNodes = nodesBuilder.build(); + discoveryNodes = DiscoveryNodes.builder(discoveryNodes).masterNodeId(localNode.id()).build(); + latestDiscoNodes = discoveryNodes; return ClusterState.builder(currentState).nodes(latestDiscoNodes).build(); } else { nodesFD.stop(); if (electedMaster != null) { - nodesBuilder.masterNodeId(electedMaster.id()); + discoveryNodes = DiscoveryNodes.builder(discoveryNodes).masterNodeId(electedMaster.id()).build(); masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")"); - latestDiscoNodes = nodesBuilder.build(); + latestDiscoNodes = discoveryNodes; return ClusterState.builder(currentState) .nodes(latestDiscoNodes) .build(); } else { - return rejoin(ClusterState.builder(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master"); + return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master_left and no other node elected to become master"); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java index 611842e0f6c..b966a09cc80 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java @@ -19,12 +19,14 @@ package org.elasticsearch.discovery.zen.elect; +import com.carrotsearch.hppc.ObjectContainer; import com.google.common.collect.Lists; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -70,8 +72,8 @@ public class ElectMasterService extends AbstractComponent { /** * Returns a list of the next possible masters. */ - public DiscoveryNode[] nextPossibleMasters(Iterable nodes, int numberOfPossibleMasters) { - List sortedNodes = sortedMasterNodes(nodes); + public DiscoveryNode[] nextPossibleMasters(ObjectContainer nodes, int numberOfPossibleMasters) { + List sortedNodes = sortedMasterNodes(Arrays.asList(nodes.toArray(DiscoveryNode.class))); if (sortedNodes == null) { return new DiscoveryNode[0]; } diff --git a/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java index 8b69aed56b9..d0c6a3397a2 100644 --- a/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java @@ -19,8 +19,9 @@ package org.elasticsearch.gateway.blobstore; +import com.carrotsearch.hppc.ObjectOpenHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -50,7 +51,6 @@ import org.elasticsearch.transport.ConnectTransportException; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentMap; /** @@ -115,8 +115,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing boolean canBeAllocatedToAtLeastOneNode = false; - for (DiscoveryNode discoNode : nodes.dataNodes().values()) { - RoutingNode node = routingNodes.node(discoNode.id()); + for (ObjectCursor cursor : nodes.dataNodes().values()) { + RoutingNode node = routingNodes.node(cursor.value.id()); if (node == null) { continue; } @@ -263,13 +263,13 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme private Map buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { Map shardStores = cachedStores.get(shard.shardId()); - Set nodesIds; + ObjectOpenHashSet nodesIds; if (shardStores == null) { shardStores = Maps.newHashMap(); cachedStores.put(shard.shardId(), shardStores); - nodesIds = nodes.dataNodes().keySet(); + nodesIds = ObjectOpenHashSet.from(nodes.dataNodes().keys()); } else { - nodesIds = Sets.newHashSet(); + nodesIds = ObjectOpenHashSet.newInstance(); // clean nodes that have failed for (Iterator it = shardStores.keySet().iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); @@ -278,7 +278,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme } } - for (DiscoveryNode node : nodes.dataNodes().values()) { + for (ObjectCursor cursor : nodes.dataNodes().values()) { + DiscoveryNode node = cursor.value; if (!shardStores.containsKey(node)) { nodesIds.add(node.id()); } @@ -286,7 +287,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme } if (!nodesIds.isEmpty()) { - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet(); + String[] nodesIdsArray = nodesIds.toArray(String.class); + TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIdsArray, listTimeout).actionGet(); if (logger.isTraceEnabled()) { if (nodesStoreFilesMetaData.failures().length > 0) { StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 6f7e02d60e7..942c4548b00 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -20,8 +20,8 @@ package org.elasticsearch.gateway.local; import com.carrotsearch.hppc.ObjectFloatOpenHashMap; +import com.carrotsearch.hppc.ObjectOpenHashSet; import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -43,8 +43,6 @@ import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaS import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState; import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; -import java.util.Set; - /** * */ @@ -99,10 +97,9 @@ public class LocalGateway extends AbstractLifecycleComponent implements @Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException { - Set nodesIds = Sets.newHashSet(); - nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet()); + ObjectOpenHashSet nodesIds = ObjectOpenHashSet.from(clusterService.state().nodes().masterNodes().keys()); logger.trace("performing state recovery from {}", nodesIds); - TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet(); + TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds.toArray(String.class), null).actionGet(); int requiredAllocation = 1; diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java index 8208a265321..259e4c23bea 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java @@ -20,6 +20,8 @@ package org.elasticsearch.gateway.local; import com.carrotsearch.hppc.ObjectLongOpenHashMap; +import com.carrotsearch.hppc.ObjectOpenHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.predicates.ObjectPredicate; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -256,8 +258,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing boolean canBeAllocatedToAtLeastOneNode = false; - for (DiscoveryNode discoNode : nodes.dataNodes().values()) { - RoutingNode node = routingNodes.node(discoNode.id()); + for (ObjectCursor cursor : nodes.dataNodes().values()) { + RoutingNode node = routingNodes.node(cursor.value.id()); if (node == null) { continue; } @@ -360,11 +362,11 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA private ObjectLongOpenHashMap buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard) { ObjectLongOpenHashMap shardStates = cachedShardsState.get(shard.shardId()); - Set nodeIds; + ObjectOpenHashSet nodeIds; if (shardStates == null) { shardStates = new ObjectLongOpenHashMap(); cachedShardsState.put(shard.shardId(), shardStates); - nodeIds = nodes.dataNodes().keySet(); + nodeIds = ObjectOpenHashSet.from(nodes.dataNodes().keys()); } else { // clean nodes that have failed shardStates.keys().removeAll(new ObjectPredicate() { @@ -373,9 +375,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA return !nodes.nodeExists(node.id()); } }); - nodeIds = Sets.newHashSet(); + nodeIds = ObjectOpenHashSet.newInstance(); // we have stored cached from before, see if the nodes changed, if they have, go fetch again - for (DiscoveryNode node : nodes.dataNodes().values()) { + for (ObjectCursor cursor : nodes.dataNodes().values()) { + DiscoveryNode node = cursor.value; if (!shardStates.containsKey(node)) { nodeIds.add(node.id()); } @@ -385,7 +388,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA return shardStates; } - TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodes.dataNodes().keySet(), listTimeout).actionGet(); + String[] nodesIdsArray = nodeIds.toArray(String.class); + TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodesIdsArray, listTimeout).actionGet(); if (logger.isDebugEnabled()) { if (response.failures().length > 0) { StringBuilder sb = new StringBuilder(shard + ": failures when trying to list shards on nodes:"); @@ -409,13 +413,13 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA private Map buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) { Map shardStores = cachedStores.get(shard.shardId()); - Set nodesIds; + ObjectOpenHashSet nodesIds; if (shardStores == null) { shardStores = Maps.newHashMap(); cachedStores.put(shard.shardId(), shardStores); - nodesIds = nodes.dataNodes().keySet(); + nodesIds = ObjectOpenHashSet.from(nodes.dataNodes().keys()); } else { - nodesIds = Sets.newHashSet(); + nodesIds = ObjectOpenHashSet.newInstance(); // clean nodes that have failed for (Iterator it = shardStores.keySet().iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); @@ -424,7 +428,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA } } - for (DiscoveryNode node : nodes.dataNodes().values()) { + for (ObjectCursor cursor : nodes.dataNodes().values()) { + DiscoveryNode node = cursor.value; if (!shardStores.containsKey(node)) { nodesIds.add(node.id()); } @@ -432,7 +437,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA } if (!nodesIds.isEmpty()) { - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet(); + String[] nodesIdsArray = nodesIds.toArray(String.class); + TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIdsArray, listTimeout).actionGet(); if (logger.isTraceEnabled()) { if (nodesStoreFilesMetaData.failures().length > 0) { StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:"); diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java index 8a8d5f8ca2a..61df720fba4 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java @@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicReferenceArray; /** @@ -59,7 +58,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA return this; } - public ActionFuture list(Set nodesIds, @Nullable TimeValue timeout) { + public ActionFuture list(String[] nodesIds, @Nullable TimeValue timeout) { return execute(new Request(nodesIds).timeout(timeout)); } @@ -133,8 +132,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA public Request() { } - public Request(Set nodesIds) { - super(nodesIds.toArray(new String[nodesIds.size()])); + public Request(String... nodesIds) { + super(nodesIds); } @Override diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java index c2b38900bb3..355c758cb3e 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java @@ -59,7 +59,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat return this; } - public ActionFuture list(ShardId shardId, Set nodesIds, @Nullable TimeValue timeout) { + public ActionFuture list(ShardId shardId, String[] nodesIds, @Nullable TimeValue timeout) { return execute(new Request(shardId, nodesIds).timeout(timeout)); } @@ -143,6 +143,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat super(nodesIds.toArray(new String[nodesIds.size()])); this.shardId = shardId; } + public Request(ShardId shardId, String... nodesIds) { + super(nodesIds); + this.shardId = shardId; + } public ShardId shardId() { return this.shardId; diff --git a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 2065c7424bc..ab9877cff46 100644 --- a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -72,7 +72,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio this.nodeEnv = nodeEnv; } - public ActionFuture list(ShardId shardId, boolean onlyUnallocated, Set nodesIds, @Nullable TimeValue timeout) { + public ActionFuture list(ShardId shardId, boolean onlyUnallocated, String[] nodesIds, @Nullable TimeValue timeout) { return execute(new Request(shardId, onlyUnallocated, nodesIds).timeout(timeout)); }