Fix stalled send translog ops request (#57859)
Currently, the translog ops request is reentrant when there is a mapping update: the retry goes back through the same request handler, so the retried translog ops request ends up waiting on the pre-existing listener and is never completed. This commit fixes the stall by introducing a new code path that performs the retry directly and avoids the idempotency logic.
commit 8119b96517
parent 24a50eb3af
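The hunk below shows the fix in context. To make the failure mode concrete, here is a minimal, self-contained toy sketch (not Elasticsearch code): handleRequest and perform are hypothetical stand-ins for messageReceived and performTranslogOps, and the map-based duplicate check stands in for the handler's idempotency logic, which is not part of this hunk. Re-entering the de-duplicating entry point on retry parks the retried request on the listener registered by the first attempt, which nothing ever completes; retrying through the direct code path completes the original listener.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the stall described above; names and structure are illustrative only.
public class ReentrantRetryStall {

    // listener registered for each request id the first time it arrives
    private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    /** De-duplicating entry point: a hypothetical stand-in for messageReceived(). */
    public CompletableFuture<Void> handleRequest(long requestId, boolean mappingReady) {
        CompletableFuture<Void> listener = new CompletableFuture<>();
        CompletableFuture<Void> existing = pending.putIfAbsent(requestId, listener);
        if (existing != null) {
            // duplicate request: wait on the pre-existing listener instead of redoing the work
            return existing;
        }
        perform(requestId, mappingReady, listener);
        return listener;
    }

    /** The actual work: a hypothetical stand-in for performTranslogOps(). */
    private void perform(long requestId, boolean mappingReady, CompletableFuture<Void> listener) {
        if (mappingReady) {
            listener.complete(null); // happy path: operations applied, listener completed
            return;
        }
        // Broken retry (the bug): re-entering handleRequest() hits the duplicate check,
        // returns the already-registered listener, and nothing ever completes it.
        // handleRequest(requestId, true);

        // Fixed retry (this commit's approach): skip the de-duplication and retry the
        // work directly, completing the original listener.
        perform(requestId, true, listener);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> result = new ReentrantRetryStall().handleRequest(42L, false);
        System.out.println("completed: " + result.isDone()); // prints true with the fixed retry
    }
}

In the actual change below, the retry likewise goes straight to performTranslogOps, re-acquiring the RecoveryRef via onGoingRecoveries.getRecoverySafe(...) inside a try/catch, so a recovery that has since gone away fails the listener instead of hanging.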
@@ -348,60 +348,69 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                     return;
                 }
 
-                final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
-                final Consumer<Exception> retryOnMappingException = exception -> {
-                    // in very rare cases a translog replay from primary is processed before a mapping update on this node
-                    // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
-                    logger.debug("delaying recovery due to missing mapping changes", exception);
-                    // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
-                    // canceled)
-                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
-                        @Override
-                        public void onNewClusterState(ClusterState state) {
-                            try {
-                                messageReceived(request, channel, task);
-                            } catch (Exception e) {
-                                listener.onFailure(e);
-                            }
-                        }
-
-                        @Override
-                        public void onClusterServiceClose() {
-                            listener.onFailure(new ElasticsearchException(
-                                "cluster service was closed while waiting for mapping updates"));
-                        }
-
-                        @Override
-                        public void onTimeout(TimeValue timeout) {
-                            // note that we do not use a timeout (see comment above)
-                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
-                                "(timeout [" + timeout + "])"));
-                        }
-                    });
-                };
-                final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex());
-                final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L;
-                recoveryTarget.indexTranslogOperations(
-                    request.operations(),
-                    request.totalTranslogOps(),
-                    request.maxSeenAutoIdTimestampOnPrimary(),
-                    request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
-                    request.retentionLeases(),
-                    request.mappingVersionOnPrimary(),
-                    ActionListener.wrap(
-                        checkpoint -> listener.onResponse(null),
-                        e -> {
-                            // do not retry if the mapping on replica is at least as recent as the mapping
-                            // that the primary used to index the operations in the request.
-                            if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
-                                retryOnMappingException.accept(e);
-                            } else {
-                                listener.onFailure(e);
-                            }
-                        })
-                );
+                performTranslogOps(request, listener, recoveryRef);
             }
         }
+
+        private void performTranslogOps(final RecoveryTranslogOperationsRequest request, final ActionListener<Void> listener,
+                                        final RecoveryRef recoveryRef) {
+            final RecoveryTarget recoveryTarget = recoveryRef.target();
+
+            final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
+            final Consumer<Exception> retryOnMappingException = exception -> {
+                // in very rare cases a translog replay from primary is processed before a mapping update on this node
+                // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
+                logger.debug("delaying recovery due to missing mapping changes", exception);
+                // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
+                // canceled)
+                observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        try {
+                            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                                performTranslogOps(request, listener, recoveryRef);
+                            }
+                        } catch (Exception e) {
+                            listener.onFailure(e);
+                        }
+                    }
+
+                    @Override
+                    public void onClusterServiceClose() {
+                        listener.onFailure(new ElasticsearchException(
+                            "cluster service was closed while waiting for mapping updates"));
+                    }
+
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        // note that we do not use a timeout (see comment above)
+                        listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
+                            "(timeout [" + timeout + "])"));
+                    }
+                });
+            };
+            final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex());
+            final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L;
+            recoveryTarget.indexTranslogOperations(
+                request.operations(),
+                request.totalTranslogOps(),
+                request.maxSeenAutoIdTimestampOnPrimary(),
+                request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
+                request.retentionLeases(),
+                request.mappingVersionOnPrimary(),
+                ActionListener.wrap(
+                    checkpoint -> listener.onResponse(null),
+                    e -> {
+                        // do not retry if the mapping on replica is at least as recent as the mapping
+                        // that the primary used to index the operations in the request.
+                        if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
+                            retryOnMappingException.accept(e);
+                        } else {
+                            listener.onFailure(e);
+                        }
+                    })
+            );
+        }
     }
 
     class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {