[CCR] Improve shard follow task's retryable error handling ()

Improve failure handling of retryable errors by retrying remote calls in
an exponential-backoff-like manner. The delay between retries will not be
longer than the configured max retry delay. Also, retryable errors will be
retried indefinitely.

Relates to 
This commit is contained in:
Martijn van Groningen 2018-09-12 12:49:51 +02:00 committed by GitHub
parent c92ec1c5d7
commit 96c49e5ed0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 92 additions and 172 deletions

@ -56,9 +56,9 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES);
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(Request::setRetryTimeout,
PARSER.declareField(Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()),
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(Request::setIdleShardRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@ -87,7 +87,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
private Long maxOperationSizeInBytes;
private Integer maxConcurrentWriteBatches;
private Integer maxWriteBufferSize;
private TimeValue retryTimeout;
private TimeValue maxRetryDelay;
private TimeValue idleShardRetryDelay;
@Override
@ -166,12 +166,12 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
this.maxWriteBufferSize = maxWriteBufferSize;
}
public TimeValue getRetryTimeout() {
return retryTimeout;
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
public void setRetryTimeout(TimeValue retryTimeout) {
this.retryTimeout = retryTimeout;
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public TimeValue getIdleShardRetryDelay() {
@ -193,7 +193,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
maxOperationSizeInBytes = in.readOptionalLong();
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
retryTimeout = in.readOptionalTimeValue();
maxRetryDelay = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}
@ -208,7 +208,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
out.writeOptionalLong(maxOperationSizeInBytes);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(retryTimeout);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(idleShardRetryDelay);
}
@ -236,8 +236,8 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
if (maxConcurrentWriteBatches != null) {
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (retryTimeout != null) {
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
if (maxRetryDelay != null) {
builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
}
if (idleShardRetryDelay != null) {
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
@ -260,7 +260,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) &&
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay);
}
@ -275,7 +275,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
retryTimeout,
maxRetryDelay,
idleShardRetryDelay
);
}

@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
@ -18,7 +19,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import java.util.ArrayList;
@ -43,11 +43,12 @@ import java.util.function.LongSupplier;
*/
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private static final int DELAY_MILLIS = 50;
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
private final String leaderIndex;
private final ShardFollowTask params;
private final TimeValue retryTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardChangesRequestDelay;
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;
@ -79,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.params = params;
this.scheduler = scheduler;
this.relativeTimeProvider = relativeTimeProvider;
this.retryTimeout = params.getRetryTimeout();
this.maxRetryDelay = params.getMaxRetryDelay();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
@ -357,20 +358,28 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
if (shouldRetry(e)) {
if (isStopped() == false && retryCounter.incrementAndGet() <= FollowIndexAction.RETRY_LIMIT) {
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
scheduler.accept(retryTimeout, task);
} else {
markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() +
"] times, aborting...", e));
}
if (shouldRetry(e) && isStopped() == false) {
int currentRetry = retryCounter.incrementAndGet();
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
params.getFollowShardId(), currentRetry), e);
long delay = computeDelay(currentRetry, maxRetryDelay.getMillis());
scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
}
}
private boolean shouldRetry(Exception e) {
/**
 * Computes a randomized back-off delay for the given retry attempt. The range the delay is drawn
 * from doubles with each retry, and the result is clamped to the supplied maximum.
 *
 * @param currentRetry          the number of the retry attempt that is about to be scheduled
 * @param maxRetryDelayInMillis the largest delay, in milliseconds, that may be returned
 * @return a delay in milliseconds: a random multiple of DELAY_MILLIS that is never larger than maxRetryDelayInMillis
 */
static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
    // The exponent is capped at 24 retries so that the slot count computed below cannot overflow.
    final int cappedRetry = Math.min(currentRetry, 24);
    final long slots = Math.round(Math.pow(2, cappedRetry - 1));
    // nextInt(...) excludes its bound, so the bound is widened by one; otherwise the first delay would always be zero.
    final int pickedSlot = Randomness.get().nextInt(Math.toIntExact(slots + 1));
    return Math.min(pickedSlot * DELAY_MILLIS, maxRetryDelayInMillis);
}
private static boolean shouldRetry(Exception e) {
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
TransportActions.isShardNotAvailableException(e);

@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
@SuppressWarnings("unchecked")
@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final long maxBatchSizeInBytes;
private final int maxConcurrentWriteBatches;
private final int maxWriteBufferSize;
private final TimeValue retryTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;
private final Map<String, String> headers;
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map<String, String> headers) {
int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = retryTimeout;
this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
@ -116,7 +116,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxBatchSizeInBytes = in.readVLong();
this.maxConcurrentWriteBatches = in.readVInt();
this.maxWriteBufferSize = in.readVInt();
this.retryTimeout = in.readTimeValue();
this.maxRetryDelay = in.readTimeValue();
this.idleShardRetryDelay = in.readTimeValue();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
@ -153,8 +153,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
return maxBatchSizeInBytes;
}
public TimeValue getRetryTimeout() {
return retryTimeout;
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
public TimeValue getIdleShardRetryDelay() {
@ -184,7 +184,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
out.writeVLong(maxBatchSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeTimeValue(retryTimeout);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(idleShardRetryDelay);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
@ -210,7 +210,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes);
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
@ -229,7 +229,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
maxWriteBufferSize == that.maxWriteBufferSize &&
Objects.equals(retryTimeout, that.retryTimeout) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
Objects.equals(headers, that.headers);
}
@ -237,7 +237,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
@Override
public int hashCode() {
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers);
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
}
public String toString() {

@ -175,7 +175,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
request.getMaxOperationSizeInBytes(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getRetryTimeout(),
request.getMaxRetryDelay(),
request.getIdleShardRetryDelay(),
filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,

@ -149,7 +149,7 @@ public class TransportPutAutoFollowPatternAction extends
request.getMaxOperationSizeInBytes(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getRetryTimeout(),
request.getMaxRetryDelay(),
request.getIdleShardRetryDelay()
);
patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);

@ -131,7 +131,7 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
}
if (randomBoolean()) {
request.setRetryTimeout(TimeValue.timeValueMillis(500));
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
@ -162,8 +162,8 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
if (request.getMaxOperationSizeInBytes() != null) {
assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes()));
}
if (request.getRetryTimeout() != null) {
assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout()));
if (request.getMaxRetryDelay() != null) {
assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
}
if (request.getIdleShardRetryDelay() != null) {
assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay()));

@ -41,7 +41,7 @@ public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContent
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setRetryTimeout(TimeValue.timeValueMillis(500));
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));

@ -30,12 +30,13 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.sameInstance;
public class ShardFollowNodeTaskTests extends ESTestCase {
@ -177,7 +178,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(1, 10);
int max = randomIntBetween(1, 30);
for (int i = 0; i < max; i++) {
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
@ -223,59 +224,6 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
/**
 * Verifies that a read which keeps failing with a retryable error is eventually abandoned:
 * the task must end up stopped, a fatal error must be recorded and the last fetch failure
 * must remain visible in the task status.
 */
public void testReceiveRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
// queue between 11 and 32 failures, i.e. never fewer than the 11 requests asserted further down
int max = randomIntBetween(11, 32);
for (int i = 0; i < max; i++) {
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
// counts how many times the hook below has run
final AtomicLong retryCounter = new AtomicLong();
// before each retry, we assert the fetch failures; after the last retry, the fetch failure should persist
beforeSendShardChangesRequest = status -> {
assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
if (retryCounter.get() > 0) {
// exactly one fetch exception is tracked, keyed by 0 and wrapping the queued ShardNotFoundException
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
}
retryCounter.incrementAndGet();
};
// NOTE(review): readFailures is presumably drained by the test's stubbed shard changes request, which is not visible here
task.coordinateReads();
// 11 requests are expected, presumably the initial attempt plus 10 retries; each one asks for [0, 64]
assertThat(shardChangesRequests.size(), equalTo(11));
for (long[] shardChangesRequest : shardChangesRequests) {
assertThat(shardChangesRequest[0], equalTo(0L));
assertThat(shardChangesRequest[1], equalTo(64L));
}
// giving up must stop the task and record a fatal error mentioning the exhausted retries
assertTrue("task is stopped", task.isStopped());
assertThat(fatalError, notNullValue());
assertThat(fatalError.getMessage(), containsString("retrying failed ["));
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo(11L));
// the final status still reports the single fetch failure with its original cause
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testReceiveNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
@ -455,7 +403,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(1, 10);
int max = randomIntBetween(1, 30);
for (int i = 0; i < max; i++) {
mappingUpdateFailures.add(new ConnectException());
}
@ -476,31 +424,6 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
/**
 * Verifies that a mapping update which keeps failing with a retryable error is eventually
 * abandoned: the task must end up stopped without having sent any bulk request.
 */
public void testMappingUpdateRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
// queue between 11 and 20 connect failures, i.e. never fewer than the 11 expected to be consumed
int max = randomIntBetween(11, 20);
for (int i = 0; i < max; i++) {
mappingUpdateFailures.add(new ConnectException());
}
mappingVersions.add(1L);
task.coordinateReads();
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
task.handleReadResponse(0L, 64L, response);
// 11 of the queued failures must have been consumed; the rest stays queued
assertThat(mappingUpdateFailures.size(), equalTo(max - 11));
// the queued mapping version is still there and nothing was written
assertThat(mappingVersions.size(), equalTo(1));
assertThat(bulkShardOperationRequests.size(), equalTo(0));
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTaskStatus status = task.getStatus();
// the status still reports mapping version 0
assertThat(status.mappingVersion(), equalTo(0L));
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}
public void testMappingUpdateNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
@ -597,7 +520,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
int max = randomIntBetween(1, 10);
int max = randomIntBetween(1, 30);
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
@ -616,34 +539,6 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
/**
 * Verifies that a write which keeps failing with a retryable error is eventually abandoned:
 * after 11 bulk requests the task must be stopped.
 */
public void testRetryableErrorRetriedTooManyTimes() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
// a single read request for [0, 64] must have been issued
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
// queue between 11 and 32 write failures, i.e. never fewer than the 11 requests asserted below
int max = randomIntBetween(11, 32);
for (int i = 0; i < max; i++) {
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
}
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 643);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 63L, response);
// Number of requests is equal to initial request + retried attempts:
assertThat(bulkShardOperationRequests.size(), equalTo(11));
// every attempt must carry the same operations as the read response
for (List<Translog.Operation> operations : bulkShardOperationRequests) {
assertThat(operations, equalTo(Arrays.asList(response.getOperations())));
}
assertThat(task.isStopped(), equalTo(true));
ShardFollowNodeTaskStatus status = task.getStatus();
// the failed write is still counted as in flight and the follower checkpoint is still -1
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
}
public void testNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
@ -712,8 +607,25 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.followerGlobalCheckpoint(), equalTo(63L));
}
ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches,
int bufferWriteLimit, long maxBatchSizeInBytes) {
/**
 * Checks that the delay returned by computeDelay is never negative, stays within the bound
 * expected for each retry attempt and never exceeds the configured maximum.
 */
public void testComputeDelay() {
    final long maxDelayInMillis = 1000;
    // Each row is {retry attempt, largest delay in milliseconds accepted for that attempt}.
    final long[][] expectations = {
        {0, 50L},
        {1, 50L},
        {2, 100L},
        {3, 200L},
        {4, 400L},
        {5, 800L},
        {6, 1000L},
        {7, 1000L},
        {8, 1000L},
        {1024, 1000L}
    };
    for (long[] expectation : expectations) {
        final int retry = Math.toIntExact(expectation[0]);
        final long upperBound = expectation[1];
        final long delay = ShardFollowNodeTask.computeDelay(retry, maxDelayInMillis);
        assertThat(delay, allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(upperBound)));
    }
}
private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount,
int maxConcurrentReadBatches,
int maxConcurrentWriteBatches,
int bufferWriteLimit,
long maxBatchSizeInBytes) {
AtomicBoolean stopped = new AtomicBoolean(false);
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes,

@ -33,7 +33,6 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
public static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1;
public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
public static final int RETRY_LIMIT = 10;
public static final TimeValue DEFAULT_RETRY_TIMEOUT = new TimeValue(500);
public static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10);
@ -55,7 +54,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
private static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
private static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
private static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
private static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
private static final ConstructingObjectParser<Request, String> PARSER = new ConstructingObjectParser<>(NAME, true,
(args, followerIndex) -> {
@ -76,8 +75,8 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
RETRY_TIMEOUT,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY,
ObjectParser.ValueType.STRING);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
@ -143,10 +142,10 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
return maxWriteBufferSize;
}
private TimeValue retryTimeout;
private TimeValue maxRetryDelay;
public TimeValue getRetryTimeout() {
return retryTimeout;
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
private TimeValue idleShardRetryDelay;
@ -163,7 +162,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
final Long maxOperationSizeInBytes,
final Integer maxConcurrentWriteBatches,
final Integer maxWriteBufferSize,
final TimeValue retryTimeout,
final TimeValue maxRetryDelay,
final TimeValue idleShardRetryDelay) {
if (leaderIndex == null) {
@ -203,7 +202,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
throw new IllegalArgumentException(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0");
}
final TimeValue actualRetryTimeout = retryTimeout == null ? DEFAULT_RETRY_TIMEOUT : retryTimeout;
final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_RETRY_TIMEOUT : maxRetryDelay;
final TimeValue actualIdleShardRetryDelay = idleShardRetryDelay == null ? DEFAULT_IDLE_SHARD_RETRY_DELAY : idleShardRetryDelay;
this.leaderIndex = leaderIndex;
@ -213,7 +212,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
this.maxOperationSizeInBytes = actualMaxOperationSizeInBytes;
this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches;
this.maxWriteBufferSize = actualMaxWriteBufferSize;
this.retryTimeout = actualRetryTimeout;
this.maxRetryDelay = actualRetryTimeout;
this.idleShardRetryDelay = actualIdleShardRetryDelay;
}
@ -236,7 +235,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
maxOperationSizeInBytes = in.readVLong();
maxConcurrentWriteBatches = in.readVInt();
maxWriteBufferSize = in.readVInt();
retryTimeout = in.readOptionalTimeValue();
maxRetryDelay = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}
@ -250,7 +249,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
out.writeVLong(maxOperationSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(retryTimeout);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(idleShardRetryDelay);
}
@ -265,7 +264,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
}
builder.endObject();
@ -282,7 +281,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
maxOperationSizeInBytes == request.maxOperationSizeInBytes &&
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
maxWriteBufferSize == request.maxWriteBufferSize &&
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followerIndex, request.followerIndex);
@ -298,7 +297,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
retryTimeout,
maxRetryDelay,
idleShardRetryDelay
);
}