Discard stale node responses from async shard fetching (#24434)

Async shard fetching uses only the node id to correlate responses with requests. When a node is restarted, a response to an earlier request can therefore be mistaken for a response to a new request. This commit adds a unique round id to each fetch so that responses and failures are correlated with the request round that issued them, and stale ones are discarded.
Author: Yannick Welsch, 2017-05-03 09:47:21 +02:00 (committed by GitHub)
parent 4a578e9c71
commit be19ccef57
6 changed files with 114 additions and 47 deletions
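
The change boils down to a simple correlation scheme: each fetch round gets a monotonically increasing id, every outstanding request is stamped with the id of the round that issued it, and a response or failure is only applied if its id matches the round currently recorded for that node. Below is a minimal, self-contained sketch of that pattern; the class and method names (StaleAwareFetcher, startFetch, onResponse) are illustrative only and do not appear in the Elasticsearch code base.

// Minimal sketch of the round-id correlation pattern this commit introduces.
// All names here are hypothetical; this is not the Elasticsearch implementation.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class StaleAwareFetcher {
    // per-node state: which round the outstanding request belongs to
    static final class Entry {
        boolean fetching;
        long fetchingRound;
        String value;
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final AtomicLong round = new AtomicLong();

    // start a new fetch: stamp every request of this round with a fresh id
    synchronized long startFetch(Iterable<String> nodeIds) {
        long fetchingRound = round.incrementAndGet();
        for (String nodeId : nodeIds) {
            Entry entry = cache.computeIfAbsent(nodeId, n -> new Entry());
            entry.fetching = true;
            entry.fetchingRound = fetchingRound;
        }
        return fetchingRound;
    }

    // a response is only applied if it carries the id of the latest round for that node
    synchronized void onResponse(String nodeId, String value, long fetchingRound) {
        Entry entry = cache.get(nodeId);
        if (entry == null || entry.fetchingRound != fetchingRound) {
            return; // stale: a newer round has replaced this request (e.g. the node restarted)
        }
        entry.fetching = false;
        entry.value = value;
    }

    synchronized String valueOf(String nodeId) {
        Entry entry = cache.get(nodeId);
        return entry == null ? null : entry.value;
    }

    public static void main(String[] args) {
        StaleAwareFetcher fetcher = new StaleAwareFetcher();
        long round1 = fetcher.startFetch(java.util.Arrays.asList("node1")); // request that will go stale
        long round2 = fetcher.startFetch(java.util.Arrays.asList("node1")); // node restarted, new request issued
        fetcher.onResponse("node1", "late reply", round1);  // dropped: belongs to an older round
        fetcher.onResponse("node1", "fresh reply", round2); // applied: matches the current round
        System.out.println(fetcher.valueOf("node1"));       // prints "fresh reply"
    }
}

Because the round counter only ever increases, a cached entry can only be stamped with a newer round than any in-flight request, which is what the assertion in the real change ("node entries only replaced by newer rounds") relies on.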

TransportIndicesShardStoresAction.java:

@@ -29,7 +29,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.health.ClusterShardHealth;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -155,7 +154,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAction
         }

         @Override
-        protected synchronized void processAsyncFetch(ShardId shardId, List<NodeGatewayStartedShards> responses, List<FailedNodeException> failures) {
+        protected synchronized void processAsyncFetch(List<NodeGatewayStartedShards> responses, List<FailedNodeException> failures, long fetchingRound) {
             fetchResponses.add(new Response(shardId, responses, failures));
             if (expectedOps.countDown()) {
                 finish();

AsyncShardFetch.java:

@@ -44,6 +44,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;

 import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableSet;
@@ -67,10 +68,11 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
     protected final Logger logger;
     protected final String type;
-    private final ShardId shardId;
+    protected final ShardId shardId;
     private final Lister<BaseNodesResponse<T>, T> action;
     private final Map<String, NodeEntry<T>> cache = new HashMap<>();
     private final Set<String> nodesToIgnore = new HashSet<>();
+    private final AtomicLong round = new AtomicLong();
     private boolean closed;

     @SuppressWarnings("unchecked")
@@ -112,20 +114,22 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
         }
         nodesToIgnore.addAll(ignoreNodes);
         fillShardCacheWithDataNodes(cache, nodes);
-        Set<NodeEntry<T>> nodesToFetch = findNodesToFetch(cache);
+        List<NodeEntry<T>> nodesToFetch = findNodesToFetch(cache);
         if (nodesToFetch.isEmpty() == false) {
             // mark all node as fetching and go ahead and async fetch them
+            // use a unique round id to detect stale responses in processAsyncFetch
+            final long fetchingRound = round.incrementAndGet();
             for (NodeEntry<T> nodeEntry : nodesToFetch) {
-                nodeEntry.markAsFetching();
+                nodeEntry.markAsFetching(fetchingRound);
             }
             DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream().map(NodeEntry::getNodeId).map(nodes::get)
                 .toArray(DiscoveryNode[]::new);
-            asyncFetch(shardId, discoNodesToFetch);
+            asyncFetch(discoNodesToFetch, fetchingRound);
         }

         // if we are still fetching, return null to indicate it
         if (hasAnyNodeFetching(cache)) {
-            return new FetchResult<>(shardId, null, emptySet(), emptySet());
+            return new FetchResult<>(shardId, null, emptySet());
         } else {
             // nothing to fetch, yay, build the return value
             Map<DiscoveryNode, T> fetchData = new HashMap<>();
@@ -158,7 +162,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
             if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) {
                 reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]");
             }
-            return new FetchResult<>(shardId, fetchData, failedNodes, allIgnoreNodes);
+            return new FetchResult<>(shardId, fetchData, allIgnoreNodes);
         }
     }
@@ -168,7 +172,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
      * the shard (response + failures), issuing a reroute at the end of it to make sure there will be another round
      * of allocations taking this new data into account.
      */
-    protected synchronized void processAsyncFetch(ShardId shardId, List<T> responses, List<FailedNodeException> failures) {
+    protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
         if (closed) {
             // we are closed, no need to process this async fetch at all
             logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type);
@@ -179,35 +183,48 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
         if (responses != null) {
             for (T response : responses) {
                 NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
-                // if the entry is there, and not marked as failed already, process it
-                if (nodeEntry == null) {
-                    continue;
-                }
-                if (nodeEntry.isFailed()) {
-                    logger.trace("{} node {} has failed for [{}] (failure [{}])", shardId, nodeEntry.getNodeId(), type, nodeEntry.getFailure());
-                } else {
-                    logger.trace("{} marking {} as done for [{}], result is [{}]", shardId, nodeEntry.getNodeId(), type, response);
-                    nodeEntry.doneFetching(response);
-                }
+                if (nodeEntry != null) {
+                    if (nodeEntry.getFetchingRound() != fetchingRound) {
+                        assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
+                        logger.trace("{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})",
+                            shardId, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), fetchingRound);
+                    } else if (nodeEntry.isFailed()) {
+                        logger.trace("{} node {} has failed for [{}] (failure [{}])", shardId, nodeEntry.getNodeId(), type,
+                            nodeEntry.getFailure());
+                    } else {
+                        // if the entry is there, for the right fetching round and not marked as failed already, process it
+                        logger.trace("{} marking {} as done for [{}], result is [{}]", shardId, nodeEntry.getNodeId(), type, response);
+                        nodeEntry.doneFetching(response);
+                    }
+                }
             }
         }

         if (failures != null) {
             for (FailedNodeException failure : failures) {
                 logger.trace("{} processing failure {} for [{}]", shardId, failure, type);
                 NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
-                // if the entry is there, and not marked as failed already, process it
-                if (nodeEntry != null && nodeEntry.isFailed() == false) {
-                    Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
-                    // if the request got rejected or timed out, we need to try it again next time...
-                    if (unwrappedCause instanceof EsRejectedExecutionException || unwrappedCause instanceof ReceiveTimeoutTransportException || unwrappedCause instanceof ElasticsearchTimeoutException) {
-                        nodeEntry.restartFetching();
-                    } else {
-                        logger.warn((Supplier<?>) () -> new ParameterizedMessage("{}: failed to list shard for {} on node [{}]", shardId, type, failure.nodeId()), failure);
-                        nodeEntry.doneFetching(failure.getCause());
-                    }
-                }
+                if (nodeEntry != null) {
+                    if (nodeEntry.getFetchingRound() != fetchingRound) {
+                        assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
+                        logger.trace("{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})",
+                            shardId, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), fetchingRound);
+                    } else if (nodeEntry.isFailed() == false) {
+                        // if the entry is there, for the right fetching round and not marked as failed already, process it
+                        Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
+                        // if the request got rejected or timed out, we need to try it again next time...
+                        if (unwrappedCause instanceof EsRejectedExecutionException ||
+                            unwrappedCause instanceof ReceiveTimeoutTransportException ||
+                            unwrappedCause instanceof ElasticsearchTimeoutException) {
+                            nodeEntry.restartFetching();
+                        } else {
+                            logger.warn((Supplier<?>) () -> new ParameterizedMessage("{}: failed to list shard for {} on node [{}]",
+                                shardId, type, failure.nodeId()), failure);
+                            nodeEntry.doneFetching(failure.getCause());
+                        }
+                    }
+                }
             }
         }
         reroute(shardId, "post_response");
     }
@@ -241,8 +258,8 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
      * Finds all the nodes that need to be fetched. Those are nodes that have no
      * data, and are not in fetch mode.
      */
-    private Set<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache) {
-        Set<NodeEntry<T>> nodesToFetch = new HashSet<>();
+    private List<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache) {
+        List<NodeEntry<T>> nodesToFetch = new ArrayList<>();
         for (NodeEntry<T> nodeEntry : shardCache.values()) {
             if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
                 nodesToFetch.add(nodeEntry);
@@ -267,12 +284,12 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
      * Async fetches data for the provided shard with the set of nodes that need to be fetched from.
      */
     // visible for testing
-    void asyncFetch(final ShardId shardId, final DiscoveryNode[] nodes) {
+    void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
         logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
         action.list(shardId, nodes, new ActionListener<BaseNodesResponse<T>>() {
             @Override
             public void onResponse(BaseNodesResponse<T> response) {
-                processAsyncFetch(shardId, response.getNodes(), response.failures());
+                processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
             }

             @Override
@@ -281,7 +298,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
                 for (final DiscoveryNode node: nodes) {
                     failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e));
                 }
-                processAsyncFetch(shardId, null, failures);
+                processAsyncFetch(null, failures, fetchingRound);
             }
         });
     }
@@ -294,13 +311,11 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
         private final ShardId shardId;
         private final Map<DiscoveryNode, T> data;
-        private final Set<String> failedNodes;
         private final Set<String> ignoreNodes;

-        public FetchResult(ShardId shardId, Map<DiscoveryNode, T> data, Set<String> failedNodes, Set<String> ignoreNodes) {
+        public FetchResult(ShardId shardId, Map<DiscoveryNode, T> data, Set<String> ignoreNodes) {
             this.shardId = shardId;
             this.data = data;
-            this.failedNodes = failedNodes;
             this.ignoreNodes = ignoreNodes;
         }
@@ -342,6 +357,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
         private T value;
         private boolean valueSet;
         private Throwable failure;
+        private long fetchingRound;

         NodeEntry(String nodeId) {
             this.nodeId = nodeId;
@@ -355,9 +371,10 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
             return fetching;
         }

-        void markAsFetching() {
+        void markAsFetching(long fetchingRound) {
             assert fetching == false : "double marking a node as fetching";
-            fetching = true;
+            this.fetching = true;
+            this.fetchingRound = fetchingRound;
         }

         void doneFetching(T value) {
@@ -402,5 +419,9 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
             assert valueSet : "value is not set, hasn't been fetched yet";
             return value;
         }
+
+        long getFetchingRound() {
+            return fetchingRound;
+        }
     }
 }

AsyncShardFetchTests.java:

@@ -140,6 +140,55 @@ public class AsyncShardFetchTests extends ESTestCase {
         assertThat(fetchData.getData().get(node1), sameInstance(response1));
     }

+    public void testIgnoreResponseFromDifferentRound() throws Exception {
+        DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
+        test.addSimulation(node1.getId(), response1);
+
+        // first fetch, no data, still on going
+        AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(false));
+        assertThat(test.reroute.get(), equalTo(0));
+
+        // handle a response with incorrect round id, wait on reroute incrementing
+        test.processAsyncFetch(Collections.singletonList(response1), Collections.emptyList(), 0);
+        assertThat(fetchData.hasData(), equalTo(false));
+        assertThat(test.reroute.get(), equalTo(1));
+
+        // fire a response (with correct round id), wait on reroute incrementing
+        test.fireSimulationAndWait(node1.getId());
+        // verify we get back the data node
+        assertThat(test.reroute.get(), equalTo(2));
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(true));
+        assertThat(fetchData.getData().size(), equalTo(1));
+        assertThat(fetchData.getData().get(node1), sameInstance(response1));
+    }
+
+    public void testIgnoreFailureFromDifferentRound() throws Exception {
+        DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
+        // add a failed response for node1
+        test.addSimulation(node1.getId(), failure1);
+
+        // first fetch, no data, still on going
+        AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(false));
+        assertThat(test.reroute.get(), equalTo(0));
+
+        // handle a failure with incorrect round id, wait on reroute incrementing
+        test.processAsyncFetch(Collections.emptyList(), Collections.singletonList(
+            new FailedNodeException(node1.getId(), "dummy failure", failure1)), 0);
+        assertThat(fetchData.hasData(), equalTo(false));
+        assertThat(test.reroute.get(), equalTo(1));
+
+        // fire a response, wait on reroute incrementing
+        test.fireSimulationAndWait(node1.getId());
+        // failure, fetched data exists, but has no data
+        assertThat(test.reroute.get(), equalTo(2));
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(true));
+        assertThat(fetchData.getData().size(), equalTo(0));
+    }
+
     public void testTwoNodesOnSetup() throws Exception {
         DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).build();
         test.addSimulation(node1.getId(), response1);
@@ -267,7 +316,7 @@ public class AsyncShardFetchTests extends ESTestCase {
         }

         @Override
-        protected void asyncFetch(final ShardId shardId, DiscoveryNode[] nodes) {
+        protected void asyncFetch(DiscoveryNode[] nodes, long fetchingRound) {
             for (final DiscoveryNode node : nodes) {
                 final String nodeId = node.getId();
                 threadPool.generic().execute(new Runnable() {
@@ -283,11 +332,10 @@ public class AsyncShardFetchTests extends ESTestCase {
                             assert entry != null;
                             entry.executeLatch.await();
                             if (entry.failure != null) {
-                                processAsyncFetch(shardId, null, Collections.singletonList(new FailedNodeException(nodeId,
-                                    "unexpected",
-                                    entry.failure)));
+                                processAsyncFetch(null,
+                                    Collections.singletonList(new FailedNodeException(nodeId, "unexpected", entry.failure)), fetchingRound);
                             } else {
-                                processAsyncFetch(shardId, Collections.singletonList(entry.response), null);
+                                processAsyncFetch(Collections.singletonList(entry.response), null, fetchingRound);
                             }
                         } catch (Exception e) {
                             logger.error("unexpected failure", e);

PrimaryShardAllocatorTests.java:

@@ -481,7 +481,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         @Override
         protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
-            return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.<String>emptySet(), Collections.<String>emptySet());
+            return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.<String>emptySet());
         }
     }
 }

ReplicaShardAllocatorTests.java:

@@ -389,7 +389,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
                     tData.put(entry.getKey(), new TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData(entry.getKey(), entry.getValue()));
                 }
             }
-            return new AsyncShardFetch.FetchResult<>(shardId, tData, Collections.<String>emptySet(), Collections.<String>emptySet());
+            return new AsyncShardFetch.FetchResult<>(shardId, tData, Collections.emptySet());
         }

         @Override

TestGatewayAllocator.java:

@@ -77,7 +77,7 @@ public class TestGatewayAllocator extends GatewayAllocator {
                     new NodeGatewayStartedShards(
                         currentNodes.get(routing.currentNodeId()), routing.allocationId().getId(), routing.primary())));
-            return new AsyncShardFetch.FetchResult<>(shardId, foundShards, Collections.emptySet(), ignoreNodes);
+            return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes);
         }
     };
@@ -86,8 +86,7 @@ public class TestGatewayAllocator extends GatewayAllocator {
         protected AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
             // for now, just pretend no node has data
             final ShardId shardId = shard.shardId();
-            return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), Collections.emptySet(),
-                allocation.getIgnoreNodes(shardId));
+            return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId));
         }

         @Override