From ca3e0c6d4976b24c6b6a8f833603e92159271ba5 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 20 Jul 2015 00:38:19 +0200 Subject: [PATCH] Initial Refactor Gateway Allocator Break it into more manageable code by separating allocation primaries and allocating replicas. Start adding basic unit tests for primary shard allocator. --- .../gateway/GatewayAllocator.java | 495 +++--------------- .../gateway/PrimaryShardAllocator.java | 298 +++++++++++ .../gateway/ReplicaShardAllocator.java | 212 ++++++++ .../settings/IndexDynamicSettingsModule.java | 3 +- .../gateway/PrimaryShardAllocatorTests.java | 319 +++++++++++ .../index/store/CorruptedFileTest.java | 4 +- .../test/ElasticsearchAllocationTestCase.java | 40 ++ 7 files changed, 938 insertions(+), 433 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java create mode 100644 core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java create mode 100644 core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index e6cd435d9a3..4357a81961b 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -19,41 +19,28 @@ package org.elasticsearch.gateway; -import com.carrotsearch.hppc.ObjectLongHashMap; -import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.*; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; -import java.util.*; import java.util.concurrent.ConcurrentMap; /** @@ -61,26 +48,19 @@ import java.util.concurrent.ConcurrentMap; */ public class GatewayAllocator extends AbstractComponent { - public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards"; - - private final String initialShards; - - private final TransportNodesListGatewayStartedShards startedAction; - private final TransportNodesListShardStoreMetaData storeAction; private RoutingService routingService; + private final PrimaryShardAllocator primaryShardAllocator; + private final ReplicaShardAllocator replicaShardAllocator; + private final ConcurrentMap> asyncFetchStarted = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); @Inject - public GatewayAllocator(Settings settings, TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) { + public GatewayAllocator(Settings settings, final TransportNodesListGatewayStartedShards startedAction, final TransportNodesListShardStoreMetaData storeAction) { super(settings); - this.startedAction = startedAction; - this.storeAction = storeAction; - - this.initialShards = settings.get("gateway.initial_shards", settings.get("gateway.local.initial_shards", "quorum")); - - logger.debug("using initial_shards [{}]", initialShards); + this.primaryShardAllocator = new InternalPrimaryShardAllocator(settings, startedAction); + this.replicaShardAllocator = new InternalReplicaShardAllocator(settings, storeAction); } public void setReallocation(final ClusterService clusterService, final RoutingService routingService) { @@ -132,416 +112,21 @@ public class GatewayAllocator extends AbstractComponent { } } - /** - * Return {@code true} if the index is configured to allow shards to be - * recovered on any node - */ - private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) { - return IndexMetaData.isOnSharedFilesystem(idxSettings) && - idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false); - } - - public boolean allocateUnassigned(RoutingAllocation allocation) { + public boolean allocateUnassigned(final RoutingAllocation allocation) { boolean changed = false; - DiscoveryNodes nodes = allocation.nodes(); - RoutingNodes routingNodes = allocation.routingNodes(); - // First, handle primaries, they must find a place to be allocated on here - final MetaData metaData = routingNodes.metaData(); - RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); + RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); unassigned.sort(new PriorityComparator() { @Override protected Settings getIndexSettings(String index) { - IndexMetaData indexMetaData = metaData.index(index); + IndexMetaData indexMetaData = allocation.metaData().index(index); return indexMetaData.getSettings(); } }); // sort for priority ordering - Iterator unassignedIterator = unassigned.iterator(); - while (unassignedIterator.hasNext()) { - ShardRouting shard = unassignedIterator.next(); - if (!shard.primary()) { - continue; - } - - // this is an API allocation, ignore since we know there is no data... - if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) { - continue; - } - - AsyncShardFetch fetch = asyncFetchStarted.get(shard.shardId()); - if (fetch == null) { - fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction); - asyncFetchStarted.put(shard.shardId(), fetch); - } - AsyncShardFetch.FetchResult shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId())); - if (shardState.hasData() == false) { - logger.trace("{}: ignoring allocation, still fetching shard started state", shard); - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - continue; - } - shardState.processAllocation(allocation); - - IndexMetaData indexMetaData = metaData.index(shard.getIndex()); - - /** - * Build a map of DiscoveryNodes to shard state number for the given shard. - * A state of -1 means the shard does not exist on the node, where any - * shard state >= 0 is the state version of the shard on that node's disk. - * - * A shard on shared storage will return at least shard state 0 for all - * nodes, indicating that the shard can be allocated to any node. - */ - ObjectLongHashMap nodesState = new ObjectLongHashMap<>(); - for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { - long version = nodeShardState.version(); - // -1 version means it does not exists, which is what the API returns, and what we expect to - if (nodeShardState.storeException() == null) { - logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version); - nodesState.put(nodeShardState.getNode(), version); - } else { - // when there is an store exception, we disregard the reported version and assign it as -1 (same as shard does not exist) - logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating as version -1", nodeShardState.storeException(), shard, nodeShardState.getNode(), version); - nodesState.put(nodeShardState.getNode(), -1); - } - } - - int numberOfAllocationsFound = 0; - long highestVersion = -1; - final Map nodesWithVersion = Maps.newHashMap(); - - assert !nodesState.containsKey(null); - final Object[] keys = nodesState.keys; - final long[] values = nodesState.values; - Settings idxSettings = indexMetaData.settings(); - for (int i = 0; i < keys.length; i++) { - if (keys[i] == null) { - continue; - } - - DiscoveryNode node = (DiscoveryNode) keys[i]; - long version = values[i]; - // since we don't check in NO allocation, we need to double check here - if (allocation.shouldIgnoreShardForNode(shard.shardId(), node.id())) { - continue; - } - if (recoverOnAnyNode(idxSettings)) { - numberOfAllocationsFound++; - if (version > highestVersion) { - highestVersion = version; - } - // We always put the node without clearing the map - nodesWithVersion.put(node, version); - } else if (version != -1) { - numberOfAllocationsFound++; - // If we've found a new "best" candidate, clear the - // current candidates and add it - if (version > highestVersion) { - highestVersion = version; - nodesWithVersion.clear(); - nodesWithVersion.put(node, version); - } else if (version == highestVersion) { - // If the candidate is the same, add it to the - // list, but keep the current candidate - nodesWithVersion.put(node, version); - } - } - } - // Now that we have a map of nodes to versions along with the - // number of allocations found (and not ignored), we need to sort - // it so the node with the highest version is at the beginning - List nodesWithHighestVersion = Lists.newArrayList(); - nodesWithHighestVersion.addAll(nodesWithVersion.keySet()); - CollectionUtil.timSort(nodesWithHighestVersion, new Comparator() { - @Override - public int compare(DiscoveryNode o1, DiscoveryNode o2) { - return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1)); - } - }); - - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", - shard.index(), shard.id(), numberOfAllocationsFound, shard, highestVersion); - } - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("["); - for (DiscoveryNode n : nodesWithHighestVersion) { - sb.append("["); - sb.append(n.getName()); - sb.append("]"); - sb.append(" -> "); - sb.append(nodesWithVersion.get(n)); - sb.append(", "); - } - sb.append("]"); - logger.trace("{} candidates for allocation: {}", shard, sb.toString()); - } - - // check if the counts meets the minimum set - int requiredAllocation = 1; - // if we restore from a repository one copy is more then enough - if (shard.restoreSource() == null) { - try { - String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards)); - if ("quorum".equals(initialShards)) { - if (indexMetaData.numberOfReplicas() > 1) { - requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1; - } - } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) { - if (indexMetaData.numberOfReplicas() > 2) { - requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2); - } - } else if ("one".equals(initialShards)) { - requiredAllocation = 1; - } else if ("full".equals(initialShards) || "all".equals(initialShards)) { - requiredAllocation = indexMetaData.numberOfReplicas() + 1; - } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) { - if (indexMetaData.numberOfReplicas() > 1) { - requiredAllocation = indexMetaData.numberOfReplicas(); - } - } else { - requiredAllocation = Integer.parseInt(initialShards); - } - } catch (Exception e) { - logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard); - } - } - - // not enough found for this shard, continue... - if (numberOfAllocationsFound < requiredAllocation) { - // if we are restoring this shard we still can allocate - if (shard.restoreSource() == null) { - // we can't really allocate, so ignore it and continue - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation); - } - } else if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource()); - } - continue; - } - - Set throttledNodes = Sets.newHashSet(); - Set noNodes = Sets.newHashSet(); - for (DiscoveryNode discoNode : nodesWithHighestVersion) { - RoutingNode node = routingNodes.node(discoNode.id()); - if (node == null) { - continue; - } - - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - if (decision.type() == Decision.Type.THROTTLE) { - throttledNodes.add(discoNode); - } else if (decision.type() == Decision.Type.NO) { - noNodes.add(discoNode); - } else { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode); - } - // we found a match - changed = true; - // make sure we create one with the version from the recovered state - routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId()); - unassignedIterator.remove(); - - // found a node, so no throttling, no "no", and break out of the loop - throttledNodes.clear(); - noNodes.clear(); - break; - } - } - if (throttledNodes.isEmpty()) { - // if we have a node that we "can't" allocate to, force allocation, since this is our master data! - if (!noNodes.isEmpty()) { - DiscoveryNode discoNode = noNodes.iterator().next(); - RoutingNode node = routingNodes.node(discoNode.id()); - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode); - } - // we found a match - changed = true; - // make sure we create one with the version from the recovered state - routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId()); - unassignedIterator.remove(); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, throttledNodes); - } - // we are throttling this, but we have enough to allocate to this node, ignore it for now - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - } - } - - if (!routingNodes.hasUnassigned()) { - return changed; - } - - // Now, handle replicas, try to assign them to nodes that are similar to the one the primary was allocated on - unassignedIterator = unassigned.iterator(); - while (unassignedIterator.hasNext()) { - ShardRouting shard = unassignedIterator.next(); - if (shard.primary()) { - continue; - } - - // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing - boolean canBeAllocatedToAtLeastOneNode = false; - for (ObjectCursor cursor : nodes.dataNodes().values()) { - RoutingNode node = routingNodes.node(cursor.value.id()); - if (node == null) { - continue; - } - // if we can't allocate it on a node, ignore it, for example, this handles - // cases for only allocating a replica after a primary - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - if (decision.type() == Decision.Type.YES) { - canBeAllocatedToAtLeastOneNode = true; - break; - } - } - - if (!canBeAllocatedToAtLeastOneNode) { - logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - continue; - } - - AsyncShardFetch fetch = asyncFetchStore.get(shard.shardId()); - if (fetch == null) { - fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction); - asyncFetchStore.put(shard.shardId(), fetch); - } - AsyncShardFetch.FetchResult shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId())); - if (shardStores.hasData() == false) { - logger.trace("{}: ignoring allocation, still fetching shard stores", shard); - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - continue; // still fetching - } - shardStores.processAllocation(allocation); - - long lastSizeMatched = 0; - DiscoveryNode lastDiscoNodeMatched = null; - RoutingNode lastNodeMatched = null; - boolean hasReplicaData = false; - IndexMetaData indexMetaData = metaData.index(shard.getIndex()); - - for (Map.Entry nodeStoreEntry : shardStores.getData().entrySet()) { - DiscoveryNode discoNode = nodeStoreEntry.getKey(); - TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); - logger.trace("{}: checking node [{}]", shard, discoNode); - - if (storeFilesMetaData == null) { - // already allocated on that node... - continue; - } - - RoutingNode node = routingNodes.node(discoNode.id()); - if (node == null) { - continue; - } - - // check if we can allocate on that node... - // we only check for NO, since if this node is THROTTLING and it has enough "same data" - // then we will try and assign it next time - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - if (decision.type() == Decision.Type.NO) { - continue; - } - - // if it is already allocated, we can't assign to it... - if (storeFilesMetaData.allocated()) { - continue; - } - - if (!shard.primary()) { - hasReplicaData |= storeFilesMetaData.iterator().hasNext(); - ShardRouting primaryShard = routingNodes.activePrimary(shard); - if (primaryShard != null) { - assert primaryShard.active(); - DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId()); - if (primaryNode != null) { - TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = shardStores.getData().get(primaryNode); - if (primaryNodeFilesStore != null) { - TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = primaryNodeFilesStore.storeFilesMetaData(); - if (primaryNodeStore != null && primaryNodeStore.allocated()) { - long sizeMatched = 0; - - String primarySyncId = primaryNodeStore.syncId(); - String replicaSyncId = storeFilesMetaData.syncId(); - // see if we have a sync id we can make use of - if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) { - logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), replicaSyncId); - lastNodeMatched = node; - lastSizeMatched = Long.MAX_VALUE; - lastDiscoNodeMatched = discoNode; - } else { - for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { - String metaDataFileName = storeFileMetaData.name(); - if (primaryNodeStore.fileExists(metaDataFileName) && primaryNodeStore.file(metaDataFileName).isSame(storeFileMetaData)) { - sizeMatched += storeFileMetaData.length(); - } - } - logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", - shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched); - if (sizeMatched > lastSizeMatched) { - lastSizeMatched = sizeMatched; - lastDiscoNodeMatched = discoNode; - lastNodeMatched = node; - } - } - } - } - } - } - } - } - - if (lastNodeMatched != null) { - // we only check on THROTTLE since we checked before before on NO - Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation); - if (decision.type() == Decision.Type.THROTTLE) { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); - } - // we are throttling this, but we have enough to allocate to this node, ignore it for now - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - } else { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); - } - // we found a match - changed = true; - routingNodes.initialize(shard, lastNodeMatched.nodeId()); - unassignedIterator.remove(); - } - } else if (hasReplicaData == false) { - // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation - // of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list - // note: we only care about replica in delayed allocation, since if we have an unassigned primary it - // will anyhow wait to find an existing copy of the shard to be allocated - // note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService - long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); - if (delay > 0) { - logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay)); - /** - * mark it as changed, since we want to kick a publishing to schedule future allocation, - * see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}). - */ - changed = true; - unassignedIterator.remove(); - routingNodes.ignoredUnassigned().add(shard); - } - } - } + changed |= primaryShardAllocator.allocateUnassigned(allocation); + changed |= replicaShardAllocator.allocateUnassigned(allocation); return changed; } @@ -558,4 +143,54 @@ public class GatewayAllocator extends AbstractComponent { } } + class InternalPrimaryShardAllocator extends PrimaryShardAllocator { + + private final TransportNodesListGatewayStartedShards startedAction; + + public InternalPrimaryShardAllocator(Settings settings, TransportNodesListGatewayStartedShards startedAction) { + super(settings); + this.startedAction = startedAction; + } + + @Override + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + AsyncShardFetch fetch = asyncFetchStarted.get(shard.shardId()); + if (fetch == null) { + fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction); + asyncFetchStarted.put(shard.shardId(), fetch); + } + AsyncShardFetch.FetchResult shardState = + fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId())); + + if (shardState.hasData() == true) { + shardState.processAllocation(allocation); + } + return shardState; + } + } + + class InternalReplicaShardAllocator extends ReplicaShardAllocator { + + private final TransportNodesListShardStoreMetaData storeAction; + + public InternalReplicaShardAllocator(Settings settings, TransportNodesListShardStoreMetaData storeAction) { + super(settings); + this.storeAction = storeAction; + } + + @Override + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + AsyncShardFetch fetch = asyncFetchStore.get(shard.shardId()); + if (fetch == null) { + fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction); + asyncFetchStore.put(shard.shardId(), fetch); + } + AsyncShardFetch.FetchResult shardStores = + fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId())); + if (shardStores.hasData() == true) { + shardStores.processAllocation(allocation); + } + return shardStores; + } + } } diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java new file mode 100644 index 00000000000..55f9db47985 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -0,0 +1,298 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; + +import java.util.*; + +/** + * The primary shard allocator allocates primary shard that were not created as + * a result of an API to a node that held them last to be recovered. + */ +public abstract class PrimaryShardAllocator extends AbstractComponent { + + public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards"; + + private final String initialShards; + + public PrimaryShardAllocator(Settings settings) { + super(settings); + this.initialShards = settings.get("gateway.initial_shards", settings.get("gateway.local.initial_shards", "quorum")); + logger.debug("using initial_shards [{}]", initialShards); + } + + public boolean allocateUnassigned(RoutingAllocation allocation) { + boolean changed = false; + final RoutingNodes routingNodes = allocation.routingNodes(); + final MetaData metaData = routingNodes.metaData(); + + final Iterator unassignedIterator = routingNodes.unassigned().iterator(); + while (unassignedIterator.hasNext()) { + ShardRouting shard = unassignedIterator.next(); + + if (needToFindPrimaryCopy(shard, routingNodes.routingTable().index(shard.index()).shard(shard.id())) == false) { + continue; + } + + AsyncShardFetch.FetchResult shardState = fetchData(shard, allocation); + if (shardState.hasData() == false) { + logger.trace("{}: ignoring allocation, still fetching shard started state", shard); + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + continue; + } + + IndexMetaData indexMetaData = metaData.index(shard.getIndex()); + + NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexMetaData.settings()), allocation.getIgnoreNodes(shard.shardId()), shardState); + logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion); + + if (isEnoughAllocationsFound(shard, indexMetaData, nodesAndVersions) == false) { + // if we are restoring this shard we still can allocate + if (shard.restoreSource() == null) { + // we can't really allocate, so ignore it and continue + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound); + } else { + logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource()); + } + continue; + } + + NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions); + if (nodesToAllocate.yesNodes.isEmpty() == false) { + DiscoveryNode node = nodesToAllocate.yesNodes.get(0); + logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); + changed = true; + routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id()); + unassignedIterator.remove(); + } else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) { + DiscoveryNode node = nodesToAllocate.noNodes.get(0); + logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); + changed = true; + routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id()); + unassignedIterator.remove(); + } else { + // we are throttling this, but we have enough to allocate to this node, ignore it for now + logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes); + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + } + } + return changed; + } + + /** + * Does the shard need to find a primary copy? + */ + boolean needToFindPrimaryCopy(ShardRouting shard, IndexShardRoutingTable indexShardRoutingTable) { + if (shard.primary() == false) { + return false; + } + + // this is an API allocation, ignore since we know there is no data... + if (indexShardRoutingTable.primaryAllocatedPostApi() == false) { + return false; + } + + return true; + } + + private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) { + // check if the counts meets the minimum set + int requiredAllocation = 1; + // if we restore from a repository one copy is more then enough + if (shard.restoreSource() == null) { + try { + String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards)); + if ("quorum".equals(initialShards)) { + if (indexMetaData.numberOfReplicas() > 1) { + requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1; + } + } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) { + if (indexMetaData.numberOfReplicas() > 2) { + requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2); + } + } else if ("one".equals(initialShards)) { + requiredAllocation = 1; + } else if ("full".equals(initialShards) || "all".equals(initialShards)) { + requiredAllocation = indexMetaData.numberOfReplicas() + 1; + } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) { + if (indexMetaData.numberOfReplicas() > 1) { + requiredAllocation = indexMetaData.numberOfReplicas(); + } + } else { + requiredAllocation = Integer.parseInt(initialShards); + } + } catch (Exception e) { + logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard); + } + } + + return nodesAndVersions.allocationsFound >= requiredAllocation; + } + + /** + * Based on the nodes and versions, build the list of yes/no/throttle nodes that the shard applies to. + */ + private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, NodesAndVersions nodesAndVersions) { + List yesNodes = new ArrayList<>(); + List throttledNodes = new ArrayList<>(); + List noNodes = new ArrayList<>(); + for (DiscoveryNode discoNode : nodesAndVersions.nodes) { + RoutingNode node = allocation.routingNodes().node(discoNode.id()); + if (node == null) { + continue; + } + + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.THROTTLE) { + throttledNodes.add(discoNode); + } else if (decision.type() == Decision.Type.NO) { + noNodes.add(discoNode); + } else { + yesNodes.add(discoNode); + } + } + return new NodesToAllocate(Collections.unmodifiableList(yesNodes), Collections.unmodifiableList(throttledNodes), Collections.unmodifiableList(noNodes)); + } + + /** + * Builds a list of nodes and version + */ + private NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean recoveryOnAnyNode, Set ignoreNodes, + AsyncShardFetch.FetchResult shardState) { + final Map nodesWithVersion = Maps.newHashMap(); + int numberOfAllocationsFound = 0; + long highestVersion = -1; + for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + long version = nodeShardState.version(); + DiscoveryNode node = nodeShardState.getNode(); + + if (ignoreNodes.contains(node.id())) { + continue; + } + + // -1 version means it does not exists, which is what the API returns, and what we expect to + if (nodeShardState.storeException() == null) { + logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version); + } else { + // when there is an store exception, we disregard the reported version and assign it as -1 (same as shard does not exist) + logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating as version -1", nodeShardState.storeException(), shard, nodeShardState.getNode(), version); + version = -1; + } + + if (recoveryOnAnyNode) { + numberOfAllocationsFound++; + if (version > highestVersion) { + highestVersion = version; + } + // We always put the node without clearing the map + nodesWithVersion.put(node, version); + } else if (version != -1) { + numberOfAllocationsFound++; + // If we've found a new "best" candidate, clear the + // current candidates and add it + if (version > highestVersion) { + highestVersion = version; + nodesWithVersion.clear(); + nodesWithVersion.put(node, version); + } else if (version == highestVersion) { + // If the candidate is the same, add it to the + // list, but keep the current candidate + nodesWithVersion.put(node, version); + } + } + } + // Now that we have a map of nodes to versions along with the + // number of allocations found (and not ignored), we need to sort + // it so the node with the highest version is at the beginning + List nodesWithHighestVersion = Lists.newArrayList(); + nodesWithHighestVersion.addAll(nodesWithVersion.keySet()); + CollectionUtil.timSort(nodesWithHighestVersion, new Comparator() { + @Override + public int compare(DiscoveryNode o1, DiscoveryNode o2) { + return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1)); + } + }); + + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder("["); + for (DiscoveryNode n : nodesWithVersion.keySet()) { + sb.append("[").append(n.getName()).append("]").append(" -> ").append(nodesWithVersion.get(n)).append(", "); + } + sb.append("]"); + logger.trace("{} candidates for allocation: {}", shard, sb.toString()); + } + + return new NodesAndVersions(Collections.unmodifiableList(nodesWithHighestVersion), numberOfAllocationsFound, highestVersion); + } + + /** + * Return {@code true} if the index is configured to allow shards to be + * recovered on any node + */ + private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) { + return IndexMetaData.isOnSharedFilesystem(idxSettings) && + idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false); + } + + protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); + + static class NodesAndVersions { + public final List nodes; + public final int allocationsFound; + public final long highestVersion; + + public NodesAndVersions(List nodes, int allocationsFound, long highestVersion) { + this.nodes = nodes; + this.allocationsFound = allocationsFound; + this.highestVersion = highestVersion; + } + } + + static class NodesToAllocate { + final List yesNodes; + final List throttleNodes; + final List noNodes; + + public NodesToAllocate(List yesNodes, List throttleNodes, List noNodes) { + this.yesNodes = yesNodes; + this.throttleNodes = throttleNodes; + this.noNodes = noNodes; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java new file mode 100644 index 00000000000..06469c05de4 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -0,0 +1,212 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; + +import java.util.Iterator; +import java.util.Map; + +/** + */ +public abstract class ReplicaShardAllocator extends AbstractComponent { + + public ReplicaShardAllocator(Settings settings) { + super(settings); + } + + public boolean allocateUnassigned(RoutingAllocation allocation) { + boolean changed = false; + final RoutingNodes routingNodes = allocation.routingNodes(); + final MetaData metaData = routingNodes.metaData(); + + final Iterator unassignedIterator = routingNodes.unassigned().iterator(); + while (unassignedIterator.hasNext()) { + ShardRouting shard = unassignedIterator.next(); + if (shard.primary()) { + continue; + } + + // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing + boolean canBeAllocatedToAtLeastOneNode = false; + for (ObjectCursor cursor : allocation.nodes().dataNodes().values()) { + RoutingNode node = routingNodes.node(cursor.value.id()); + if (node == null) { + continue; + } + // if we can't allocate it on a node, ignore it, for example, this handles + // cases for only allocating a replica after a primary + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.YES) { + canBeAllocatedToAtLeastOneNode = true; + break; + } + } + + if (!canBeAllocatedToAtLeastOneNode) { + logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + continue; + } + + AsyncShardFetch.FetchResult shardStores = fetchData(shard, allocation); + if (shardStores.hasData() == false) { + logger.trace("{}: ignoring allocation, still fetching shard stores", shard); + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + continue; // still fetching + } + + long lastSizeMatched = 0; + DiscoveryNode lastDiscoNodeMatched = null; + RoutingNode lastNodeMatched = null; + boolean hasReplicaData = false; + IndexMetaData indexMetaData = metaData.index(shard.getIndex()); + + for (Map.Entry nodeStoreEntry : shardStores.getData().entrySet()) { + DiscoveryNode discoNode = nodeStoreEntry.getKey(); + TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); + logger.trace("{}: checking node [{}]", shard, discoNode); + + if (storeFilesMetaData == null) { + // already allocated on that node... + continue; + } + + RoutingNode node = routingNodes.node(discoNode.id()); + if (node == null) { + continue; + } + + // check if we can allocate on that node... + // we only check for NO, since if this node is THROTTLING and it has enough "same data" + // then we will try and assign it next time + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); + if (decision.type() == Decision.Type.NO) { + continue; + } + + // if it is already allocated, we can't assign to it... + if (storeFilesMetaData.allocated()) { + continue; + } + + if (!shard.primary()) { + hasReplicaData |= storeFilesMetaData.iterator().hasNext(); + ShardRouting primaryShard = routingNodes.activePrimary(shard); + if (primaryShard != null) { + assert primaryShard.active(); + DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); + if (primaryNode != null) { + TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = shardStores.getData().get(primaryNode); + if (primaryNodeFilesStore != null) { + TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = primaryNodeFilesStore.storeFilesMetaData(); + if (primaryNodeStore != null && primaryNodeStore.allocated()) { + long sizeMatched = 0; + + String primarySyncId = primaryNodeStore.syncId(); + String replicaSyncId = storeFilesMetaData.syncId(); + // see if we have a sync id we can make use of + if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) { + logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), replicaSyncId); + lastNodeMatched = node; + lastSizeMatched = Long.MAX_VALUE; + lastDiscoNodeMatched = discoNode; + } else { + for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { + String metaDataFileName = storeFileMetaData.name(); + if (primaryNodeStore.fileExists(metaDataFileName) && primaryNodeStore.file(metaDataFileName).isSame(storeFileMetaData)) { + sizeMatched += storeFileMetaData.length(); + } + } + logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", + shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched); + if (sizeMatched > lastSizeMatched) { + lastSizeMatched = sizeMatched; + lastDiscoNodeMatched = discoNode; + lastNodeMatched = node; + } + } + } + } + } + } + } + } + + if (lastNodeMatched != null) { + // we only check on THROTTLE since we checked before before on NO + Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation); + if (decision.type() == Decision.Type.THROTTLE) { + if (logger.isDebugEnabled()) { + logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); + } + // we are throttling this, but we have enough to allocate to this node, ignore it for now + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + } else { + if (logger.isDebugEnabled()) { + logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); + } + // we found a match + changed = true; + routingNodes.initialize(shard, lastNodeMatched.nodeId()); + unassignedIterator.remove(); + } + } else if (hasReplicaData == false) { + // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation + // of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list + // note: we only care about replica in delayed allocation, since if we have an unassigned primary it + // will anyhow wait to find an existing copy of the shard to be allocated + // note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService + long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + if (delay > 0) { + logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay)); + /** + * mark it as changed, since we want to kick a publishing to schedule future allocation, + * see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}). + */ + changed = true; + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + } + } + } + return changed; + } + + protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); +} diff --git a/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 77a71f908a1..804e64568da 100644 --- a/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.settings; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.gateway.PrimaryShardAllocator; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider; @@ -74,7 +75,7 @@ public class IndexDynamicSettingsModule extends AbstractModule { indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER); indexDynamicSettings.addDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE); indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME); - indexDynamicSettings.addDynamicSetting(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS); + indexDynamicSettings.addDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java new file mode 100644 index 00000000000..8796c91340e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -0,0 +1,319 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway; + +import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ElasticsearchAllocationTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase { + + private final ShardId shardId = new ShardId("test", 0); + private final DiscoveryNode node1 = newNode("node1"); + private final DiscoveryNode node2 = newNode("node2"); + private final DiscoveryNode node3 = newNode("node3"); + private TestAllocator testAllocator; + + @Before + public void buildTestAllocator() { + this.testAllocator = new TestAllocator(); + } + + /** + * Verifies that the canProcess method of primary allocation behaves correctly + * and processes only the applicable shard. + */ + @Test + public void testNoProcessReplica() { + ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); + assertThat(testAllocator.needToFindPrimaryCopy(shard, null), equalTo(false)); + } + + /** + * Tests that when async fetch returns that there is no data, the shard will not be allocated. + */ + @Test + public void testNoAsyncFetchData() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + } + + /** + * Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned. + */ + @Test + public void testNoAllocationFound() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); + testAllocator.addData(node1, -1); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + } + + /** + * Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned. + */ + @Test + public void testStoreException() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); + testAllocator.addData(node1, 3, new CorruptIndexException("test", "test")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + } + + /** + * Tests that when there is a node to allocate the shard to, it will be allocated to it. + */ + @Test + public void testFoundAllocationAndAllocating() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); + testAllocator.addData(node1, 10); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id())); + } + + /** + * Tests that when there is a node to allocate to, but it is throttling (and it is the only one), + * it will be moved to ignore unassigned until it can be allocated to. + */ + @Test + public void testFoundAllocationButThrottlingDecider() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders()); + testAllocator.addData(node1, 10); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + } + + /** + * Tests that when there is a node to be allocated to, but it the decider said "no", we still + * force the allocation to it. + */ + @Test + public void testFoundAllocationButNoDecider() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders()); + testAllocator.addData(node1, 10); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id())); + } + + /** + * Tests that the highest version node is chosed for allocation. + */ + @Test + public void testAllocateToTheHighestVersion() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); + testAllocator.addData(node1, 10).addData(node2, 12); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); + } + + /** + * Tests that when restoring from snapshot, even if we didn't find any node to allocate on, the shard + * will remain in the unassigned list to be allocated later. + */ + @Test + public void testRestoreIgnoresNoNodesToAllocate() { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex())) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + + testAllocator.addData(node1, -1).addData(node2, -1); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true)); + } + + /** + * Tests that only when enough copies of the shard exists we are going to allocate it. This test + * verifies that with same version (1), and quorum allocation. + */ + @Test + public void testEnoughCopiesFoundForAllocation() { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsRecovery(metaData.index(shardId.getIndex())) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas + + testAllocator.addData(node1, 1); + allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas + + testAllocator.addData(node2, 1); + allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(0)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), anyOf(equalTo(node2.id()), equalTo(node1.id()))); + } + + /** + * Tests that only when enough copies of the shard exists we are going to allocate it. This test + * verifies that even with different version, we treat different versions as a copy, and count them. + */ + @Test + public void testEnoughCopiesFoundForAllocationWithDifferentVersion() { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsRecovery(metaData.index(shardId.getIndex())) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas + + testAllocator.addData(node1, 1); + allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas + + testAllocator.addData(node2, 2); + allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null); + changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(0)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); + } + + private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders) { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsRecovery(metaData.index(shardId.getIndex())) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null); + } + + class TestAllocator extends PrimaryShardAllocator { + + private Map data; + + public TestAllocator() { + super(Settings.EMPTY); + } + + public TestAllocator clear() { + data = null; + return this; + } + + public TestAllocator addData(DiscoveryNode node, long version) { + return addData(node, version, null); + } + + public TestAllocator addData(DiscoveryNode node, long version, @Nullable Throwable storeException) { + if (data == null) { + data = new HashMap<>(); + } + data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, storeException)); + return this; + } + + @Override + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.emptySet(), Collections.emptySet()); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index 93722be9a73..ff19add418d 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -51,7 +51,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.gateway.PrimaryShardAllocator; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.*; import org.elasticsearch.index.translog.TranslogService; @@ -514,7 +514,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { internalCluster().ensureAtLeastNumDataNodes(2); assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one") + .put(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose diff --git a/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java b/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java index 25f13c8b573..764a33239fa 100644 --- a/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java @@ -23,13 +23,16 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.TransportAddress; @@ -121,4 +124,41 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC RoutingTable routingTable = strategy.applyStartedShards(clusterState, newArrayList(initializingShards.get(randomInt(initializingShards.size() - 1)))).routingTable(); return ClusterState.builder(clusterState).routingTable(routingTable).build(); } + + public static AllocationDeciders yesAllocationDeciders() { + return new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new TestAllocateDecision(Decision.YES)}); + } + + public static AllocationDeciders noAllocationDeciders() { + return new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new TestAllocateDecision(Decision.NO)}); + } + + public static AllocationDeciders throttleAllocationDeciders() { + return new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new TestAllocateDecision(Decision.THROTTLE)}); + } + + static class TestAllocateDecision extends AllocationDecider { + + private final Decision decision; + + public TestAllocateDecision(Decision decision) { + super(Settings.EMPTY); + this.decision = decision; + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return decision; + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { + return decision; + } + + @Override + public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { + return decision; + } + } }