diff --git a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
index f77eeb253ac..c0ae88a5102 100644
--- a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
+++ b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
@@ -23,8 +23,10 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.predicates.ObjectPredicate;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -45,14 +47,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
 import org.elasticsearch.transport.ConnectTransportException;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -101,6 +102,15 @@ public class GatewayAllocator extends AbstractComponent {
         }
     }
 
+    /**
+     * Return {@code true} if the index is configured to allow shards to be
+     * recovered on any node
+     */
+    private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) {
+        return IndexMetaData.isOnSharedFilesystem(idxSettings) &&
+                idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
+    }
+
     public boolean allocateUnassigned(RoutingAllocation allocation) {
         boolean changed = false;
         DiscoveryNodes nodes = allocation.nodes();
@@ -125,11 +135,13 @@
             int numberOfAllocationsFound = 0;
             long highestVersion = -1;
-            Set<DiscoveryNode> nodesWithHighestVersion = Sets.newHashSet();
+            final Map<DiscoveryNode, Long> nodesWithVersion = Maps.newHashMap();
 
             assert !nodesState.containsKey(null);
             final Object[] keys = nodesState.keys;
             final long[] values = nodesState.values;
+            IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
+            Settings idxSettings = indexMetaData.settings();
             for (int i = 0; i < keys.length; i++) {
                 if (keys[i] == null) {
                     continue;
@@ -141,29 +153,63 @@
                 if (allocation.shouldIgnoreShardForNode(shard.shardId(), node.id())) {
                     continue;
                 }
-                if (version != -1) {
+                if (recoverOnAnyNode(idxSettings)) {
                     numberOfAllocationsFound++;
-                    if (highestVersion == -1) {
-                        nodesWithHighestVersion.add(node);
+                    if (version > highestVersion) {
                         highestVersion = version;
-                    } else {
-                        if (version > highestVersion) {
-                            nodesWithHighestVersion.clear();
-                            nodesWithHighestVersion.add(node);
-                            highestVersion = version;
-                        } else if (version == highestVersion) {
-                            nodesWithHighestVersion.add(node);
-                        }
+                    }
+                    // We always put the node without clearing the map
+                    nodesWithVersion.put(node, version);
+                } else if (version != -1) {
+                    numberOfAllocationsFound++;
+                    // If we've found a new "best" candidate, clear the
+                    // current candidates and add it
+                    if (version > highestVersion) {
+                        highestVersion = version;
+                        nodesWithVersion.clear();
+                        nodesWithVersion.put(node, version);
+                    } else if (version == highestVersion) {
+                        // If the candidate is the same, add it to the
+                        // list, but keep the current candidate
+                        nodesWithVersion.put(node, version);
                     }
                 }
             }
+            // Now that we have a map of nodes to versions along with the
+            // number of allocations found (and not ignored), we need to sort
+            // it so the node with the highest version is at the beginning
+            List<DiscoveryNode> nodesWithHighestVersion = Lists.newArrayList();
+            nodesWithHighestVersion.addAll(nodesWithVersion.keySet());
+            CollectionUtil.timSort(nodesWithHighestVersion, new Comparator<DiscoveryNode>() {
+                @Override
+                public int compare(DiscoveryNode o1, DiscoveryNode o2) {
+                    return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
+                }
+            });
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]",
+                        shard.index(), shard.id(), numberOfAllocationsFound, shard, highestVersion);
+            }
+            if (logger.isTraceEnabled()) {
+                StringBuilder sb = new StringBuilder("[");
+                for (DiscoveryNode n : nodesWithHighestVersion) {
+                    sb.append("[");
+                    sb.append(n.getName());
+                    sb.append("]");
+                    sb.append(" -> ");
+                    sb.append(nodesWithVersion.get(n));
+                    sb.append(", ");
+                }
+                sb.append("]");
+                logger.trace("{} candidates for allocation: {}", shard, sb.toString());
+            }
 
             // check if the counts meets the minimum set
             int requiredAllocation = 1;
             // if we restore from a repository one copy is more then enough
             if (shard.restoreSource() == null) {
                 try {
-                    IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
                     String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
                     if ("quorum".equals(initialShards)) {
                         if (indexMetaData.numberOfReplicas() > 1) {
@@ -415,13 +461,6 @@
         for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
             long version = nodeShardState.version();
-            Settings idxSettings = indexMetaData.settings();
-            if (IndexMetaData.isOnSharedFilesystem(idxSettings) &&
-                    idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false)) {
-                // Shared filesystems use 0 as a minimum shard state, which
-                // means that the shard can be allocated to any node
-                version = Math.max(0, version);
-            }
             // -1 version means it does not exists, which is what the API returns, and what we expect to
             logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
diff --git a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
index d3a1a00e98b..125046ce43f 100644
--- a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
+++ b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
@@ -52,6 +52,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -612,20 +613,67 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
         assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo"));
     }
 
+    /** wait until none of the nodes have shards allocated on them */
+    private void assertNoShardsOn(final List<String> nodeList) throws Exception {
+        assertBusy(new Runnable() {
+            @Override
+            public void run() {
+                ClusterStateResponse resp = client().admin().cluster().prepareState().get();
+                RoutingNodes nodes = resp.getState().getRoutingNodes();
+                for (RoutingNode node : nodes) {
+                    logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards());
+                    if (nodeList.contains(node.node().getName())) {
+                        assertThat("no shards on node", node.numberOfOwningShards(), equalTo(0));
+                    }
+                }
+            }
+        });
+    }
+
+    /** wait until the node has the specified number of shards allocated on it */
+    private void assertShardCountOn(final String nodeName, final int shardCount) throws Exception {
+        assertBusy(new Runnable() {
+            @Override
+            public void run() {
+                ClusterStateResponse resp = client().admin().cluster().prepareState().get();
+                RoutingNodes nodes = resp.getState().getRoutingNodes();
+                for (RoutingNode node : nodes) {
+                    logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards());
+                    if (nodeName.equals(node.node().getName())) {
+                        assertThat(node.numberOfOwningShards(), equalTo(shardCount));
+                    }
+                }
+            }
+        });
+    }
+
     @Test
     public void testIndexOnSharedFSRecoversToAnyNode() throws Exception {
         Settings nodeSettings = nodeSettings();
+        Settings fooSettings = ImmutableSettings.builder().put(nodeSettings).put("node.affinity", "foo").build();
+        Settings barSettings = ImmutableSettings.builder().put(nodeSettings).put("node.affinity", "bar").build();
 
-        internalCluster().startNode(nodeSettings);
+        final Future<List<String>> fooNodes = internalCluster().startNodesAsync(2, fooSettings);
+        final Future<List<String>> barNodes = internalCluster().startNodesAsync(2, barSettings);
+        fooNodes.get();
+        barNodes.get();
         Path dataPath = createTempDir();
         String IDX = "test";
 
+        Settings includeFoo = ImmutableSettings.builder()
+                .put("index.routing.allocation.include.affinity", "foo")
+                .build();
+        Settings includeBar = ImmutableSettings.builder()
+                .put("index.routing.allocation.include.affinity", "bar")
+                .build();
+
         Settings idxSettings = ImmutableSettings.builder()
                 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
                 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
                 .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
                 .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
                 .put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)
+                .put(includeFoo) // start with requiring the shards on "foo"
                 .build();
 
         // only one node, so all primaries will end up on node1
@@ -637,17 +685,43 @@
         client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
         client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get();
         client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get();
+        flushAndRefresh(IDX);
 
-        // start a second node
-        internalCluster().startNode(nodeSettings);
+        // put shards on "bar"
+        client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeBar).get();
 
-        // node1 is master, stop that one, since we only have primaries,
-        // usually this would mean data loss, but not on shared fs!
-        internalCluster().stopCurrentMasterNode();
+        // wait for the shards to move from "foo" nodes to "bar" nodes
+        assertNoShardsOn(fooNodes.get());
 
+        // put shards back on "foo"
+        client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeFoo).get();
+
+        // wait for the shards to move from "bar" nodes to "foo" nodes
+        assertNoShardsOn(barNodes.get());
+
+        // Stop a foo node
+        logger.info("--> stopping first 'foo' node");
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(0)));
+
+        // Ensure that the other foo node has all the shards now
+        assertShardCountOn(fooNodes.get().get(1), 5);
+
+        // Assert no shards on the "bar" nodes
+        assertNoShardsOn(barNodes.get());
+
+        // Stop the second "foo" node
+        logger.info("--> stopping second 'foo' node");
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(1)));
+
+        // The index should still be able to be allocated (on the "bar" nodes),
+        // all the "foo" nodes are gone
         ensureGreen(IDX);
-        refresh();
-        SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).addFieldDataField("foo").addSort("foo", SortOrder.ASC).get();
-        assertHitCount(resp, 4);
+
+        // Start another "foo" node and make sure the index moves back
+        logger.info("--> starting additional 'foo' node");
+        String newFooNode = internalCluster().startNode(fooSettings);
+
+        assertShardCountOn(newFooNode, 5);
+        assertNoShardsOn(barNodes.get());
     }
 }
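
Reviewer note (not part of the patch): the core behavioral change in GatewayAllocator is that allocation candidates are now collected into a node-to-version map and sorted so the node with the highest shard version comes first. Under recover_on_any_node every node that reported a version stays in the map, while the normal path still clears it whenever a higher version appears. Below is a minimal, self-contained sketch of that ordering: plain JDK Collections.sort stands in for Lucene's CollectionUtil.timSort, String node names stand in for DiscoveryNode, and the class name and sample versions are illustrative only.

    import java.util.*;

    public class CandidateOrderingSketch {
        public static void main(String[] args) {
            // node -> highest shard version reported by that node
            final Map<String, Long> nodesWithVersion = new HashMap<>();
            nodesWithVersion.put("node1", 5L);
            nodesWithVersion.put("node2", 7L);
            nodesWithVersion.put("node3", 7L);
            nodesWithVersion.put("node4", 2L);

            List<String> candidates = new ArrayList<>(nodesWithVersion.keySet());
            // Same comparator shape as the patch: o2 is compared against o1,
            // so the list ends up sorted by version in descending order
            Collections.sort(candidates, new Comparator<String>() {
                @Override
                public int compare(String o1, String o2) {
                    return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
                }
            });

            // Prints something like [node2, node3, node1, node4]; the two
            // version-7 nodes sort first (tie order is unspecified)
            System.out.println(candidates);
        }
    }

The descending sort preserves the old preference for the node with the highest version while still keeping lower-version nodes in the candidate list, which is what the recover_on_any_node branch relies on.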