mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
Handle no such remote cluster exception in ccr (#53415)
A remote client can throw a NoSuchRemoteClusterException while fetching the cluster state from the leader cluster. We also need to handle that exception when retrying to add a retention lease to the leader shard. Closes #53225
This commit is contained in:
parent
4ecc7dcca5
commit
73d24203e7
@ -151,18 +151,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
||||
final Index followerIndex = params.getFollowShardId().getIndex();
|
||||
final Index leaderIndex = params.getLeaderShardId().getIndex();
|
||||
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
|
||||
|
||||
final Client remoteClient;
|
||||
try {
|
||||
remoteClient = remoteClient(params);
|
||||
} catch (NoSuchRemoteClusterException e) {
|
||||
errorHandler.accept(e);
|
||||
return;
|
||||
}
|
||||
|
||||
CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
|
||||
final ActionListener<IndexMetadata> listener = ActionListener.wrap(
|
||||
indexMetadata -> {
|
||||
if (indexMetadata.getMappings().isEmpty()) {
|
||||
if (indexMetadata.mapping() == null) {
|
||||
assert indexMetadata.getMappingVersion() == 1;
|
||||
handler.accept(indexMetadata.getMappingVersion());
|
||||
return;
|
||||
@ -176,7 +167,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
||||
errorHandler));
|
||||
},
|
||||
errorHandler
|
||||
));
|
||||
);
|
||||
try {
|
||||
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, listener);
|
||||
} catch (NoSuchRemoteClusterException e) {
|
||||
errorHandler.accept(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -445,21 +441,27 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
||||
"{} background adding retention lease [{}] while following",
|
||||
params.getFollowShardId(),
|
||||
retentionLeaseId);
|
||||
CcrRetentionLeases.asyncAddRetentionLease(
|
||||
try {
|
||||
final ActionListener<RetentionLeaseActions.Response> wrappedListener = ActionListener.wrap(
|
||||
r -> {},
|
||||
inner -> {
|
||||
/*
|
||||
* If this fails that the retention lease already exists, something highly unusual is
|
||||
* going on. Log it, and renew again after another renew interval has passed.
|
||||
*/
|
||||
final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
|
||||
logRetentionLeaseFailure(retentionLeaseId, innerCause);
|
||||
});
|
||||
CcrRetentionLeases.asyncAddRetentionLease(
|
||||
params.getLeaderShardId(),
|
||||
retentionLeaseId,
|
||||
followerGlobalCheckpoint.getAsLong(),
|
||||
remoteClient(params),
|
||||
ActionListener.wrap(
|
||||
r -> {},
|
||||
inner -> {
|
||||
/*
|
||||
* If this fails that the retention lease already exists, something highly unusual is
|
||||
* going on. Log it, and renew again after another renew interval has passed.
|
||||
*/
|
||||
final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
|
||||
logRetentionLeaseFailure(retentionLeaseId, innerCause);
|
||||
}));
|
||||
wrappedListener);
|
||||
} catch (NoSuchRemoteClusterException rce) {
|
||||
// we will attempt to renew again after another renew interval has passed
|
||||
logRetentionLeaseFailure(retentionLeaseId, rce);
|
||||
}
|
||||
} else {
|
||||
// if something else happened, we will attempt to renew again after another renew interval has passed
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user