Initial Refactor Gateway Allocator

Break it into more manageable code by separating allocation primaries and allocating replicas. Start adding basic unit tests for primary shard allocator.
This commit is contained in:
Shay Banon 2015-07-20 00:38:19 +02:00
parent 64750fa8bb
commit ca3e0c6d49
7 changed files with 938 additions and 433 deletions

View File

@ -19,41 +19,28 @@
package org.elasticsearch.gateway; package org.elasticsearch.gateway;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import java.util.*;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/** /**
@ -61,26 +48,19 @@ import java.util.concurrent.ConcurrentMap;
*/ */
public class GatewayAllocator extends AbstractComponent { public class GatewayAllocator extends AbstractComponent {
public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards";
private final String initialShards;
private final TransportNodesListGatewayStartedShards startedAction;
private final TransportNodesListShardStoreMetaData storeAction;
private RoutingService routingService; private RoutingService routingService;
private final PrimaryShardAllocator primaryShardAllocator;
private final ReplicaShardAllocator replicaShardAllocator;
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>> asyncFetchStore = ConcurrentCollections.newConcurrentMap();
@Inject @Inject
public GatewayAllocator(Settings settings, TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) { public GatewayAllocator(Settings settings, final TransportNodesListGatewayStartedShards startedAction, final TransportNodesListShardStoreMetaData storeAction) {
super(settings); super(settings);
this.startedAction = startedAction; this.primaryShardAllocator = new InternalPrimaryShardAllocator(settings, startedAction);
this.storeAction = storeAction; this.replicaShardAllocator = new InternalReplicaShardAllocator(settings, storeAction);
this.initialShards = settings.get("gateway.initial_shards", settings.get("gateway.local.initial_shards", "quorum"));
logger.debug("using initial_shards [{}]", initialShards);
} }
public void setReallocation(final ClusterService clusterService, final RoutingService routingService) { public void setReallocation(final ClusterService clusterService, final RoutingService routingService) {
@ -132,416 +112,21 @@ public class GatewayAllocator extends AbstractComponent {
} }
} }
/** public boolean allocateUnassigned(final RoutingAllocation allocation) {
* Return {@code true} if the index is configured to allow shards to be
* recovered on any node
*/
private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) {
return IndexMetaData.isOnSharedFilesystem(idxSettings) &&
idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false; boolean changed = false;
DiscoveryNodes nodes = allocation.nodes();
RoutingNodes routingNodes = allocation.routingNodes();
// First, handle primaries, they must find a place to be allocated on here RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
final MetaData metaData = routingNodes.metaData();
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
unassigned.sort(new PriorityComparator() { unassigned.sort(new PriorityComparator() {
@Override @Override
protected Settings getIndexSettings(String index) { protected Settings getIndexSettings(String index) {
IndexMetaData indexMetaData = metaData.index(index); IndexMetaData indexMetaData = allocation.metaData().index(index);
return indexMetaData.getSettings(); return indexMetaData.getSettings();
} }
}); // sort for priority ordering }); // sort for priority ordering
Iterator<ShardRouting> unassignedIterator = unassigned.iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
if (!shard.primary()) { changed |= primaryShardAllocator.allocateUnassigned(allocation);
continue; changed |= replicaShardAllocator.allocateUnassigned(allocation);
}
// this is an API allocation, ignore since we know there is no data...
if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) {
continue;
}
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch = asyncFetchStarted.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction);
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
}
shardState.processAllocation(allocation);
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
/**
* Build a map of DiscoveryNodes to shard state number for the given shard.
* A state of -1 means the shard does not exist on the node, where any
* shard state >= 0 is the state version of the shard on that node's disk.
*
* A shard on shared storage will return at least shard state 0 for all
* nodes, indicating that the shard can be allocated to any node.
*/
ObjectLongHashMap<DiscoveryNode> nodesState = new ObjectLongHashMap<>();
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
long version = nodeShardState.version();
// -1 version means it does not exists, which is what the API returns, and what we expect to
if (nodeShardState.storeException() == null) {
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
nodesState.put(nodeShardState.getNode(), version);
} else {
// when there is an store exception, we disregard the reported version and assign it as -1 (same as shard does not exist)
logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating as version -1", nodeShardState.storeException(), shard, nodeShardState.getNode(), version);
nodesState.put(nodeShardState.getNode(), -1);
}
}
int numberOfAllocationsFound = 0;
long highestVersion = -1;
final Map<DiscoveryNode, Long> nodesWithVersion = Maps.newHashMap();
assert !nodesState.containsKey(null);
final Object[] keys = nodesState.keys;
final long[] values = nodesState.values;
Settings idxSettings = indexMetaData.settings();
for (int i = 0; i < keys.length; i++) {
if (keys[i] == null) {
continue;
}
DiscoveryNode node = (DiscoveryNode) keys[i];
long version = values[i];
// since we don't check in NO allocation, we need to double check here
if (allocation.shouldIgnoreShardForNode(shard.shardId(), node.id())) {
continue;
}
if (recoverOnAnyNode(idxSettings)) {
numberOfAllocationsFound++;
if (version > highestVersion) {
highestVersion = version;
}
// We always put the node without clearing the map
nodesWithVersion.put(node, version);
} else if (version != -1) {
numberOfAllocationsFound++;
// If we've found a new "best" candidate, clear the
// current candidates and add it
if (version > highestVersion) {
highestVersion = version;
nodesWithVersion.clear();
nodesWithVersion.put(node, version);
} else if (version == highestVersion) {
// If the candidate is the same, add it to the
// list, but keep the current candidate
nodesWithVersion.put(node, version);
}
}
}
// Now that we have a map of nodes to versions along with the
// number of allocations found (and not ignored), we need to sort
// it so the node with the highest version is at the beginning
List<DiscoveryNode> nodesWithHighestVersion = Lists.newArrayList();
nodesWithHighestVersion.addAll(nodesWithVersion.keySet());
CollectionUtil.timSort(nodesWithHighestVersion, new Comparator<DiscoveryNode>() {
@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
}
});
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]",
shard.index(), shard.id(), numberOfAllocationsFound, shard, highestVersion);
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("[");
for (DiscoveryNode n : nodesWithHighestVersion) {
sb.append("[");
sb.append(n.getName());
sb.append("]");
sb.append(" -> ");
sb.append(nodesWithVersion.get(n));
sb.append(", ");
}
sb.append("]");
logger.trace("{} candidates for allocation: {}", shard, sb.toString());
}
// check if the counts meets the minimum set
int requiredAllocation = 1;
// if we restore from a repository one copy is more then enough
if (shard.restoreSource() == null) {
try {
String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
if ("quorum".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
}
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 2) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.numberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = indexMetaData.numberOfReplicas();
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
}
} catch (Exception e) {
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
}
}
// not enough found for this shard, continue...
if (numberOfAllocationsFound < requiredAllocation) {
// if we are restoring this shard we still can allocate
if (shard.restoreSource() == null) {
// we can't really allocate, so ignore it and continue
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}], required_number [{}]", shard.index(), shard.id(), numberOfAllocationsFound, requiredAllocation);
}
} else if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
}
continue;
}
Set<DiscoveryNode> throttledNodes = Sets.newHashSet();
Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode);
} else if (decision.type() == Decision.Type.NO) {
noNodes.add(discoNode);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode);
}
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
// found a node, so no throttling, no "no", and break out of the loop
throttledNodes.clear();
noNodes.clear();
break;
}
}
if (throttledNodes.isEmpty()) {
// if we have a node that we "can't" allocate to, force allocation, since this is our master data!
if (!noNodes.isEmpty()) {
DiscoveryNode discoNode = noNodes.iterator().next();
RoutingNode node = routingNodes.node(discoNode.id());
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, discoNode);
}
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, throttledNodes);
}
// we are throttling this, but we have enough to allocate to this node, ignore it for now
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
}
}
if (!routingNodes.hasUnassigned()) {
return changed;
}
// Now, handle replicas, try to assign them to nodes that are similar to the one the primary was allocated on
unassignedIterator = unassigned.iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
if (shard.primary()) {
continue;
}
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
for (ObjectCursor<DiscoveryNode> cursor : nodes.dataNodes().values()) {
RoutingNode node = routingNodes.node(cursor.value.id());
if (node == null) {
continue;
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
}
if (!canBeAllocatedToAtLeastOneNode) {
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
}
AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction);
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue; // still fetching
}
shardStores.processAllocation(allocation);
long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null;
RoutingNode lastNodeMatched = null;
boolean hasReplicaData = false;
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : shardStores.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
logger.trace("{}: checking node [{}]", shard, discoNode);
if (storeFilesMetaData == null) {
// already allocated on that node...
continue;
}
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.NO) {
continue;
}
// if it is already allocated, we can't assign to it...
if (storeFilesMetaData.allocated()) {
continue;
}
if (!shard.primary()) {
hasReplicaData |= storeFilesMetaData.iterator().hasNext();
ShardRouting primaryShard = routingNodes.activePrimary(shard);
if (primaryShard != null) {
assert primaryShard.active();
DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
if (primaryNode != null) {
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = shardStores.getData().get(primaryNode);
if (primaryNodeFilesStore != null) {
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = primaryNodeFilesStore.storeFilesMetaData();
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
long sizeMatched = 0;
String primarySyncId = primaryNodeStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), replicaSyncId);
lastNodeMatched = node;
lastSizeMatched = Long.MAX_VALUE;
lastDiscoNodeMatched = discoNode;
} else {
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
String metaDataFileName = storeFileMetaData.name();
if (primaryNodeStore.fileExists(metaDataFileName) && primaryNodeStore.file(metaDataFileName).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
}
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched);
if (sizeMatched > lastSizeMatched) {
lastSizeMatched = sizeMatched;
lastDiscoNodeMatched = discoNode;
lastNodeMatched = node;
}
}
}
}
}
}
}
}
if (lastNodeMatched != null) {
// we only check on THROTTLE since we checked before before on NO
Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we are throttling this, but we have enough to allocate to this node, ignore it for now
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we found a match
changed = true;
routingNodes.initialize(shard, lastNodeMatched.nodeId());
unassignedIterator.remove();
}
} else if (hasReplicaData == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
// of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list
// note: we only care about replica in delayed allocation, since if we have an unassigned primary it
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
changed = true;
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
}
}
}
return changed; return changed;
} }
@ -558,4 +143,54 @@ public class GatewayAllocator extends AbstractComponent {
} }
} }
class InternalPrimaryShardAllocator extends PrimaryShardAllocator {
private final TransportNodesListGatewayStartedShards startedAction;
public InternalPrimaryShardAllocator(Settings settings, TransportNodesListGatewayStartedShards startedAction) {
super(settings);
this.startedAction = startedAction;
}
@Override
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch = asyncFetchStarted.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction);
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
if (shardState.hasData() == true) {
shardState.processAllocation(allocation);
}
return shardState;
}
}
class InternalReplicaShardAllocator extends ReplicaShardAllocator {
private final TransportNodesListShardStoreMetaData storeAction;
public InternalReplicaShardAllocator(Settings settings, TransportNodesListShardStoreMetaData storeAction) {
super(settings);
this.storeAction = storeAction;
}
@Override
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction);
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores =
fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == true) {
shardStores.processAllocation(allocation);
}
return shardStores;
}
}
} }

View File

@ -0,0 +1,298 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import java.util.*;
/**
* The primary shard allocator allocates primary shard that were not created as
* a result of an API to a node that held them last to be recovered.
*/
public abstract class PrimaryShardAllocator extends AbstractComponent {
public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards";
private final String initialShards;
public PrimaryShardAllocator(Settings settings) {
super(settings);
this.initialShards = settings.get("gateway.initial_shards", settings.get("gateway.local.initial_shards", "quorum"));
logger.debug("using initial_shards [{}]", initialShards);
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
final MetaData metaData = routingNodes.metaData();
final Iterator<ShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
if (needToFindPrimaryCopy(shard, routingNodes.routingTable().index(shard.index()).shard(shard.id())) == false) {
continue;
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
}
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexMetaData.settings()), allocation.getIgnoreNodes(shard.shardId()), shardState);
logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
if (isEnoughAllocationsFound(shard, indexMetaData, nodesAndVersions) == false) {
// if we are restoring this shard we still can allocate
if (shard.restoreSource() == null) {
// we can't really allocate, so ignore it and continue
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound);
} else {
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
}
continue;
}
NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions);
if (nodesToAllocate.yesNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id());
unassignedIterator.remove();
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id());
unassignedIterator.remove();
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
}
}
return changed;
}
/**
* Does the shard need to find a primary copy?
*/
boolean needToFindPrimaryCopy(ShardRouting shard, IndexShardRoutingTable indexShardRoutingTable) {
if (shard.primary() == false) {
return false;
}
// this is an API allocation, ignore since we know there is no data...
if (indexShardRoutingTable.primaryAllocatedPostApi() == false) {
return false;
}
return true;
}
private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) {
// check if the counts meets the minimum set
int requiredAllocation = 1;
// if we restore from a repository one copy is more then enough
if (shard.restoreSource() == null) {
try {
String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
if ("quorum".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
}
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 2) {
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.numberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
requiredAllocation = indexMetaData.numberOfReplicas();
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
}
} catch (Exception e) {
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
}
}
return nodesAndVersions.allocationsFound >= requiredAllocation;
}
/**
* Based on the nodes and versions, build the list of yes/no/throttle nodes that the shard applies to.
*/
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, NodesAndVersions nodesAndVersions) {
List<DiscoveryNode> yesNodes = new ArrayList<>();
List<DiscoveryNode> throttledNodes = new ArrayList<>();
List<DiscoveryNode> noNodes = new ArrayList<>();
for (DiscoveryNode discoNode : nodesAndVersions.nodes) {
RoutingNode node = allocation.routingNodes().node(discoNode.id());
if (node == null) {
continue;
}
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode);
} else if (decision.type() == Decision.Type.NO) {
noNodes.add(discoNode);
} else {
yesNodes.add(discoNode);
}
}
return new NodesToAllocate(Collections.unmodifiableList(yesNodes), Collections.unmodifiableList(throttledNodes), Collections.unmodifiableList(noNodes));
}
/**
* Builds a list of nodes and version
*/
private NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean recoveryOnAnyNode, Set<String> ignoreNodes,
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
final Map<DiscoveryNode, Long> nodesWithVersion = Maps.newHashMap();
int numberOfAllocationsFound = 0;
long highestVersion = -1;
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
long version = nodeShardState.version();
DiscoveryNode node = nodeShardState.getNode();
if (ignoreNodes.contains(node.id())) {
continue;
}
// -1 version means it does not exists, which is what the API returns, and what we expect to
if (nodeShardState.storeException() == null) {
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
} else {
// when there is an store exception, we disregard the reported version and assign it as -1 (same as shard does not exist)
logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating as version -1", nodeShardState.storeException(), shard, nodeShardState.getNode(), version);
version = -1;
}
if (recoveryOnAnyNode) {
numberOfAllocationsFound++;
if (version > highestVersion) {
highestVersion = version;
}
// We always put the node without clearing the map
nodesWithVersion.put(node, version);
} else if (version != -1) {
numberOfAllocationsFound++;
// If we've found a new "best" candidate, clear the
// current candidates and add it
if (version > highestVersion) {
highestVersion = version;
nodesWithVersion.clear();
nodesWithVersion.put(node, version);
} else if (version == highestVersion) {
// If the candidate is the same, add it to the
// list, but keep the current candidate
nodesWithVersion.put(node, version);
}
}
}
// Now that we have a map of nodes to versions along with the
// number of allocations found (and not ignored), we need to sort
// it so the node with the highest version is at the beginning
List<DiscoveryNode> nodesWithHighestVersion = Lists.newArrayList();
nodesWithHighestVersion.addAll(nodesWithVersion.keySet());
CollectionUtil.timSort(nodesWithHighestVersion, new Comparator<DiscoveryNode>() {
@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
}
});
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("[");
for (DiscoveryNode n : nodesWithVersion.keySet()) {
sb.append("[").append(n.getName()).append("]").append(" -> ").append(nodesWithVersion.get(n)).append(", ");
}
sb.append("]");
logger.trace("{} candidates for allocation: {}", shard, sb.toString());
}
return new NodesAndVersions(Collections.unmodifiableList(nodesWithHighestVersion), numberOfAllocationsFound, highestVersion);
}
/**
* Return {@code true} if the index is configured to allow shards to be
* recovered on any node
*/
private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) {
return IndexMetaData.isOnSharedFilesystem(idxSettings) &&
idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
}
protected abstract AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
static class NodesAndVersions {
public final List<DiscoveryNode> nodes;
public final int allocationsFound;
public final long highestVersion;
public NodesAndVersions(List<DiscoveryNode> nodes, int allocationsFound, long highestVersion) {
this.nodes = nodes;
this.allocationsFound = allocationsFound;
this.highestVersion = highestVersion;
}
}
static class NodesToAllocate {
final List<DiscoveryNode> yesNodes;
final List<DiscoveryNode> throttleNodes;
final List<DiscoveryNode> noNodes;
public NodesToAllocate(List<DiscoveryNode> yesNodes, List<DiscoveryNode> throttleNodes, List<DiscoveryNode> noNodes) {
this.yesNodes = yesNodes;
this.throttleNodes = throttleNodes;
this.noNodes = noNodes;
}
}
}

View File

@ -0,0 +1,212 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import java.util.Iterator;
import java.util.Map;
/**
*/
public abstract class ReplicaShardAllocator extends AbstractComponent {
public ReplicaShardAllocator(Settings settings) {
super(settings);
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
final MetaData metaData = routingNodes.metaData();
final Iterator<ShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
if (shard.primary()) {
continue;
}
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
RoutingNode node = routingNodes.node(cursor.value.id());
if (node == null) {
continue;
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
}
if (!canBeAllocatedToAtLeastOneNode) {
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
if (shardStores.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue; // still fetching
}
long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null;
RoutingNode lastNodeMatched = null;
boolean hasReplicaData = false;
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : shardStores.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
logger.trace("{}: checking node [{}]", shard, discoNode);
if (storeFilesMetaData == null) {
// already allocated on that node...
continue;
}
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.NO) {
continue;
}
// if it is already allocated, we can't assign to it...
if (storeFilesMetaData.allocated()) {
continue;
}
if (!shard.primary()) {
hasReplicaData |= storeFilesMetaData.iterator().hasNext();
ShardRouting primaryShard = routingNodes.activePrimary(shard);
if (primaryShard != null) {
assert primaryShard.active();
DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
if (primaryNode != null) {
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = shardStores.getData().get(primaryNode);
if (primaryNodeFilesStore != null) {
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = primaryNodeFilesStore.storeFilesMetaData();
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
long sizeMatched = 0;
String primarySyncId = primaryNodeStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), replicaSyncId);
lastNodeMatched = node;
lastSizeMatched = Long.MAX_VALUE;
lastDiscoNodeMatched = discoNode;
} else {
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
String metaDataFileName = storeFileMetaData.name();
if (primaryNodeStore.fileExists(metaDataFileName) && primaryNodeStore.file(metaDataFileName).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
}
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched);
if (sizeMatched > lastSizeMatched) {
lastSizeMatched = sizeMatched;
lastDiscoNodeMatched = discoNode;
lastNodeMatched = node;
}
}
}
}
}
}
}
}
if (lastNodeMatched != null) {
// we only check on THROTTLE since we checked before before on NO
Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we are throttling this, but we have enough to allocate to this node, ignore it for now
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we found a match
changed = true;
routingNodes.initialize(shard, lastNodeMatched.nodeId());
unassignedIterator.remove();
}
} else if (hasReplicaData == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
// of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list
// note: we only care about replica in delayed allocation, since if we have an unassigned primary it
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
changed = true;
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
}
}
}
return changed;
}
protected abstract AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.settings; package org.elasticsearch.index.settings;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
@ -74,7 +75,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE); indexDynamicSettings.addDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME); indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME);
indexDynamicSettings.addDynamicSetting(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS); indexDynamicSettings.addDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);

View File

@ -0,0 +1,319 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase {
private final ShardId shardId = new ShardId("test", 0);
private final DiscoveryNode node1 = newNode("node1");
private final DiscoveryNode node2 = newNode("node2");
private final DiscoveryNode node3 = newNode("node3");
private TestAllocator testAllocator;
@Before
public void buildTestAllocator() {
this.testAllocator = new TestAllocator();
}
/**
* Verifies that the canProcess method of primary allocation behaves correctly
* and processes only the applicable shard.
*/
@Test
public void testNoProcessReplica() {
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
assertThat(testAllocator.needToFindPrimaryCopy(shard, null), equalTo(false));
}
/**
* Tests that when async fetch returns that there is no data, the shard will not be allocated.
*/
@Test
public void testNoAsyncFetchData() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
}
/**
* Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned.
*/
@Test
public void testNoAllocationFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
testAllocator.addData(node1, -1);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
}
/**
* Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned.
*/
@Test
public void testStoreException() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
testAllocator.addData(node1, 3, new CorruptIndexException("test", "test"));
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
}
/**
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
*/
@Test
public void testFoundAllocationAndAllocating() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
testAllocator.addData(node1, 10);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
}
/**
* Tests that when there is a node to allocate to, but it is throttling (and it is the only one),
* it will be moved to ignore unassigned until it can be allocated to.
*/
@Test
public void testFoundAllocationButThrottlingDecider() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders());
testAllocator.addData(node1, 10);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
}
/**
* Tests that when there is a node to be allocated to, but it the decider said "no", we still
* force the allocation to it.
*/
@Test
public void testFoundAllocationButNoDecider() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders());
testAllocator.addData(node1, 10);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
}
/**
* Tests that the highest version node is chosed for allocation.
*/
@Test
public void testAllocateToTheHighestVersion() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
testAllocator.addData(node1, 10).addData(node2, 12);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
}
/**
* Tests that when restoring from snapshot, even if we didn't find any node to allocate on, the shard
* will remain in the unassigned list to be allocated later.
*/
@Test
public void testRestoreIgnoresNoNodesToAllocate() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
testAllocator.addData(node1, -1).addData(node2, -1);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().isEmpty(), equalTo(true));
}
/**
* Tests that only when enough copies of the shard exists we are going to allocate it. This test
* verifies that with same version (1), and quorum allocation.
*/
@Test
public void testEnoughCopiesFoundForAllocation() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(shardId.getIndex()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node1, 1);
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node2, 1);
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(0));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), anyOf(equalTo(node2.id()), equalTo(node1.id())));
}
/**
* Tests that only when enough copies of the shard exists we are going to allocate it. This test
* verifies that even with different version, we treat different versions as a copy, and count them.
*/
@Test
public void testEnoughCopiesFoundForAllocationWithDifferentVersion() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(shardId.getIndex()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node1, 1);
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().ignoredUnassigned().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node2, 2);
allocation = new RoutingAllocation(yesAllocationDeciders(), state.routingNodes(), state.nodes(), null);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().ignoredUnassigned().size(), equalTo(0));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
}
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(shardId.getIndex()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null);
}
class TestAllocator extends PrimaryShardAllocator {
private Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data;
public TestAllocator() {
super(Settings.EMPTY);
}
public TestAllocator clear() {
data = null;
return this;
}
public TestAllocator addData(DiscoveryNode node, long version) {
return addData(node, version, null);
}
public TestAllocator addData(DiscoveryNode node, long version, @Nullable Throwable storeException) {
if (data == null) {
data = new HashMap<>();
}
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, storeException));
return this;
}
@Override
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.<String>emptySet(), Collections.<String>emptySet());
}
}
}

View File

@ -51,7 +51,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogService;
@ -514,7 +514,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
internalCluster().ensureAtLeastNumDataNodes(2); internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(Settings.builder() assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one") .put(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose

View File

@ -23,13 +23,16 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
@ -121,4 +124,41 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC
RoutingTable routingTable = strategy.applyStartedShards(clusterState, newArrayList(initializingShards.get(randomInt(initializingShards.size() - 1)))).routingTable(); RoutingTable routingTable = strategy.applyStartedShards(clusterState, newArrayList(initializingShards.get(randomInt(initializingShards.size() - 1)))).routingTable();
return ClusterState.builder(clusterState).routingTable(routingTable).build(); return ClusterState.builder(clusterState).routingTable(routingTable).build();
} }
public static AllocationDeciders yesAllocationDeciders() {
return new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new TestAllocateDecision(Decision.YES)});
}
public static AllocationDeciders noAllocationDeciders() {
return new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new TestAllocateDecision(Decision.NO)});
}
public static AllocationDeciders throttleAllocationDeciders() {
return new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new TestAllocateDecision(Decision.THROTTLE)});
}
static class TestAllocateDecision extends AllocationDecider {
private final Decision decision;
public TestAllocateDecision(Decision decision) {
super(Settings.EMPTY);
this.decision = decision;
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return decision;
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return decision;
}
@Override
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
return decision;
}
}
} }