diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
index dce5c2afac2..9dd2619d383 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
@@ -21,6 +21,7 @@ package org.elasticsearch.gateway.local;
 
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.*;
@@ -31,6 +32,9 @@ import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.trove.ExtTObjectIntHasMap;
+import org.elasticsearch.common.trove.TObjectIntHashMap;
+import org.elasticsearch.common.trove.TObjectIntIterator;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -65,6 +69,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
 
     private final TimeValue listTimeout;
 
+    private final String initialShards;
+
     @Inject public LocalGatewayNodeAllocation(Settings settings, IndicesService indicesService, TransportNodesListGatewayStartedShards listGatewayStartedShards,
                                               TransportNodesListShardStoreMetaData listShardStoreMetaData) {
         super(settings);
@@ -73,6 +79,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
         this.listShardStoreMetaData = listShardStoreMetaData;
 
         this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
+        this.initialShards = componentSettings.get("initial_shards", "quorum");
     }
 
     @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
@@ -166,16 +173,21 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
 
             // make a list of ShardId to Node, each one from the latest version
             Map<ShardId, Tuple<DiscoveryNode, Long>> shards = Maps.newHashMap();
+            // and a list of the number of shard instances
+            TObjectIntHashMap<ShardId> shardsCounts = new ExtTObjectIntHasMap<ShardId>().defaultReturnValue(-1);
             for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeState : nodesState) {
                 if (nodeState.state() == null) {
                     continue;
                 }
                 for (Map.Entry<ShardId, Long> entry : nodeState.state().shards().entrySet()) {
-                    if (entry.getKey().index().name().equals(indexRoutingTable.index())) {
-                        Tuple<DiscoveryNode, Long> t = shards.get(entry.getKey());
+                    ShardId shardId = entry.getKey();
+                    if (shardId.index().name().equals(indexRoutingTable.index())) {
+                        shardsCounts.adjustOrPutValue(shardId, 1, 1);
+
+                        Tuple<DiscoveryNode, Long> t = shards.get(shardId);
                         if (t == null || entry.getValue() > t.v2().longValue()) {
                             t = new Tuple<DiscoveryNode, Long>(nodeState.node(), entry.getValue());
-                            shards.put(entry.getKey(), t);
+                            shards.put(shardId, t);
                         }
                     }
                 }
@@ -183,25 +195,48 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
 
             // check if we managed to allocate to all of them, if not, move all relevant shards to ignored
             if (shards.size() < indexRoutingTable.shards().size()) {
-                for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
-                    MutableShardRouting shardRouting = it.next();
-                    if (shardRouting.index().equals(indexRoutingTable.index())) {
-                        it.remove();
-                        routingNodes.ignoredUnassigned().add(shardRouting);
+                moveIndexToIgnoreUnassigned(routingNodes, indexRoutingTable);
+            } else {
+                // check if the counts meets the minimum set
+                int requiredNumber = 1;
+                IndexMetaData indexMetaData = routingNodes.metaData().index(indexRoutingTable.index());
+                if ("quorum".equals(initialShards)) {
+                    if (indexMetaData.numberOfReplicas() > 1) {
+                        requiredNumber = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
+                    }
+                } else if ("full".equals(initialShards)) {
+                    requiredNumber = indexMetaData.numberOfReplicas() + 1;
+                } else if ("full-1".equals(initialShards)) {
+                    if (indexMetaData.numberOfReplicas() > 1) {
+                        requiredNumber = indexMetaData.numberOfReplicas();
+                    }
+                } else {
+                    requiredNumber = Integer.parseInt(initialShards);
+                }
+
+                boolean allocate = true;
+                for (TObjectIntIterator<ShardId> it = shardsCounts.iterator(); it.hasNext();) {
+                    it.advance();
+                    if (it.value() < requiredNumber) {
+                        allocate = false;
                     }
                 }
-            } else {
-                changed = true;
-                // we found all nodes to allocate to, do the allocation
-                for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
-                    MutableShardRouting shardRouting = it.next();
-                    if (shardRouting.primary()) {
-                        DiscoveryNode node = shards.get(shardRouting.shardId()).v1();
-                        logger.debug("[{}][{}] initial allocation to [{}]", shardRouting.index(), shardRouting.id(), node);
-                        RoutingNode routingNode = routingNodes.node(node.id());
-                        routingNode.add(shardRouting);
-                        it.remove();
+
+                if (allocate) {
+                    changed = true;
+                    // we found all nodes to allocate to, do the allocation
+                    for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
+                        MutableShardRouting shardRouting = it.next();
+                        if (shardRouting.primary()) {
+                            DiscoveryNode node = shards.get(shardRouting.shardId()).v1();
+                            logger.debug("[{}][{}] initial allocation to [{}]", shardRouting.index(), shardRouting.id(), node);
+                            RoutingNode routingNode = routingNodes.node(node.id());
+                            routingNode.add(shardRouting);
+                            it.remove();
+                        }
                     }
+                } else {
+                    moveIndexToIgnoreUnassigned(routingNodes, indexRoutingTable);
                 }
             }
         }
@@ -322,6 +357,16 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
         return changed;
     }
 
+    private void moveIndexToIgnoreUnassigned(RoutingNodes routingNodes, IndexRoutingTable indexRoutingTable) {
+        for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
+            MutableShardRouting shardRouting = it.next();
+            if (shardRouting.index().equals(indexRoutingTable.index())) {
+                it.remove();
+                routingNodes.ignoredUnassigned().add(shardRouting);
+            }
+        }
+    }
+
     private ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
         ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
         if (shardStores == null) {
diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java
new file mode 100644
index 00000000000..000a60010a2
--- /dev/null
+++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.integration.gateway.local;
+
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.gateway.Gateway;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.test.integration.AbstractNodesTests;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.elasticsearch.client.Requests.*;
+import static org.elasticsearch.common.settings.ImmutableSettings.*;
+import static org.elasticsearch.common.xcontent.XContentFactory.*;
+import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class QuorumLocalGatewayTests extends AbstractNodesTests {
+
+    @AfterMethod public void cleanAndCloseNodes() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            if (node("node" + i) != null) {
+                node("node" + i).stop();
+                // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
+                ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
+            }
+        }
+        closeAllNodes();
+    }
+
+    @Test public void testQuorumRecovery() throws Exception {
+        // clean three nodes
+        buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
+        buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
+        buildNode("node3", settingsBuilder().put("gateway.type", "local").build());
+        cleanAndCloseNodes();
+
+        Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
+        Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
+        Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
+
+        node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
+        node1.client().admin().indices().prepareFlush().execute().actionGet();
+        node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
+        node1.client().admin().indices().prepareRefresh().execute().actionGet();
+
+        logger.info("--> running cluster_health (wait for the shards to startup)");
+        ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
+        logger.info("--> done cluster_health, status " + clusterHealth.status());
+        assertThat(clusterHealth.timedOut(), equalTo(false));
+        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
+        }
+
+        logger.info("--> closing first node, and indexing more data to the second node");
+        closeNode("node1");
+
+        logger.info("--> running cluster_health (wait for the shards to startup)");
+        clusterHealth = client("node2").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet();
+        logger.info("--> done cluster_health, status " + clusterHealth.status());
+        assertThat(clusterHealth.timedOut(), equalTo(false));
+        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
+
+        node2.client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
+        node2.client().admin().indices().prepareRefresh().execute().actionGet();
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(node2.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(3l));
+        }
+
+        logger.info("--> closing the second node and third node");
+        closeNode("node2");
+        closeNode("node3");
+
+        logger.info("--> starting the nodes back, verifying we got the latest version");
+
+        node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build());
+        node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build());
+        node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").build());
+
+        logger.info("--> running cluster_health (wait for the shards to startup)");
+        clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
+        logger.info("--> done cluster_health, status " + clusterHealth.status());
+        assertThat(clusterHealth.timedOut(), equalTo(false));
+        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(3l));
+        }
+    }
+}
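
Note (not part of the patch): the new value is read with componentSettings.get("initial_shards", "quorum"), which for this component presumably maps to a gateway.local.initial_shards setting key; that key, and the standalone helper below, are illustrative assumptions rather than code from this change. The sketch mirrors how the added logic turns the setting value into the number of started shard copies that must be found in the local gateway state before primaries are allocated.

// Illustrative sketch only -- mirrors the requiredNumber computation added in
// LocalGatewayNodeAllocation above; it is not part of the Elasticsearch code base.
public class InitialShardsExample {

    // "initialShards" is the (assumed) gateway.local.initial_shards value, default "quorum";
    // numberOfReplicas comes from the index metadata.
    static int requiredStartedCopies(String initialShards, int numberOfReplicas) {
        int required = 1;
        if ("quorum".equals(initialShards)) {
            if (numberOfReplicas > 1) {
                required = ((1 + numberOfReplicas) / 2) + 1;
            }
        } else if ("full".equals(initialShards)) {
            required = numberOfReplicas + 1;
        } else if ("full-1".equals(initialShards)) {
            if (numberOfReplicas > 1) {
                required = numberOfReplicas;
            }
        } else {
            required = Integer.parseInt(initialShards); // explicit numeric value
        }
        return required;
    }

    public static void main(String[] args) {
        // With 2 replicas (3 copies total, as in the test above): "quorum" requires 2
        // started copies, "full" requires 3, and "full-1" requires 2.
        System.out.println(requiredStartedCopies("quorum", 2)); // 2
        System.out.println(requiredStartedCopies("full", 2));   // 3
        System.out.println(requiredStartedCopies("full-1", 2)); // 2
    }
}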