Cut DiscoveryNodes over to ImmutableOpenMap.

This commit is contained in:
Martijn van Groningen 2013-11-26 11:56:37 +01:00
parent 5f7146aab8
commit 9f5d01ca4c
13 changed files with 120 additions and 92 deletions

View File

@ -19,7 +19,8 @@
package org.elasticsearch.action.admin.cluster.node.shutdown;
import com.google.common.collect.Sets;
import com.carrotsearch.hppc.ObjectOpenHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
@ -38,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
/**
@ -101,7 +101,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
if (disabled) {
throw new ElasticSearchIllegalStateException("Shutdown is disabled");
}
final Set<DiscoveryNode> nodes = Sets.newHashSet();
final ObjectOpenHashSet<DiscoveryNode> nodes = new ObjectOpenHashSet<DiscoveryNode>();
if (state.nodes().isAllNodes(request.nodesIds)) {
logger.info("[cluster_shutdown]: requested, shutting down in [{}]", request.delay);
nodes.addAll(state.nodes().dataNodes().values());
@ -119,7 +119,8 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
clusterService.stop();
final CountDownLatch latch = new CountDownLatch(nodes.size());
for (final DiscoveryNode node : nodes) {
for (ObjectCursor<DiscoveryNode> cursor : nodes) {
final DiscoveryNode node = cursor.value;
if (node.id().equals(state.nodes().masterNodeId())) {
// don't shutdown the master yet...
latch.countDown();
@ -219,7 +220,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
});
t.start();
}
listener.onResponse(new NodesShutdownResponse(clusterName, nodes.toArray(new DiscoveryNode[nodes.size()])));
listener.onResponse(new NodesShutdownResponse(clusterName, nodes.toArray(DiscoveryNode.class)));
}
private class NodeShutdownRequestHandler extends BaseTransportRequestHandler<NodeShutdownRequest> {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.client.transport;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -472,8 +473,8 @@ public class TransportClientNodesService extends AbstractComponent {
if (!ignoreClusterName && !clusterName.equals(clusterStateResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", clusterStateResponse.getState().nodes().localNode(), clusterName);
}
for (DiscoveryNode node : clusterStateResponse.getState().nodes().dataNodes().values()) {
newNodes.add(node);
for (ObjectCursor<DiscoveryNode> cursor : clusterStateResponse.getState().nodes().dataNodes().values()) {
newNodes.add(cursor.value);
}
}

View File

@ -19,26 +19,27 @@
package org.elasticsearch.cluster.node;
import com.carrotsearch.hppc.ObjectOpenHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.transport.TransportAddress;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
/**
* This class holds all {@link DiscoveryNode} in the cluster and provides convinience methods to
@ -48,17 +49,17 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
public static final DiscoveryNodes EMPTY_NODES = builder().build();
private final ImmutableMap<String, DiscoveryNode> nodes;
private final ImmutableOpenMap<String, DiscoveryNode> nodes;
private final ImmutableMap<String, DiscoveryNode> dataNodes;
private final ImmutableOpenMap<String, DiscoveryNode> dataNodes;
private final ImmutableMap<String, DiscoveryNode> masterNodes;
private final ImmutableOpenMap<String, DiscoveryNode> masterNodes;
private final String masterNodeId;
private final String localNodeId;
private DiscoveryNodes(ImmutableMap<String, DiscoveryNode> nodes, ImmutableMap<String, DiscoveryNode> dataNodes, ImmutableMap<String, DiscoveryNode> masterNodes, String masterNodeId, String localNodeId) {
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes, ImmutableOpenMap<String, DiscoveryNode> masterNodes, String masterNodeId, String localNodeId) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
@ -68,7 +69,18 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
@Override
public UnmodifiableIterator<DiscoveryNode> iterator() {
return nodes.values().iterator();
final Iterator<ObjectCursor<DiscoveryNode>> cursor = nodes.values().iterator();
return new UnmodifiableIterator<DiscoveryNode>() {
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public DiscoveryNode next() {
return cursor.next().value;
}
};
}
/**
@ -113,7 +125,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> nodes() {
public ImmutableOpenMap<String, DiscoveryNode> nodes() {
return this.nodes;
}
@ -122,7 +134,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> getNodes() {
public ImmutableOpenMap<String, DiscoveryNode> getNodes() {
return nodes();
}
@ -131,7 +143,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered data nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> dataNodes() {
public ImmutableOpenMap<String, DiscoveryNode> dataNodes() {
return this.dataNodes;
}
@ -140,7 +152,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered data nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> getDataNodes() {
public ImmutableOpenMap<String, DiscoveryNode> getDataNodes() {
return dataNodes();
}
@ -149,7 +161,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered master nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> masterNodes() {
public ImmutableOpenMap<String, DiscoveryNode> masterNodes() {
return this.masterNodes;
}
@ -158,7 +170,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered master nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> getMasterNodes() {
public ImmutableOpenMap<String, DiscoveryNode> getMasterNodes() {
return masterNodes();
}
@ -167,8 +179,10 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
*
* @return {@link Map} of the discovered master and data nodes arranged by their ids
*/
public ImmutableMap<String, DiscoveryNode> masterAndDataNodes() {
return MapBuilder.<String, DiscoveryNode>newMapBuilder().putAll(dataNodes).putAll(masterNodes).immutableMap();
public ImmutableOpenMap<String, DiscoveryNode> masterAndDataNodes() {
ImmutableOpenMap.Builder<String, DiscoveryNode> nodes = ImmutableOpenMap.builder(dataNodes);
nodes.putAll(masterNodes);
return nodes.build();
}
/**
@ -270,7 +284,8 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
* @return node identified by the given address or <code>null</code> if no such node exists
*/
public DiscoveryNode findByAddress(TransportAddress address) {
for (DiscoveryNode node : nodes.values()) {
for (ObjectCursor<DiscoveryNode> cursor : nodes.values()) {
DiscoveryNode node = cursor.value;
if (node.address().equals(address)) {
return node;
}
@ -310,7 +325,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
}
return nodesIds;
} else {
Set<String> resolvedNodesIds = new HashSet<String>(nodesIds.length);
ObjectOpenHashSet<String> resolvedNodesIds = new ObjectOpenHashSet<String>(nodesIds.length);
for (String nodeId : nodesIds) {
if (nodeId.equals("_local")) {
String localNodeId = localNodeId();
@ -342,15 +357,15 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
String matchAttrValue = nodeId.substring(index + 1);
if ("data".equals(matchAttrName)) {
if (Booleans.parseBoolean(matchAttrValue, true)) {
resolvedNodesIds.addAll(dataNodes.keySet());
resolvedNodesIds.addAll(dataNodes.keys());
} else {
resolvedNodesIds.removeAll(dataNodes.keySet());
resolvedNodesIds.removeAll(dataNodes.keys());
}
} else if ("master".equals(matchAttrName)) {
if (Booleans.parseBoolean(matchAttrValue, true)) {
resolvedNodesIds.addAll(masterNodes.keySet());
resolvedNodesIds.addAll(masterNodes.keys());
} else {
resolvedNodesIds.removeAll(masterNodes.keySet());
resolvedNodesIds.removeAll(masterNodes.keys());
}
} else {
for (DiscoveryNode node : this) {
@ -366,7 +381,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
}
}
}
return resolvedNodesIds.toArray(new String[resolvedNodesIds.size()]);
return resolvedNodesIds.toArray(String.class);
}
}
@ -553,18 +568,18 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
public static class Builder {
private Map<String, DiscoveryNode> nodes = newHashMap();
private final ImmutableOpenMap.Builder<String, DiscoveryNode> nodes;
private String masterNodeId;
private String localNodeId;
public Builder() {
nodes = ImmutableOpenMap.builder();
}
public Builder(DiscoveryNodes nodes) {
this.masterNodeId = nodes.masterNodeId();
this.localNodeId = nodes.localNodeId();
this.nodes.putAll(nodes.nodes());
this.nodes = ImmutableOpenMap.builder(nodes.nodes());
}
public Builder put(DiscoveryNode node) {
@ -588,17 +603,17 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
}
public DiscoveryNodes build() {
ImmutableMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableMap.builder();
ImmutableMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableMap.builder();
for (Map.Entry<String, DiscoveryNode> nodeEntry : nodes.entrySet()) {
if (nodeEntry.getValue().dataNode()) {
dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.dataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
if (nodeEntry.getValue().masterNode()) {
masterNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
if (nodeEntry.value.masterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
}
return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId);
return new DiscoveryNodes(nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId);
}
public static void writeTo(DiscoveryNodes nodes, StreamOutput out) throws IOException {

View File

@ -64,8 +64,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
this.routingTable = clusterState.routingTable();
Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
// fill in the nodeToShards with the "live" nodes
for (DiscoveryNode node : clusterState.nodes().dataNodes().values()) {
nodesToShards.put(node.id(), new ArrayList<MutableShardRouting>());
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().dataNodes().values()) {
nodesToShards.put(cursor.value.id(), new ArrayList<MutableShardRouting>());
}
// fill in the inverse of node -> shards allocated

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
@ -169,7 +170,6 @@ public class AllocationService extends AbstractComponent {
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
Iterable<DiscoveryNode> dataNodes = allocation.nodes().dataNodes().values();
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation);
@ -327,7 +327,8 @@ public class AllocationService extends AbstractComponent {
* new nodes);
*/
private void applyNewNodes(RoutingAllocation allocation) {
for (DiscoveryNode node : allocation.nodes().dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
DiscoveryNode node = cursor.value;
if (!allocation.routingNodes().nodesToShards().containsKey(node.id())) {
RoutingNode routingNode = new RoutingNode(node.id(), node);
allocation.routingNodes().nodesToShards().put(node.id(), routingNode);

View File

@ -315,7 +315,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// update the fact that we are the master...
latestDiscoNodes = builder.build();
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
return ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
return ClusterState.builder(currentState).nodes(latestDiscoNodes).blocks(clusterBlocks).build();
}
@Override
@ -483,34 +483,34 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return currentState;
}
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes())
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(currentState.nodes())
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(null);
.masterNodeId(null).build();
if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) {
return rejoin(ClusterState.builder(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")");
if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) {
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")");
}
final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // elect master
final DiscoveryNode electedMaster = electMaster.electMaster(discoveryNodes); // elect master
if (localNode.equals(electedMaster)) {
master = true;
masterFD.stop("got elected as new master since master left (reason = " + reason + ")");
nodesFD.start();
nodesBuilder.masterNodeId(localNode.id());
latestDiscoNodes = nodesBuilder.build();
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).masterNodeId(localNode.id()).build();
latestDiscoNodes = discoveryNodes;
return ClusterState.builder(currentState).nodes(latestDiscoNodes).build();
} else {
nodesFD.stop();
if (electedMaster != null) {
nodesBuilder.masterNodeId(electedMaster.id());
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).masterNodeId(electedMaster.id()).build();
masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")");
latestDiscoNodes = nodesBuilder.build();
latestDiscoNodes = discoveryNodes;
return ClusterState.builder(currentState)
.nodes(latestDiscoNodes)
.build();
} else {
return rejoin(ClusterState.builder(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master");
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master_left and no other node elected to become master");
}
}
}

View File

@ -19,12 +19,14 @@
package org.elasticsearch.discovery.zen.elect;
import com.carrotsearch.hppc.ObjectContainer;
import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@ -70,8 +72,8 @@ public class ElectMasterService extends AbstractComponent {
/**
* Returns a list of the next possible masters.
*/
public DiscoveryNode[] nextPossibleMasters(Iterable<DiscoveryNode> nodes, int numberOfPossibleMasters) {
List<DiscoveryNode> sortedNodes = sortedMasterNodes(nodes);
public DiscoveryNode[] nextPossibleMasters(ObjectContainer<DiscoveryNode> nodes, int numberOfPossibleMasters) {
List<DiscoveryNode> sortedNodes = sortedMasterNodes(Arrays.asList(nodes.toArray(DiscoveryNode.class)));
if (sortedNodes == null) {
return new DiscoveryNode[0];
}

View File

@ -19,8 +19,9 @@
package org.elasticsearch.gateway.blobstore;
import com.carrotsearch.hppc.ObjectOpenHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -50,7 +51,6 @@ import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
@ -115,8 +115,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
for (DiscoveryNode discoNode : nodes.dataNodes().values()) {
RoutingNode node = routingNodes.node(discoNode.id());
for (ObjectCursor<DiscoveryNode> cursor : nodes.dataNodes().values()) {
RoutingNode node = routingNodes.node(cursor.value.id());
if (node == null) {
continue;
}
@ -263,13 +263,13 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
Set<String> nodesIds;
ObjectOpenHashSet<String> nodesIds;
if (shardStores == null) {
shardStores = Maps.newHashMap();
cachedStores.put(shard.shardId(), shardStores);
nodesIds = nodes.dataNodes().keySet();
nodesIds = ObjectOpenHashSet.from(nodes.dataNodes().keys());
} else {
nodesIds = Sets.newHashSet();
nodesIds = ObjectOpenHashSet.newInstance();
// clean nodes that have failed
for (Iterator<DiscoveryNode> it = shardStores.keySet().iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
@ -278,7 +278,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
}
}
for (DiscoveryNode node : nodes.dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : nodes.dataNodes().values()) {
DiscoveryNode node = cursor.value;
if (!shardStores.containsKey(node)) {
nodesIds.add(node.id());
}
@ -286,7 +287,8 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
}
if (!nodesIds.isEmpty()) {
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet();
String[] nodesIdsArray = nodesIds.toArray(String.class);
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIdsArray, listTimeout).actionGet();
if (logger.isTraceEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");

View File

@ -20,8 +20,8 @@
package org.elasticsearch.gateway.local;
import com.carrotsearch.hppc.ObjectFloatOpenHashMap;
import com.carrotsearch.hppc.ObjectOpenHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -43,8 +43,6 @@ import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaS
import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState;
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import java.util.Set;
/**
*
*/
@ -99,10 +97,9 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
@Override
public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
Set<String> nodesIds = Sets.newHashSet();
nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet());
ObjectOpenHashSet<String> nodesIds = ObjectOpenHashSet.from(clusterService.state().nodes().masterNodes().keys());
logger.trace("performing state recovery from {}", nodesIds);
TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds.toArray(String.class), null).actionGet();
int requiredAllocation = 1;

View File

@ -20,6 +20,8 @@
package org.elasticsearch.gateway.local;
import com.carrotsearch.hppc.ObjectLongOpenHashMap;
import com.carrotsearch.hppc.ObjectOpenHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.predicates.ObjectPredicate;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -256,8 +258,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
for (DiscoveryNode discoNode : nodes.dataNodes().values()) {
RoutingNode node = routingNodes.node(discoNode.id());
for (ObjectCursor<DiscoveryNode> cursor : nodes.dataNodes().values()) {
RoutingNode node = routingNodes.node(cursor.value.id());
if (node == null) {
continue;
}
@ -360,11 +362,11 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
private ObjectLongOpenHashMap<DiscoveryNode> buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard) {
ObjectLongOpenHashMap<DiscoveryNode> shardStates = cachedShardsState.get(shard.shardId());
Set<String> nodeIds;
ObjectOpenHashSet<String> nodeIds;
if (shardStates == null) {
shardStates = new ObjectLongOpenHashMap<DiscoveryNode>();
cachedShardsState.put(shard.shardId(), shardStates);
nodeIds = nodes.dataNodes().keySet();
nodeIds = ObjectOpenHashSet.from(nodes.dataNodes().keys());
} else {
// clean nodes that have failed
shardStates.keys().removeAll(new ObjectPredicate<DiscoveryNode>() {
@ -373,9 +375,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
return !nodes.nodeExists(node.id());
}
});
nodeIds = Sets.newHashSet();
nodeIds = ObjectOpenHashSet.newInstance();
// we have stored cached from before, see if the nodes changed, if they have, go fetch again
for (DiscoveryNode node : nodes.dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : nodes.dataNodes().values()) {
DiscoveryNode node = cursor.value;
if (!shardStates.containsKey(node)) {
nodeIds.add(node.id());
}
@ -385,7 +388,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
return shardStates;
}
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodes.dataNodes().keySet(), listTimeout).actionGet();
String[] nodesIdsArray = nodeIds.toArray(String.class);
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodesIdsArray, listTimeout).actionGet();
if (logger.isDebugEnabled()) {
if (response.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list shards on nodes:");
@ -409,13 +413,13 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
Set<String> nodesIds;
ObjectOpenHashSet<String> nodesIds;
if (shardStores == null) {
shardStores = Maps.newHashMap();
cachedStores.put(shard.shardId(), shardStores);
nodesIds = nodes.dataNodes().keySet();
nodesIds = ObjectOpenHashSet.from(nodes.dataNodes().keys());
} else {
nodesIds = Sets.newHashSet();
nodesIds = ObjectOpenHashSet.newInstance();
// clean nodes that have failed
for (Iterator<DiscoveryNode> it = shardStores.keySet().iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
@ -424,7 +428,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
}
}
for (DiscoveryNode node : nodes.dataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : nodes.dataNodes().values()) {
DiscoveryNode node = cursor.value;
if (!shardStores.containsKey(node)) {
nodesIds.add(node.id());
}
@ -432,7 +437,8 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
}
if (!nodesIds.isEmpty()) {
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIds, listTimeout).actionGet();
String[] nodesIdsArray = nodesIds.toArray(String.class);
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIdsArray, listTimeout).actionGet();
if (logger.isTraceEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");

View File

@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -59,7 +58,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
return this;
}
public ActionFuture<NodesLocalGatewayMetaState> list(Set<String> nodesIds, @Nullable TimeValue timeout) {
public ActionFuture<NodesLocalGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(nodesIds).timeout(timeout));
}
@ -133,8 +132,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
public Request() {
}
public Request(Set<String> nodesIds) {
super(nodesIds.toArray(new String[nodesIds.size()]));
public Request(String... nodesIds) {
super(nodesIds);
}
@Override

View File

@ -59,7 +59,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
return this;
}
public ActionFuture<NodesLocalGatewayStartedShards> list(ShardId shardId, Set<String> nodesIds, @Nullable TimeValue timeout) {
public ActionFuture<NodesLocalGatewayStartedShards> list(ShardId shardId, String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(shardId, nodesIds).timeout(timeout));
}
@ -143,6 +143,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
super(nodesIds.toArray(new String[nodesIds.size()]));
this.shardId = shardId;
}
public Request(ShardId shardId, String... nodesIds) {
super(nodesIds);
this.shardId = shardId;
}
public ShardId shardId() {
return this.shardId;

View File

@ -72,7 +72,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
this.nodeEnv = nodeEnv;
}
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds, @Nullable TimeValue timeout) {
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(shardId, onlyUnallocated, nodesIds).timeout(timeout));
}