[CCR] Improve retry mechanism when making remote calls from shard follow task (#31930)
Closes #31816
This commit is contained in:
parent d0c9cf26a9
commit 006c79a80d
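In short: the commit removes the single retryCounter field shared by all operations and instead threads a fresh AtomicInteger through each read, write, and mapping-update call chain, so concurrent operations no longer reset one another's retry counts. A minimal sketch of the resulting pattern, assuming the RETRY_LIMIT constant and the shouldRetry/isStopped checks visible in the diff below; the class name, the direct recursion in place of scheduled retries, and the thrown exception are illustrative only:

import java.util.concurrent.atomic.AtomicInteger;

class PerOperationRetrySketch {
    private static final int RETRY_LIMIT = 10; // illustrative value, not from the diff
    private volatile boolean stopped = false;

    // Public entry point: every top-level operation starts with its own counter,
    // so a success on one operation cannot affect another operation's retry budget.
    void sendRequest(Runnable remoteCall) {
        sendRequest(remoteCall, new AtomicInteger(0));
    }

    // The same counter object is carried through every retry of this one operation.
    private void sendRequest(Runnable remoteCall, AtomicInteger retryCounter) {
        try {
            remoteCall.run();
        } catch (Exception e) {
            handleFailure(e, retryCounter, () -> sendRequest(remoteCall, retryCounter));
        }
    }

    private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable retry) {
        if (shouldRetry(e) && stopped == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) {
            retry.run(); // the real task schedules the retry with a delay instead of recursing
        } else {
            throw new RuntimeException("retry limit exceeded or non-retryable error", e);
        }
    }

    private boolean shouldRetry(Exception e) {
        return true; // the real check inspects the exception type
    }
}

Because each top-level call creates its own counter, a successful response no longer needs to reset anything, which is why the retryCounter.set(0) calls disappear in the hunks below.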
@@ -67,7 +67,6 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     private volatile int numConcurrentWrites = 0;
     private volatile long followerGlobalCheckpoint = 0;
     private volatile long currentIndexMetadataVersion = 0;
-    private final AtomicInteger retryCounter = new AtomicInteger(0);
     private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed());
 
     ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
@@ -168,14 +167,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         return true;
     }
 
     private void sendShardChangesRequest(long from, int maxOperationCount, long maxRequiredSeqNo) {
+        sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, new AtomicInteger(0));
+    }
+
+    private void sendShardChangesRequest(long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
         innerSendShardChangesRequest(from, maxOperationCount,
-                response -> {
-                    retryCounter.set(0);
-                    handleReadResponse(from, maxRequiredSeqNo, response);
-                },
-                e -> handleFailure(e, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo)));
+                response -> handleReadResponse(from, maxRequiredSeqNo, response),
+                e -> handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)));
     }
 
     private void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
@@ -223,12 +222,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     }
 
     private void sendBulkShardOperationsRequest(List<Translog.Operation> operations) {
+        sendBulkShardOperationsRequest(operations, new AtomicInteger(0));
+    }
+
+    private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, AtomicInteger retryCounter) {
         innerSendBulkShardOperationsRequest(operations,
-                followerLocalCheckpoint -> {
-                    retryCounter.set(0);
-                    handleWriteResponse(followerLocalCheckpoint);
-                },
-                e -> handleFailure(e, () -> sendBulkShardOperationsRequest(operations))
+                this::handleWriteResponse,
+                e -> handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter))
         );
     }
 
@@ -252,14 +252,21 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
             LOGGER.trace("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]",
                 params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion);
             updateMapping(imdVersion -> {
-                retryCounter.set(0);
                 currentIndexMetadataVersion = imdVersion;
                 task.run();
             });
         }
     }
 
-    void handleFailure(Exception e, Runnable task) {
+    private void updateMapping(LongConsumer handler) {
+        updateMapping(handler, new AtomicInteger(0));
+    }
+
+    private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
+        innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
+    }
+
+    private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
         assert e != null;
         if (shouldRetry(e)) {
             if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) {
@@ -281,7 +288,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     }
 
     // These methods are protected for testing purposes:
-    protected abstract void updateMapping(LongConsumer handler);
+    protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
 
     protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
                                                                 Consumer<Exception> errorHandler);
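The abstract seam changes accordingly: updateMapping is no longer the override point. The base class wraps retry handling around innerUpdateMapping, and implementations (next file) just forward failures to the supplied errorHandler. A compact sketch of that inversion, with retryOrFail as a hypothetical stand-in for the task's handleFailure and the AtomicInteger plumbing elided:

import java.util.function.Consumer;
import java.util.function.LongConsumer;

abstract class MappingUpdaterSketch {
    // The base class installs the retry-aware error handler exactly once...
    final void updateMapping(LongConsumer handler) {
        innerUpdateMapping(handler, e -> retryOrFail(e, () -> updateMapping(handler)));
    }

    // ...so concrete implementations only perform the remote call and forward failures.
    protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);

    // Stand-in for ShardFollowNodeTask#handleFailure and its retry-counter bookkeeping.
    abstract void retryOrFail(Exception e, Runnable retry);
}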
@@ -94,7 +94,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                 scheduler, params.getIdleShardRetryDelay(), params.getRetryTimeout()) {
 
             @Override
-            protected void updateMapping(LongConsumer handler) {
+            protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
                 Index leaderIndex = params.getLeaderShardId().getIndex();
                 Index followIndex = params.getFollowShardId().getIndex();

@@ -114,8 +114,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                     putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
                     followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
                         putMappingResponse -> handler.accept(indexMetaData.getVersion()),
-                        e -> handleFailure(e, () -> updateMapping(handler))));
-                }, e -> handleFailure(e, () -> updateMapping(handler))));
+                        errorHandler));
+                }, errorHandler));
             }
 
             @Override
@@ -147,7 +147,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
                 TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(500)) {
 
             @Override
-            protected void updateMapping(LongConsumer handler) {
+            protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
                 mappingUpdateCounter.incrementAndGet();
                 handler.accept(imdVersion.get());
             }
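Since the error handler is now an explicit parameter, the test stub above could just as easily simulate transient failures to drive handleFailure through its retry loop. A hypothetical variant (assuming java.net.ConnectException is an exception type shouldRetry accepts):

            @Override
            protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
                if (mappingUpdateCounter.incrementAndGet() <= 2) {
                    // fail the first two attempts; the task should retry with the same counter
                    errorHandler.accept(new ConnectException("simulated transient failure"));
                } else {
                    handler.accept(imdVersion.get());
                }
            }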