Simplify Replica Allocator

Simplify the codebase of replica allocator and add more unit tests for it
This commit is contained in:
Shay Banon 2015-07-22 18:42:31 +02:00
parent a33cfe4b11
commit 33d2ca13a9
2 changed files with 187 additions and 107 deletions

View File

@ -19,7 +19,10 @@
package org.elasticsearch.gateway;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -29,6 +32,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -50,8 +54,6 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
final MetaData metaData = routingNodes.metaData();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
@ -60,22 +62,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
RoutingNode node = routingNodes.node(cursor.value.id());
if (node == null) {
continue;
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
}
if (!canBeAllocatedToAtLeastOneNode) {
if (canBeAllocatedToAtLeastOneNode(shard, allocation) == false) {
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
unassignedIterator.removeAndIgnore();
continue;
@ -88,106 +75,41 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
continue; // still fetching
}
long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null;
RoutingNode lastNodeMatched = null;
boolean hasReplicaData = false;
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : shardStores.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
logger.trace("{}: checking node [{}]", shard, discoNode);
if (storeFilesMetaData == null) {
// already allocated on that node...
continue;
}
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.NO) {
continue;
}
// if it is already allocated, we can't assign to it...
if (storeFilesMetaData.allocated()) {
continue;
}
if (!shard.primary()) {
hasReplicaData |= storeFilesMetaData.iterator().hasNext();
ShardRouting primaryShard = routingNodes.activePrimary(shard);
if (primaryShard != null) {
assert primaryShard.active();
DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
if (primaryNode != null) {
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = shardStores.getData().get(primaryNode);
if (primaryNodeFilesStore != null) {
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = primaryNodeFilesStore.storeFilesMetaData();
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
long sizeMatched = 0;
String primarySyncId = primaryNodeStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), replicaSyncId);
lastNodeMatched = node;
lastSizeMatched = Long.MAX_VALUE;
lastDiscoNodeMatched = discoNode;
} else {
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
String metaDataFileName = storeFileMetaData.name();
if (primaryNodeStore.fileExists(metaDataFileName) && primaryNodeStore.file(metaDataFileName).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
}
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched);
if (sizeMatched > lastSizeMatched) {
lastSizeMatched = sizeMatched;
lastDiscoNodeMatched = discoNode;
lastNodeMatched = node;
}
}
}
}
}
}
}
ShardRouting primaryShard = routingNodes.activePrimary(shard);
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
if (primaryStore == null || primaryStore.allocated() == false) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
// will try and recover from
// Note, this is the existing behavior, as exposed in running CorruptFileTest#testNoPrimaryData
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
continue;
}
if (lastNodeMatched != null) {
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores);
if (matchingNodes.getNodeWithHighestMatch() != null) {
RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().id());
// we only check on THROTTLE since we checked before before on NO
Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
Decision decision = allocation.deciders().canAllocate(shard, nodeWithHighestMatch, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we are throttling this, but we have enough to allocate to this node, ignore it for now
unassignedIterator.removeAndIgnore();
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match
changed = true;
unassignedIterator.initialize(lastNodeMatched.nodeId());
unassignedIterator.initialize(nodeWithHighestMatch.nodeId());
}
} else if (hasReplicaData == false) {
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
// of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list
// note: we only care about replica in delayed allocation, since if we have an unassigned primary it
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
@ -203,5 +125,134 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
return changed;
}
/**
* Can the shard be allocated on at least one node based on the allocation deciders.
*/
private boolean canBeAllocatedToAtLeastOneNode(ShardRouting shard, RoutingAllocation allocation) {
for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().dataNodes().values()) {
RoutingNode node = allocation.routingNodes().node(cursor.value.id());
if (node == null) {
continue;
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
return true;
}
}
return false;
}
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation, AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> data) {
assert shard.currentNodeId() != null;
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
if (primaryNode == null) {
return null;
}
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode);
if (primaryNodeFilesStore == null) {
return null;
}
return primaryNodeFilesStore.storeFilesMetaData();
}
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> data) {
ObjectLongMap<DiscoveryNode> nodesToSize = new ObjectLongHashMap<>();
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
if (storeFilesMetaData == null) {
// already allocated on that node...
continue;
}
RoutingNode node = allocation.routingNodes().node(discoNode.id());
if (node == null) {
continue;
}
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.NO) {
continue;
}
// if it is already allocated, we can't assign to it... (and it might be primary as well)
if (storeFilesMetaData.allocated()) {
continue;
}
// we don't have any files at all, it is an empty index
if (storeFilesMetaData.iterator().hasNext() == false) {
continue;
}
String primarySyncId = primaryStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), replicaSyncId);
nodesToSize.put(discoNode, Long.MAX_VALUE);
} else {
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
String metaDataFileName = storeFileMetaData.name();
if (primaryStore.fileExists(metaDataFileName) && primaryStore.file(metaDataFileName).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
}
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched);
nodesToSize.put(discoNode, sizeMatched);
}
}
return new MatchingNodes(nodesToSize);
}
protected abstract AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);
static class MatchingNodes {
private final ObjectLongMap<DiscoveryNode> nodesToSize;
private final DiscoveryNode nodeWithHighestMatch;
public MatchingNodes(ObjectLongMap<DiscoveryNode> nodesToSize) {
this.nodesToSize = nodesToSize;
long highestMatchSize = 0;
DiscoveryNode highestMatchNode = null;
for (ObjectLongCursor<DiscoveryNode> cursor : nodesToSize) {
if (cursor.value > highestMatchSize) {
highestMatchSize = cursor.value;
highestMatchNode = cursor.key;
}
}
nodeWithHighestMatch = highestMatchNode;
}
/**
* Returns the node with the highest "non zero byte" match compared to
* the primary.
*/
@Nullable
public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}
/**
* Did we manage to find any data, regardless how well they matched or not.
*/
public boolean hasAnyData() {
return nodesToSize.isEmpty() == false;
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -118,8 +119,9 @@ public class ReplicaShardAllocatorTests extends ElasticsearchAllocationTestCase
/**
* When we can't find primary data, but still find replica data, we go ahead and keep it unassigned
* to be allocated.
* TODO: this might be the wrong decision here, and we should restart the fetching process maybe to really find a primary copy?
* to be allocated. This is today behavior, which relies on a primary corruption identified with
* adding a replica and having that replica actually recover and cause the corruption to be identified
* See CorruptFileTest#
*/
@Test
public void testNoPrimaryData() {
@ -194,15 +196,42 @@ public class ReplicaShardAllocatorTests extends ElasticsearchAllocationTestCase
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
}
@Test
public void testDelayedAllocation() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
if (randomBoolean()) {
// we sometime return empty list of files, make sure we test this as well
testAllocator.addData(node2, false, null);
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
}
private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED);
}
private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
.addShard(ShardRouting.newUnassigned(shardId.getIndex(), shardId.getId(), null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)))
.addShard(ShardRouting.newUnassigned(shardId.getIndex(), shardId.getId(), null, false, new UnassignedInfo(reason, null)))
.build())
)
.build();