[GATEWAY] When using `recover_on_any_node`, respect Deciders

When in a shared filesystem environment and recovering the primary to
any node, we should respect the allocation deciders if possible (still
force-allocating to another node if there aren't any "YES" decisions).

The AllocationDeciders should take precedence over the shard state
version when force-allocating an unassigned primary shard.
Lee Hinman 2015-05-14 12:13:59 -06:00
parent f241236af0
commit 22ae561bfd
2 changed files with 145 additions and 32 deletions
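
The selection order the message describes, sketched in plain Java: candidates
are ordered by shard-state version, but an allocation-decider "YES" outranks a
higher version; only when no node gets a "YES" is the highest-version candidate
still forced. This is a minimal sketch, not the commit's literal code —
`Candidate` and `deciderSaysYes` are hypothetical stand-ins:

import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

final class CandidatePicker {
    static final class Candidate {
        final String nodeId;
        final long version;
        Candidate(String nodeId, long version) {
            this.nodeId = nodeId;
            this.version = version;
        }
    }

    /** Prefer the best-versioned node the deciders accept; with no "YES"
     *  anywhere, still force-allocate to the highest-version node. */
    static Candidate pick(List<Candidate> candidates, Predicate<Candidate> deciderSaysYes) {
        // Highest shard-state version first
        candidates.sort(Comparator.comparingLong((Candidate c) -> c.version).reversed());
        for (Candidate c : candidates) {
            if (deciderSaysYes.test(c)) {
                return c; // a decider "YES" takes precedence over raw version order
            }
        }
        // No "YES" decision anywhere: fall back to forcing the highest version
        return candidates.isEmpty() ? null : candidates.get(0);
    }
}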

GatewayAllocator.java

@@ -23,8 +23,10 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.predicates.ObjectPredicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -45,14 +47,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
/**
@@ -101,6 +102,15 @@ public class GatewayAllocator extends AbstractComponent {
}
}
/**
* Return {@code true} if the index is configured to allow shards to be
* recovered on any node
*/
private boolean recoverOnAnyNode(@IndexSettings Settings idxSettings) {
return IndexMetaData.isOnSharedFilesystem(idxSettings) &&
idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
}
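// (Used below: on a shared filesystem, every node that reported shard state
// remains a candidate, even at version -1, and the allocation deciders
// choose among them.)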
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
DiscoveryNodes nodes = allocation.nodes();
@@ -125,11 +135,13 @@ public class GatewayAllocator extends AbstractComponent {
int numberOfAllocationsFound = 0;
long highestVersion = -1;
Set<DiscoveryNode> nodesWithHighestVersion = Sets.newHashSet();
final Map<DiscoveryNode, Long> nodesWithVersion = Maps.newHashMap();
assert !nodesState.containsKey(null);
final Object[] keys = nodesState.keys;
final long[] values = nodesState.values;
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
Settings idxSettings = indexMetaData.settings();
for (int i = 0; i < keys.length; i++) {
if (keys[i] == null) {
continue;
@@ -141,21 +153,56 @@ public class GatewayAllocator extends AbstractComponent {
if (allocation.shouldIgnoreShardForNode(shard.shardId(), node.id())) {
continue;
}
if (version != -1) {
if (recoverOnAnyNode(idxSettings)) {
numberOfAllocationsFound++;
if (highestVersion == -1) {
nodesWithHighestVersion.add(node);
highestVersion = version;
} else {
if (version > highestVersion) {
nodesWithHighestVersion.clear();
nodesWithHighestVersion.add(node);
highestVersion = version;
}
// We always put the node without clearing the map
nodesWithVersion.put(node, version);
} else if (version != -1) {
numberOfAllocationsFound++;
// If we've found a new "best" candidate, clear the
// current candidates and add it
if (version > highestVersion) {
highestVersion = version;
nodesWithVersion.clear();
nodesWithVersion.put(node, version);
} else if (version == highestVersion) {
nodesWithHighestVersion.add(node);
// If the candidate is the same, add it to the
// list, but keep the current candidate
nodesWithVersion.put(node, version);
}
}
}
// Now that we have a map of nodes to versions along with the
// number of allocations found (and not ignored), we need to sort
// it so the node with the highest version is at the beginning
List<DiscoveryNode> nodesWithHighestVersion = Lists.newArrayList();
nodesWithHighestVersion.addAll(nodesWithVersion.keySet());
CollectionUtil.timSort(nodesWithHighestVersion, new Comparator<DiscoveryNode>() {
@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
}
});
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]",
shard.index(), shard.id(), numberOfAllocationsFound, shard, highestVersion);
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("[");
for (DiscoveryNode n : nodesWithHighestVersion) {
sb.append("[");
sb.append(n.getName());
sb.append("]");
sb.append(" -> ");
sb.append(nodesWithVersion.get(n));
sb.append(", ");
}
sb.append("]");
logger.trace("{} candidates for allocation: {}", shard, sb.toString());
}
// check if the count meets the minimum set
@@ -163,7 +210,6 @@ public class GatewayAllocator extends AbstractComponent {
// if we restore from a repository one copy is more than enough
if (shard.restoreSource() == null) {
try {
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
String initialShards = indexMetaData.settings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
if ("quorum".equals(initialShards)) {
if (indexMetaData.numberOfReplicas() > 1) {
@@ -415,13 +461,6 @@ public class GatewayAllocator extends AbstractComponent {
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
long version = nodeShardState.version();
Settings idxSettings = indexMetaData.settings();
if (IndexMetaData.isOnSharedFilesystem(idxSettings) &&
idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false)) {
// Shared filesystems use 0 as a minimum shard state, which
// means that the shard can be allocated to any node
version = Math.max(0, version);
}
// -1 version means it does not exist, which is what the API returns, and what we expect too
logger.trace("[{}] on node [{}] has version [{}] of shard",
shard, nodeShardState.getNode(), version);
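
The candidate ordering introduced above can be exercised in isolation. A
self-contained sketch, with plain `Collections.sort` standing in for Lucene's
`CollectionUtil.timSort` and made-up node names:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VersionOrderingDemo {
    public static void main(String[] args) {
        final Map<String, Long> nodesWithVersion = new HashMap<>();
        nodesWithVersion.put("node1", 3L);
        nodesWithVersion.put("node2", 7L);  // highest version, sorts first
        nodesWithVersion.put("node3", 7L);  // a tie keeps both candidates
        nodesWithVersion.put("node4", -1L); // kept under recover_on_any_node

        List<String> nodesWithHighestVersion = new ArrayList<>(nodesWithVersion.keySet());
        Collections.sort(nodesWithHighestVersion, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                // Descending by version, same shape as the comparator in the diff
                return Long.compare(nodesWithVersion.get(o2), nodesWithVersion.get(o1));
            }
        });
        // node2/node3 first (version 7), then node1 (3), then node4 (-1)
        System.out.println(nodesWithHighestVersion);
    }
}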

IndexWithShadowReplicasTests.java

@@ -52,6 +52,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -612,20 +613,67 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo"));
}
/** wait until none of the nodes have shards allocated on them */
private void assertNoShardsOn(final List<String> nodeList) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
RoutingNodes nodes = resp.getState().getRoutingNodes();
for (RoutingNode node : nodes) {
logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards());
if (nodeList.contains(node.node().getName())) {
assertThat("no shards on node", node.numberOfOwningShards(), equalTo(0));
}
}
}
});
}
/** wait until the node has the specified number of shards allocated on it */
private void assertShardCountOn(final String nodeName, final int shardCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
RoutingNodes nodes = resp.getState().getRoutingNodes();
for (RoutingNode node : nodes) {
logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards());
if (nodeName.equals(node.node().getName())) {
assertThat(node.numberOfOwningShards(), equalTo(shardCount));
}
}
}
});
}
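// Note: assertBusy retries the runnable until the assertions pass or a
// timeout elapses, so both helpers above tolerate in-flight shard relocation.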
@Test
public void testIndexOnSharedFSRecoversToAnyNode() throws Exception {
Settings nodeSettings = nodeSettings();
Settings fooSettings = ImmutableSettings.builder().put(nodeSettings).put("node.affinity", "foo").build();
Settings barSettings = ImmutableSettings.builder().put(nodeSettings).put("node.affinity", "bar").build();
internalCluster().startNode(nodeSettings);
final Future<List<String>> fooNodes = internalCluster().startNodesAsync(2, fooSettings);
final Future<List<String>> barNodes = internalCluster().startNodesAsync(2, barSettings);
fooNodes.get();
barNodes.get();
Path dataPath = createTempDir();
String IDX = "test";
Settings includeFoo = ImmutableSettings.builder()
.put("index.routing.allocation.include.affinity", "foo")
.build();
Settings includeBar = ImmutableSettings.builder()
.put("index.routing.allocation.include.affinity", "bar")
.build();
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)
.put(includeFoo) // start with requiring the shards on "foo"
.build();
// only one node, so all primaries will end up on node1
@@ -637,17 +685,43 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get();
flushAndRefresh(IDX);
// start a second node
internalCluster().startNode(nodeSettings);
// put shards on "bar"
client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeBar).get();
// node1 is master, stop that one, since we only have primaries,
// usually this would mean data loss, but not on shared fs!
internalCluster().stopCurrentMasterNode();
// wait for the shards to move from "foo" nodes to "bar" nodes
assertNoShardsOn(fooNodes.get());
// put shards back on "foo"
client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeFoo).get();
// wait for the shards to move from "bar" nodes to "foo" nodes
assertNoShardsOn(barNodes.get());
// Stop a foo node
logger.info("--> stopping first 'foo' node");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(0)));
// Ensure that the other foo node has all the shards now
assertShardCountOn(fooNodes.get().get(1), 5);
// Assert no shards on the "bar" nodes
assertNoShardsOn(barNodes.get());
// Stop the second "foo" node
logger.info("--> stopping second 'foo' node");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(1)));
// The index should still be able to be allocated (on the "bar" nodes),
// all the "foo" nodes are gone
ensureGreen(IDX);
refresh();
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).addFieldDataField("foo").addSort("foo", SortOrder.ASC).get();
assertHitCount(resp, 4);
// Start another "foo" node and make sure the index moves back
logger.info("--> starting additional 'foo' node");
String newFooNode = internalCluster().startNode(fooSettings);
assertShardCountOn(newFooNode, 5);
assertNoShardsOn(barNodes.get());
}
}
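
For context, the guard added in GatewayAllocator combines the two index
settings the test enables. A hedged recap using plain strings rather than the
`IndexMetaData` constants — the literal setting keys here are an assumption
from this era of the codebase, so verify them against `IndexMetaData` before
relying on them:

import java.util.HashMap;
import java.util.Map;

final class RecoverOnAnyNodeCheck {
    // Assumed keys behind SETTING_SHARED_FILESYSTEM and
    // SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE (unverified here).
    static final String SHARED_FS = "index.shared_filesystem";
    static final String RECOVER_ANY_NODE = "index.shared_filesystem.recover_on_any_node";

    /** Mirrors the guard's logic: both flags must be true. */
    static boolean recoverOnAnyNode(Map<String, String> idxSettings) {
        return Boolean.parseBoolean(idxSettings.getOrDefault(SHARED_FS, "false"))
            && Boolean.parseBoolean(idxSettings.getOrDefault(RECOVER_ANY_NODE, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put(SHARED_FS, "true");
        settings.put(RECOVER_ANY_NODE, "true");
        System.out.println(recoverOnAnyNode(settings)); // true
    }
}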