From 4f4471483de7638880e81da935cc4a4a2f7da4bb Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 29 Aug 2010 01:24:23 +0300 Subject: [PATCH] initial work on local gateway --- .../elasticsearch/cluster/ClusterState.java | 9 +- .../cluster/block/ClusterBlocks.java | 20 +- .../cluster/metadata/MetaData.java | 15 +- .../metadata/MetaDataCreateIndexService.java | 20 +- .../cluster/node/DiscoveryNode.java | 23 +- .../cluster/node/DiscoveryNodes.java | 19 +- .../cluster/routing/IndexRoutingTable.java | 28 ++ .../cluster/routing/RoutingNodes.java | 14 +- .../cluster/routing/RoutingTable.java | 10 +- .../discovery/zen/ZenDiscovery.java | 10 - .../zen/elect/ElectMasterService.java | 6 +- .../elasticsearch/env/NodeEnvironment.java | 3 +- .../gateway/local/LocalGateway.java | 325 ++++++++++++++++++ .../gateway/local/LocalGatewayModule.java | 43 +++ .../local/LocalGatewayNodeAllocation.java | 110 ++++++ .../gateway/local/LocalGatewayState.java | 222 ++++++++++++ .../local/TransportNodesListGatewayState.java | 220 ++++++++++++ ...ReplicaAsPrimaryDuringRelocationTests.java | 6 +- .../allocation/FailedShardsRoutingTests.java | 24 +- .../PrimaryElectionRoutingTests.java | 6 +- ...yNotRelocatedWhileBeingRecoveredTests.java | 2 +- .../allocation/RebalanceAfterActiveTests.java | 18 +- .../ReplicaAllocatedAfterPrimaryTests.java | 2 +- .../SingleShardNoReplicasRoutingTests.java | 17 +- .../SingleShardOneReplicaRoutingTests.java | 4 +- .../TenShardsOneReplicaRoutingTests.java | 12 +- .../UpdateNumberOfReplicasTests.java | 10 +- 27 files changed, 1103 insertions(+), 95 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java index 8266fbce094..d0c2e6aa830 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -94,7 +94,7 @@ public class ClusterState { } public RoutingNodes routingNodes() { - return routingTable.routingNodes(metaData); + return routingTable.routingNodes(metaData, blocks); } public RoutingNodes getRoutingNodes() { @@ -117,7 +117,7 @@ public class ClusterState { if (routingNodes != null) { return routingNodes; } - routingNodes = routingTable.routingNodes(metaData); + routingNodes = routingTable.routingNodes(metaData, blocks); return routingNodes; } @@ -177,6 +177,11 @@ public class ClusterState { return this; } + public Builder version(long version) { + this.version = version; + return this; + } + public Builder state(ClusterState state) { this.version = state.version(); this.nodes = state.nodes(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 3a06cdf12e5..3278f2dd056 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -91,6 +91,10 @@ public class ClusterBlocks { return levelHolders[level.id()].indices(); } + public boolean hasIndexBlock(String index, ClusterBlock block) { + return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block); + } + public void indexBlockedRaiseException(ClusterBlockLevel level, String index) throws ClusterBlockException { if (!indexBlocked(level, index)) { return; @@ -136,22 +140,6 @@ public class ClusterBlocks { } } - static class LevelHolder { - private final Set global = Sets.newHashSet(); - private final Map> indices = Maps.newHashMap(); - - LevelHolder() { - } - - public Set global() { - return global; - } - - public Map> indices() { - return indices; - } - } - public static Builder builder() { return new Builder(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 9f423e8d56b..669e5a4c57e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -303,12 +303,17 @@ public class MetaData implements Iterable { public static MetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException { Builder builder = new Builder(); - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - if (token == null) { - // no data... - return builder.build(); + XContentParser.Token token = parser.currentToken(); + String currentFieldName = parser.currentName(); + if (!"meta-data".equals(currentFieldName)) { + token = parser.nextToken(); + currentFieldName = parser.currentName(); + if (token == null) { + // no data... + return builder.build(); + } } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 336c4d7715d..a9fc9b7ff24 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -24,11 +24,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -53,6 +56,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -218,7 +222,14 @@ public class MetaDataCreateIndexService extends AbstractComponent { listener.timeout = timeoutTask; } - return newClusterStateBuilder().state(currentState).metaData(newMetaData).build(); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + if (!request.blocks.isEmpty()) { + for (ClusterBlock block : request.blocks) { + blocks.addIndexBlock(request.index, block); + } + } + + return newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build(); } catch (Exception e) { listener.onFailure(e); return currentState; @@ -314,6 +325,8 @@ public class MetaDataCreateIndexService extends AbstractComponent { TimeValue timeout = TimeValue.timeValueSeconds(5); + Set blocks = Sets.newHashSet(); + public Request(String cause, String index) { this.cause = cause; this.index = index; @@ -336,6 +349,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { return this; } + public Request blocks(Set blocks) { + this.blocks.addAll(blocks); + return this; + } + public Request timeout(TimeValue timeout) { this.timeout = timeout; return this; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 5e742301e30..c2044c24c2c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -153,7 +153,10 @@ public class DiscoveryNode implements Streamable, Serializable { */ public boolean dataNode() { String data = attributes.get("data"); - return data == null || data.equals("true"); + if (data == null) { + return !clientNode(); + } + return data.equals("true"); } /** @@ -175,6 +178,24 @@ public class DiscoveryNode implements Streamable, Serializable { return clientNode(); } + /** + * Can this node become master or not. + */ + public boolean masterNode() { + String master = attributes.get("master"); + if (master == null) { + return !clientNode(); + } + return master.equals("true"); + } + + /** + * Can this node become master or not. + */ + public boolean isMasterNode() { + return masterNode(); + } + public static DiscoveryNode readNode(StreamInput in) throws IOException { DiscoveryNode node = new DiscoveryNode(); node.readFrom(in); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index 912b681029a..a1a1fc9dccc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -46,13 +46,16 @@ public class DiscoveryNodes implements Iterable { private final ImmutableMap dataNodes; + private final ImmutableMap masterNodes; + private final String masterNodeId; private final String localNodeId; - private DiscoveryNodes(ImmutableMap nodes, ImmutableMap dataNodes, String masterNodeId, String localNodeId) { + private DiscoveryNodes(ImmutableMap nodes, ImmutableMap dataNodes, ImmutableMap masterNodes, String masterNodeId, String localNodeId) { this.nodes = nodes; this.dataNodes = dataNodes; + this.masterNodes = masterNodes; this.masterNodeId = masterNodeId; this.localNodeId = localNodeId; } @@ -104,6 +107,14 @@ public class DiscoveryNodes implements Iterable { return dataNodes(); } + public ImmutableMap masterNodes() { + return this.masterNodes; + } + + public ImmutableMap getMasterNodes() { + return masterNodes(); + } + public DiscoveryNode get(String nodeId) { return nodes.get(nodeId); } @@ -366,12 +377,16 @@ public class DiscoveryNodes implements Iterable { public DiscoveryNodes build() { ImmutableMap.Builder dataNodesBuilder = ImmutableMap.builder(); + ImmutableMap.Builder masterNodesBuilder = ImmutableMap.builder(); for (Map.Entry nodeEntry : nodes.entrySet()) { if (nodeEntry.getValue().dataNode()) { dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); } + if (nodeEntry.getValue().masterNode()) { + masterNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); + } } - return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodeId, localNodeId); + return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId); } public static void writeTo(DiscoveryNodes nodes, StreamOutput out) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 1c039a120fd..e7f53e83305 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -105,6 +105,34 @@ public class IndexRoutingTable implements Iterable { return shards.get(shardId); } + public boolean allPrimaryShardsActive() { + return primaryShardsActive() == shards().size(); + } + + public int primaryShardsActive() { + int counter = 0; + for (IndexShardRoutingTable shardRoutingTable : this) { + if (shardRoutingTable.primaryShard().active()) { + counter++; + } + } + return counter; + } + + public boolean allPrimaryShardsUnassigned() { + return primaryShardsUnassigned() == shards.size(); + } + + public int primaryShardsUnassigned() { + int counter = 0; + for (IndexShardRoutingTable shardRoutingTable : this) { + if (shardRoutingTable.primaryShard().unassigned()) { + counter++; + } + } + return counter; + } + public List shardsWithState(ShardRoutingState... states) { List shards = newArrayList(); for (IndexShardRoutingTable shardRoutingTable : this) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index f43d35cfd71..65ebcd29972 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.util.concurrent.NotThreadSafe; @@ -35,6 +36,8 @@ public class RoutingNodes implements Iterable { private final MetaData metaData; + private final ClusterBlocks blocks; + private final RoutingTable routingTable; private final Map nodesToShards = newHashMap(); @@ -43,8 +46,9 @@ public class RoutingNodes implements Iterable { private final List ignoredUnassigned = newArrayList(); - public RoutingNodes(MetaData metaData, RoutingTable routingTable) { + public RoutingNodes(MetaData metaData, ClusterBlocks blocks, RoutingTable routingTable) { this.metaData = metaData; + this.blocks = blocks; this.routingTable = routingTable; Map> nodesToShards = newHashMap(); for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) { @@ -100,6 +104,14 @@ public class RoutingNodes implements Iterable { return metaData(); } + public ClusterBlocks blocks() { + return this.blocks; + } + + public ClusterBlocks getBlocks() { + return this.blocks; + } + public int requiredAverageNumberOfShardsPerNode() { return metaData.totalNumberOfShards() / nodesToShards.size(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index bf4cdaf090d..bc6d0b23d54 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Iterables; @@ -72,8 +74,12 @@ public class RoutingTable implements Iterable { return indicesRouting(); } - public RoutingNodes routingNodes(MetaData metaData) { - return new RoutingNodes(metaData, this); + public RoutingNodes routingNodes(ClusterState state) { + return routingNodes(state.metaData(), state.blocks()); + } + + public RoutingNodes routingNodes(MetaData metaData, ClusterBlocks blocks) { + return new RoutingNodes(metaData, blocks, this); } public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 1957b17b648..8070d096dcf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -135,16 +135,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Override protected void doStart() throws ElasticSearchException { Map nodeAttributes = buildCommonNodesAttributes(settings); - Boolean zenMaster = componentSettings.getAsBoolean("master", null); - if (zenMaster != null) { - if (zenMaster.equals(Boolean.FALSE)) { - nodeAttributes.put("zen.master", "false"); - } - } else if (nodeAttributes.containsKey("client")) { - if (nodeAttributes.get("client").equals("true")) { - nodeAttributes.put("zen.master", "false"); - } - } // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling String nodeId = UUID.randomUUID().toString(); localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java index cc98ae8d9d5..4e77ce90032 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java @@ -79,10 +79,8 @@ public class ElectMasterService extends AbstractComponent { // clean non master nodes for (Iterator it = possibleNodes.iterator(); it.hasNext();) { DiscoveryNode node = it.next(); - if (node.attributes().containsKey("zen.master")) { - if (node.attributes().get("zen.master").equals("false")) { - it.remove(); - } + if (!node.masterNode()) { + it.remove(); } } Collections.sort(possibleNodes, nodeComparator); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 427433cfdc8..865997b815e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -49,7 +49,8 @@ public class NodeEnvironment extends AbstractComponent { @Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException { super(settings); - if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false)) { + if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false) || + !settings.getAsBoolean("node.master", true)) { nodeFile = null; lock = null; return; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java new file mode 100644 index 00000000000..6929517ea89 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -0,0 +1,325 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.common.collect.ImmutableSet; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.*; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.common.unit.TimeValue.*; + +/** + * @author kimchy (shay.banon) + */ +public class LocalGateway extends AbstractLifecycleComponent implements Gateway, ClusterStateListener { + + public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered (not enough nodes with shards allocated found)", ClusterBlockLevel.ALL); + + private File location; + + private final ClusterName clusterName; + + private final ClusterService clusterService; + + private final NodeEnvironment nodeEnv; + + private final MetaDataCreateIndexService createIndexService; + + private final TransportNodesListGatewayState listGatewayState; + + private volatile LocalGatewayState currentState; + + @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, + NodeEnvironment nodeEnv, ClusterName clusterName, ThreadPool threadPool, TransportNodesListGatewayState listGatewayState) { + super(settings); + this.clusterName = clusterName; + this.clusterService = clusterService; + this.createIndexService = createIndexService; + this.nodeEnv = nodeEnv; + this.listGatewayState = listGatewayState.initGateway(this); + } + + @Override public String type() { + return "local"; + } + + public LocalGatewayState currentState() { + return this.currentState; + } + + @Override protected void doStart() throws ElasticSearchException { + // if this is not a possible master node or data node, bail, we won't save anything here... + if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) { + location = null; + return; + } + // create the location where the state will be stored + this.location = new File(nodeEnv.nodeFile(), "_state"); + this.location.mkdirs(); + + try { + long version = findLatestStateVersion(); + if (version != -1) { + this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version)))); + } + } catch (Exception e) { + logger.warn("failed to read local state", e); + } + + clusterService.add(this); + } + + @Override protected void doStop() throws ElasticSearchException { + clusterService.remove(this); + } + + @Override protected void doClose() throws ElasticSearchException { + } + + @Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException { + Set nodesIds = Sets.newHashSet(); + nodesIds.addAll(clusterService.state().nodes().dataNodes().keySet()); + nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet()); + TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet(); + + TransportNodesListGatewayState.NodeLocalGatewayState electedState = null; + for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) { + if (nodeState.state() == null) { + continue; + } + if (electedState == null) { + electedState = nodeState; + } else if (nodeState.state().version() > electedState.state().version()) { + electedState = nodeState; + } + } + if (electedState == null) { + logger.debug("no state elected"); + listener.onSuccess(); + return; + } + logger.debug("elected state from [{}]", electedState.node()); + final LocalGatewayState state = electedState.state(); + final AtomicInteger indicesCounter = new AtomicInteger(state.metaData().indices().size()); + clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder() + .metaData(currentState.metaData()); + // mark the metadata as read from gateway + metaDataBuilder.markAsRecoveredFromGateway(); + + return newClusterStateBuilder().state(currentState) + .version(state.version()) + .metaData(metaDataBuilder).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + // go over the meta data and create indices, we don't really need to copy over + // the meta data per index, since we create the index and it will be added automatically + for (final IndexMetaData indexMetaData : state.metaData()) { + try { + createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()) + .settings(indexMetaData.settings()) + .mappingsCompressed(indexMetaData.mappings()) + .blocks(ImmutableSet.of(INDEX_NOT_RECOVERED_BLOCK)) + .timeout(timeValueSeconds(30)), + + new MetaDataCreateIndexService.Listener() { + @Override public void onResponse(MetaDataCreateIndexService.Response response) { + if (indicesCounter.decrementAndGet() == 0) { + listener.onSuccess(); + } + } + + @Override public void onFailure(Throwable t) { + logger.error("failed to create index [{}]", indexMetaData.index(), t); + } + }); + } catch (IOException e) { + logger.error("failed to create index [{}]", indexMetaData.index(), e); + } + } + } + }); + } + + @Override public Class suggestIndexGateway() { + return NoneIndexGatewayModule.class; + } + + @Override public void reset() throws Exception { + } + + @Override public void clusterChanged(final ClusterChangedEvent event) { + // nothing to do until we actually recover from hte gateway + if (!event.state().metaData().recoveredFromGateway()) { + return; + } + + // go over the indices, if they are blocked, and all are allocated, update the cluster state that it is no longer blocked + for (Map.Entry> entry : event.state().blocks().indices().entrySet()) { + final String index = entry.getKey(); + ImmutableSet indexBlocks = entry.getValue(); + if (indexBlocks.contains(INDEX_NOT_RECOVERED_BLOCK)) { + IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index); + if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) { + clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + blocks.removeIndexBlock(index, INDEX_NOT_RECOVERED_BLOCK); + return ClusterState.builder().state(currentState).blocks(blocks).build(); + } + }); + } + } + } + + if (!event.routingTableChanged() && !event.metaDataChanged()) { + return; + } + + // builder the current state + LocalGatewayState.Builder builder = LocalGatewayState.builder(); + if (currentState != null) { + builder.state(currentState); + } + builder.version(event.state().version()); + builder.metaData(event.state().metaData()); + // remove from the current state all the shards that are primary and started, we won't need them anymore + for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.primaryShard().active()) { + builder.remove(indexShardRoutingTable.shardId()); + } + } + } + // now, add all the ones that are active and on this node + RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); + if (routingNode != null) { + // out node is not in play yet... + for (MutableShardRouting shardRouting : routingNode) { + if (shardRouting.active()) { + builder.put(shardRouting.shardId(), event.state().version()); + } + } + } + + try { + LocalGatewayState stateToWrite = builder.build(); + BinaryXContentBuilder xContentBuilder = XContentFactory.contentBinaryBuilder(XContentType.JSON); + xContentBuilder.prettyPrint(); + xContentBuilder.startObject(); + LocalGatewayState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); + + File stateFile = new File(location, "state-" + event.state().version()); + FileOutputStream fos = new FileOutputStream(stateFile); + fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); + fos.close(); + + FileSystemUtils.syncFile(stateFile); + + currentState = stateToWrite; + } catch (IOException e) { + logger.warn("failed to write updated state", e); + } + + // delete all the other files + File[] files = location.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return !name.equals("state-" + event.state().version()); + } + }); + for (File file : files) { + file.delete(); + } + } + + private long findLatestStateVersion() throws IOException { + long index = -1; + for (File stateFile : location.listFiles()) { + if (logger.isTraceEnabled()) { + logger.trace("[findLatestState]: Processing [" + stateFile.getName() + "]"); + } + String name = stateFile.getName(); + if (!name.startsWith("state-")) { + continue; + } + long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); + if (fileIndex >= index) { + // try and read the meta data + try { + readState(Streams.copyToByteArray(new FileInputStream(stateFile))); + index = fileIndex; + } catch (IOException e) { + logger.warn("[findLatestState]: Failed to read state from [" + name + "], ignoring...", e); + } + } + } + + return index; + } + + private LocalGatewayState readState(byte[] data) throws IOException { + XContentParser parser = null; + try { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + return LocalGatewayState.Builder.fromXContent(parser, settings); + } finally { + if (parser != null) { + parser.close(); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java new file mode 100644 index 00000000000..9bfa6a91f1a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local; + +import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.inject.PreProcessModule; +import org.elasticsearch.gateway.Gateway; + +/** + * @author kimchy (shay.banon) + */ +public class LocalGatewayModule extends AbstractModule implements PreProcessModule { + + @Override protected void configure() { + bind(Gateway.class).to(LocalGateway.class).asEagerSingleton(); + bind(TransportNodesListGatewayState.class).asEagerSingleton(); + } + + @Override public void processModule(Module module) { + if (module instanceof ShardAllocationModule) { + ((ShardAllocationModule) module).addNodeAllocation(LocalGatewayNodeAllocation.class); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java new file mode 100644 index 00000000000..01eb0dbc49e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.allocation.NodeAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * @author kimchy (shay.banon) + */ +public class LocalGatewayNodeAllocation extends NodeAllocation { + + private final TransportNodesListGatewayState listGatewayState; + + @Inject public LocalGatewayNodeAllocation(Settings settings, TransportNodesListGatewayState listGatewayState) { + super(settings); + this.listGatewayState = listGatewayState; + } + + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + boolean changed = false; + + for (IndexRoutingTable indexRoutingTable : routingNodes.routingTable()) { + // only do the allocation if there is a local "INDEX NOT RECOVERED" block + if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) { + continue; + } + + if (indexRoutingTable.allPrimaryShardsUnassigned()) { + // all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign + Set nodesIds = Sets.newHashSet(); + nodesIds.addAll(nodes.dataNodes().keySet()); + nodesIds.addAll(nodes.masterNodes().keySet()); + TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet(); + + // make a list of ShardId to Node, each one from the latest version + Map> shards = Maps.newHashMap(); + for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) { + for (Map.Entry entry : nodeState.state().shards().entrySet()) { + if (entry.getKey().index().name().equals(indexRoutingTable.index())) { + Tuple t = shards.get(entry.getKey()); + if (t == null || entry.getValue() > t.v2().longValue()) { + t = new Tuple(nodeState.node(), entry.getValue()); + shards.put(entry.getKey(), t); + } + } + } + } + + // check if we managed to allocate to all of them, if not, move all relevant shards to ignored + if (shards.size() < indexRoutingTable.shards().size()) { + for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext();) { + MutableShardRouting shardRouting = it.next(); + if (shardRouting.index().equals(indexRoutingTable.index())) { + it.remove(); + routingNodes.ignoredUnassigned().add(shardRouting); + } + } + } else { + changed = true; + // we found all nodes to allocate to, do the allocation + for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext();) { + MutableShardRouting shardRouting = it.next(); + if (shardRouting.primary()) { + DiscoveryNode node = shards.get(shardRouting.shardId()).v1(); + RoutingNode routingNode = routingNodes.node(node.id()); + routingNode.add(shardRouting); + it.remove(); + } + } + } + } + } + + return changed; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java new file mode 100644 index 00000000000..b9b44d3fe93 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayState.java @@ -0,0 +1,222 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local; + +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.builder.XContentBuilder; +import org.elasticsearch.index.shard.ShardId; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Map; + +/** + * @author kimchy (shay.banon) + */ +public class LocalGatewayState { + + public static class StartedShard { + private final long version; + private final ShardId shardId; + + public StartedShard(long version, ShardId shardId) { + this.version = version; + this.shardId = shardId; + } + + public long version() { + return version; + } + + public ShardId shardId() { + return shardId; + } + } + + private final long version; + + private final MetaData metaData; + + private final ImmutableMap shards; + + public LocalGatewayState(long version, MetaData metaData, Map shards) { + this.version = version; + this.metaData = metaData; + this.shards = ImmutableMap.copyOf(shards); + } + + public long version() { + return version; + } + + public MetaData metaData() { + return metaData; + } + + public ImmutableMap shards() { + return this.shards; + } + + public Long startedShardVersion(ShardId shardId) { + return shards.get(shardId); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private long version; + + private MetaData metaData; + + private Map shards = Maps.newHashMap(); + + public Builder state(LocalGatewayState state) { + this.version = state.version(); + this.metaData = state.metaData(); + this.shards.putAll(state.shards); + return this; + } + + public Builder version(long version) { + this.version = version; + return this; + } + + public Builder metaData(MetaData metaData) { + this.metaData = metaData; + return this; + } + + public Builder remove(ShardId shardId) { + this.shards.remove(shardId); + return this; + } + + public Builder put(ShardId shardId, long version) { + this.shards.put(shardId, version); + return this; + } + + public LocalGatewayState build() { + return new LocalGatewayState(version, metaData, shards); + } + + public static void toXContent(LocalGatewayState state, XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject("state"); + + builder.field("version", state.version()); + MetaData.Builder.toXContent(state.metaData(), builder, params); + + builder.startArray("shards"); + for (Map.Entry entry : state.shards.entrySet()) { + builder.startObject(); + builder.field("index", entry.getKey().index().name()); + builder.field("id", entry.getKey().id()); + builder.field("version", entry.getValue()); + builder.endObject(); + } + builder.endArray(); + + builder.endObject(); + } + + public static LocalGatewayState fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException { + Builder builder = new Builder(); + + String currentFieldName = null; + XContentParser.Token token = parser.nextToken(); + if (token == null) { + // no data... + return builder.build(); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("meta-data".equals(currentFieldName)) { + builder.metaData = MetaData.Builder.fromXContent(parser, globalSettings); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("shards".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.START_OBJECT) { + String shardIndex = null; + int shardId = -1; + long version = -1; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("index".equals(currentFieldName)) { + shardIndex = parser.text(); + } else if ("id".equals(currentFieldName)) { + shardId = parser.intValue(); + } else if ("version".equals(currentFieldName)) { + version = parser.longValue(); + } + } + } + builder.shards.put(new ShardId(shardIndex, shardId), version); + } + } + } + } else if (token.isValue()) { + if ("version".equals(currentFieldName)) { + builder.version = parser.longValue(); + } + } + } + + return builder.build(); + } + + public static LocalGatewayState readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException { + LocalGatewayState.Builder builder = new Builder(); + builder.version = in.readLong(); + builder.metaData = MetaData.Builder.readFrom(in, globalSettings); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + builder.shards.put(ShardId.readShardId(in), in.readLong()); + } + return builder.build(); + } + + public static void writeTo(LocalGatewayState state, StreamOutput out) throws IOException { + out.writeLong(state.version()); + MetaData.Builder.writeTo(state.metaData(), out); + + out.writeVInt(state.shards.size()); + for (Map.Entry entry : state.shards.entrySet()) { + entry.getKey().writeTo(out); + out.writeLong(entry.getValue()); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java new file mode 100644 index 00000000000..a7eb41f88bd --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayState.java @@ -0,0 +1,220 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.local; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.*; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * @author kimchy (shay.banon) + */ +public class TransportNodesListGatewayState extends TransportNodesOperationAction { + + private LocalGateway gateway; + + @Inject public TransportNodesListGatewayState(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) { + super(settings, clusterName, threadPool, clusterService, transportService); + } + + TransportNodesListGatewayState initGateway(LocalGateway gateway) { + this.gateway = gateway; + return this; + } + + public ActionFuture list(Set nodesIds, @Nullable TimeValue timeout) { + return execute(new Request(nodesIds).timeout(timeout)); + } + + @Override protected String transportAction() { + return "/gateway/local/state"; + } + + @Override protected String transportNodeAction() { + return "/gateway/local/state/node"; + } + + @Override protected Request newRequest() { + return new Request(); + } + + @Override protected NodeRequest newNodeRequest() { + return new NodeRequest(); + } + + @Override protected NodeRequest newNodeRequest(String nodeId, Request request) { + return new NodeRequest(nodeId); + } + + @Override protected NodeLocalGatewayState newNodeResponse() { + return new NodeLocalGatewayState(); + } + + @Override protected NodesLocalGatewayState newResponse(Request request, AtomicReferenceArray responses) { + final List nodesList = Lists.newArrayList(); + final List failures = Lists.newArrayList(); + for (int i = 0; i < responses.length(); i++) { + Object resp = responses.get(i); + if (resp instanceof NodeLocalGatewayState) { // will also filter out null response for unallocated ones + nodesList.add((NodeLocalGatewayState) resp); + } else if (resp instanceof FailedNodeException) { + failures.add((FailedNodeException) resp); + } + } + return new NodesLocalGatewayState(clusterName, nodesList.toArray(new NodeLocalGatewayState[nodesList.size()]), + failures.toArray(new FailedNodeException[failures.size()])); + } + + @Override protected NodeLocalGatewayState nodeOperation(NodeRequest request) throws ElasticSearchException { + return new NodeLocalGatewayState(clusterService.state().nodes().localNode(), gateway.currentState()); + } + + @Override protected boolean accumulateExceptions() { + return true; + } + + static class Request extends NodesOperationRequest { + + public Request() { + } + + public Request(Set nodesIds) { + super(nodesIds.toArray(new String[nodesIds.size()])); + } + + @Override public Request timeout(TimeValue timeout) { + super.timeout(timeout); + return this; + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + } + + public static class NodesLocalGatewayState extends NodesOperationResponse { + + private FailedNodeException[] failures; + + NodesLocalGatewayState() { + } + + public NodesLocalGatewayState(ClusterName clusterName, NodeLocalGatewayState[] nodes, FailedNodeException[] failures) { + super(clusterName, nodes); + this.failures = failures; + } + + public FailedNodeException[] failures() { + return failures; + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodes = new NodeLocalGatewayState[in.readVInt()]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = new NodeLocalGatewayState(); + nodes[i].readFrom(in); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(nodes.length); + for (NodeLocalGatewayState response : nodes) { + response.writeTo(out); + } + } + } + + + static class NodeRequest extends NodeOperationRequest { + + NodeRequest() { + } + + NodeRequest(String nodeId) { + super(nodeId); + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + } + + public static class NodeLocalGatewayState extends NodeOperationResponse { + + private LocalGatewayState state; + + NodeLocalGatewayState() { + } + + public NodeLocalGatewayState(DiscoveryNode node, LocalGatewayState state) { + super(node); + this.state = state; + } + + public LocalGatewayState state() { + return state; + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + if (in.readBoolean()) { + state = LocalGatewayState.Builder.readFrom(in, null); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (state == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + LocalGatewayState.Builder.writeTo(state, out); + } + } + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java index 8e51df2e37a..657da2adcc5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java @@ -69,17 +69,17 @@ public class ElectReplicaAsPrimaryDuringRelocationTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the primary shards"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the replica shards"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(2)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 622d793a2a8..ee32fc0c2fe 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -72,7 +72,7 @@ public class FailedShardsRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the shards (primaries)"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -90,7 +90,7 @@ public class FailedShardsRoutingTests { } logger.info("Start the shards (backups)"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -112,7 +112,7 @@ public class FailedShardsRoutingTests { prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(3)); @@ -123,11 +123,11 @@ public class FailedShardsRoutingTests { assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2)); logger.info("Fail the shards on node 3"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(3)); @@ -139,7 +139,7 @@ public class FailedShardsRoutingTests { prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(3)); @@ -150,11 +150,11 @@ public class FailedShardsRoutingTests { assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2)); logger.info("Start the shards on node 3"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(3)); @@ -227,7 +227,7 @@ public class FailedShardsRoutingTests { } logger.info("Start the primary shards"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -250,12 +250,12 @@ public class FailedShardsRoutingTests { assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Fail backup shards on node2"); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; List failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING); routingTable = strategy.applyFailedShards(clusterState, failedShards); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); @@ -272,7 +272,7 @@ public class FailedShardsRoutingTests { // fail them again... routingTable = strategy.applyFailedShards(clusterState, failedShards); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 022e5fc66d4..1a6f4d40823 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -68,13 +68,13 @@ public class PrimaryElectionRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the primary shard (on node1)"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the backup shard (on node2)"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -84,7 +84,7 @@ public class PrimaryElectionRoutingTests { prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java index 76698939cb7..01b31ccd45a 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java @@ -68,7 +68,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start the primary shard (on node1)"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index d1b26ca48b6..5e0e7298eaa 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -84,11 +84,11 @@ public class RebalanceAfterActiveTests { } logger.info("start all the primary shards, replicas will start initializing"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); @@ -103,7 +103,7 @@ public class RebalanceAfterActiveTests { prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); @@ -112,33 +112,33 @@ public class RebalanceAfterActiveTests { } logger.info("start the replica shards, rebalancing should start"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); // we only allow one relocation at a time assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5)); logger.info("complete relocation, other half of relocation should happen"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); // we now only relocate 3, since 2 remain where they are! assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3)); logger.info("complete relocation, thats it!"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); // make sure we have an even relocation diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java index 968d1d79640..06b8fc35373 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java @@ -87,7 +87,7 @@ public class ReplicaAllocatedAfterPrimaryTests { assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue()); logger.info("Start all the primary shards"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index 1239f034dcc..66483e4baf4 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -92,9 +92,10 @@ public class SingleShardNoReplicasRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).build(); routingTable = strategy.reroute(clusterState); assertThat(routingTable == prevRoutingTable, equalTo(true)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Marking the shard as started"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -141,7 +142,7 @@ public class SingleShardNoReplicasRoutingTests { assertThat(routingTable == prevRoutingTable, equalTo(true)); logger.info("Start the shard on node 2"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -190,7 +191,7 @@ public class SingleShardNoReplicasRoutingTests { assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1")); logger.info("Marking the shard as failed"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -255,7 +256,7 @@ public class SingleShardNoReplicasRoutingTests { int nodeIndex = Integer.parseInt(nodeId.substring("node".length())); assertThat(nodeIndex, lessThan(25)); } - RoutingNodes routingNodes = routingTable.routingNodes(metaData); + RoutingNodes routingNodes = clusterState.routingNodes(); Set encounteredIndices = newHashSet(); for (RoutingNode routingNode : routingNodes) { assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(0)); @@ -348,7 +349,7 @@ public class SingleShardNoReplicasRoutingTests { assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1)); assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(INITIALIZING)); } - RoutingNodes routingNodes = routingTable.routingNodes(metaData); + RoutingNodes routingNodes = clusterState.routingNodes(); assertThat(routingNodes.numberOfShardsOfType(INITIALIZING), equalTo(numberOfIndices)); assertThat(routingNodes.node("node1").numberOfShardsWithState(INITIALIZING), anyOf(equalTo(3), equalTo(4))); assertThat(routingNodes.node("node2").numberOfShardsWithState(INITIALIZING), anyOf(equalTo(3), equalTo(4))); @@ -365,7 +366,7 @@ public class SingleShardNoReplicasRoutingTests { assertThat(prevRoutingTable == routingTable, equalTo(true)); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -377,7 +378,7 @@ public class SingleShardNoReplicasRoutingTests { assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1)); assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), anyOf(equalTo(RELOCATING), equalTo(STARTED))); } - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat("4 source shard routing are relocating", routingNodes.numberOfShardsOfType(RELOCATING), equalTo(4)); assertThat("4 target shard routing are initializing", routingNodes.numberOfShardsOfType(INITIALIZING), equalTo(4)); @@ -394,7 +395,7 @@ public class SingleShardNoReplicasRoutingTests { assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1)); assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), anyOf(equalTo(RELOCATING), equalTo(STARTED))); } - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(routingNodes.numberOfShardsOfType(STARTED), equalTo(numberOfIndices)); for (RoutingNode routingNode : routingNodes) { assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(2)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java index 2279a316d49..8bb796586e1 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java @@ -95,7 +95,7 @@ public class SingleShardOneReplicaRoutingTests { assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the primary shard (on node1)"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -118,7 +118,7 @@ public class SingleShardOneReplicaRoutingTests { assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the backup shard"); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index 410f67876bb..faa5192c4d9 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -99,7 +99,7 @@ public class TenShardsOneReplicaRoutingTests { assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the primary shard (on node1)"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -123,11 +123,11 @@ public class TenShardsOneReplicaRoutingTests { assertThat(prevRoutingTable == routingTable, equalTo(true)); logger.info("Start the backup shard"); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); @@ -148,7 +148,7 @@ public class TenShardsOneReplicaRoutingTests { prevRoutingTable = routingTable; routingTable = strategy.reroute(clusterState); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); @@ -159,11 +159,11 @@ public class TenShardsOneReplicaRoutingTests { assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(6)); logger.info("Start the shards on node 3"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = routingTable.routingNodes(metaData); + routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(10)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java index b9fb86462e3..34fde7b2bde 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java @@ -60,13 +60,13 @@ public class UpdateNumberOfReplicasTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start all the primary shards"); - RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData()); + RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); logger.info("Start all the replica shards"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -82,7 +82,7 @@ public class UpdateNumberOfReplicasTests { assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2")); logger.info("add another replica"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(2).build(); metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(2).build(); @@ -117,7 +117,7 @@ public class UpdateNumberOfReplicasTests { assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(INITIALIZING)); assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3")); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); @@ -134,7 +134,7 @@ public class UpdateNumberOfReplicasTests { assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3")); logger.info("now remove a replica"); - routingNodes = routingTable.routingNodes(clusterState.metaData()); + routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(1).build(); metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(1).build();