From 8119b965178a00bc72457a4635978dd1162f77c7 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Tue, 9 Jun 2020 09:06:59 -0600
Subject: [PATCH] Fix stalled send translog ops request (#57859)

Currently, the translog ops request is reentrant when there is a mapping
update. The impact of this is that a retried translog ops request ends up
waiting on the pre-existing listener and is never completed. This commit
fixes the issue by introducing a new code path that avoids the idempotency
logic.
---
 .../recovery/PeerRecoveryTargetService.java | 113 ++++++++++--------
 1 file changed, 61 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index f9787d43ce8..3921c923e50 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -348,60 +348,69 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                     return;
                 }
 
-                final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
-                final Consumer<Exception> retryOnMappingException = exception -> {
-                    // in very rare cases a translog replay from primary is processed before a mapping update on this node
-                    // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
-                    logger.debug("delaying recovery due to missing mapping changes", exception);
-                    // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
-                    // canceled)
-                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
-                        @Override
-                        public void onNewClusterState(ClusterState state) {
-                            try {
-                                messageReceived(request, channel, task);
-                            } catch (Exception e) {
-                                listener.onFailure(e);
-                            }
-                        }
-
-                        @Override
-                        public void onClusterServiceClose() {
-                            listener.onFailure(new ElasticsearchException(
-                                "cluster service was closed while waiting for mapping updates"));
-                        }
-
-                        @Override
-                        public void onTimeout(TimeValue timeout) {
-                            // note that we do not use a timeout (see comment above)
-                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
-                                "(timeout [" + timeout + "])"));
-                        }
-                    });
-                };
-                final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex());
-                final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L;
-                recoveryTarget.indexTranslogOperations(
-                    request.operations(),
-                    request.totalTranslogOps(),
-                    request.maxSeenAutoIdTimestampOnPrimary(),
-                    request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
-                    request.retentionLeases(),
-                    request.mappingVersionOnPrimary(),
-                    ActionListener.wrap(
-                        checkpoint -> listener.onResponse(null),
-                        e -> {
-                            // do not retry if the mapping on replica is at least as recent as the mapping
-                            // that the primary used to index the operations in the request.
-                            if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
-                                retryOnMappingException.accept(e);
-                            } else {
-                                listener.onFailure(e);
-                            }
-                        })
-                );
+                performTranslogOps(request, listener, recoveryRef);
             }
         }
+
+        private void performTranslogOps(final RecoveryTranslogOperationsRequest request, final ActionListener<Void> listener,
+                                        final RecoveryRef recoveryRef) {
+            final RecoveryTarget recoveryTarget = recoveryRef.target();
+
+            final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
+            final Consumer<Exception> retryOnMappingException = exception -> {
+                // in very rare cases a translog replay from primary is processed before a mapping update on this node
+                // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
+                logger.debug("delaying recovery due to missing mapping changes", exception);
+                // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
+                // canceled)
+                observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        try {
+                            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                                performTranslogOps(request, listener, recoveryRef);
+                            }
+                        } catch (Exception e) {
+                            listener.onFailure(e);
+                        }
+                    }
+
+                    @Override
+                    public void onClusterServiceClose() {
+                        listener.onFailure(new ElasticsearchException(
+                            "cluster service was closed while waiting for mapping updates"));
+                    }
+
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        // note that we do not use a timeout (see comment above)
+                        listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
+                            "(timeout [" + timeout + "])"));
+                    }
+                });
+            };
+            final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex());
+            final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L;
+            recoveryTarget.indexTranslogOperations(
+                request.operations(),
+                request.totalTranslogOps(),
+                request.maxSeenAutoIdTimestampOnPrimary(),
+                request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
+                request.retentionLeases(),
+                request.mappingVersionOnPrimary(),
+                ActionListener.wrap(
+                    checkpoint -> listener.onResponse(null),
+                    e -> {
+                        // do not retry if the mapping on replica is at least as recent as the mapping
+                        // that the primary used to index the operations in the request.
+                        if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
+                            retryOnMappingException.accept(e);
+                        } else {
+                            listener.onFailure(e);
+                        }
+                    })
+            );
+        }
     }
 
     class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
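
For readers unfamiliar with the handler, the sketch below illustrates the stall the commit message describes. It is not taken from the Elasticsearch sources; the class name, the request-id keying, and the CompletableFuture bookkeeping are simplified assumptions standing in for the transport handler's listener/idempotency logic.

```java
// Hypothetical, simplified illustration of the stall described above; not Elasticsearch code.
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RetryPathSketch {

    // Stand-in for the idempotency logic: the first arrival of a request id registers
    // a future; any later arrival with the same id just waits on the registered one.
    private final Map<Long, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    // Transport entry point. Before the fix, the mapping-update retry re-entered here,
    // found the original (never-completed) future, and waited on it forever.
    public CompletableFuture<Void> messageReceived(long requestId) {
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        CompletableFuture<Void> existing = inFlight.putIfAbsent(requestId, fresh);
        if (existing != null) {
            return existing; // duplicate delivery: wait on the original attempt
        }
        performTranslogOps(requestId, fresh);
        return fresh;
    }

    // After the fix, retries call this method directly, bypassing the de-duplication
    // above, so the original listener/future is completed by the retried attempt.
    private void performTranslogOps(long requestId, CompletableFuture<Void> listener) {
        // ... apply translog operations; on a mapping problem, wait for the next
        // cluster state and then call performTranslogOps(requestId, listener) again ...
        listener.complete(null);
    }
}
```

In the actual patch, the role of the future is played by the listener created in messageReceived, and the retry re-acquires the RecoveryRef via onGoingRecoveries.getRecoverySafe before calling performTranslogOps, instead of re-dispatching through messageReceived.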