diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index 972a42ea8de..6933e629cae 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -78,6 +78,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule { clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER); clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER); clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); + clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, Validator.TIME_NON_NEGATIVE); clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE); clusterDynamicSettings.addDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*"); clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 6d59e9733b6..6f964c873f1 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; 
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; @@ -45,6 +46,7 @@ public class RecoverySettings extends AbstractComponent { public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams"; public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams"; public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec"; + public static final String INDICES_RECOVERY_RETRY_DELAY = "indices.recovery.retry_delay"; public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb").bytes(); @@ -67,6 +69,7 @@ public class RecoverySettings extends AbstractComponent { private volatile ByteSizeValue maxBytesPerSec; private volatile SimpleRateLimiter rateLimiter; + private volatile TimeValue retryDelay; @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { @@ -76,6 +79,7 @@ public class RecoverySettings extends AbstractComponent { this.translogOps = componentSettings.getAsInt("translog_ops", settings.getAsInt("index.shard.recovery.translog_ops", 1000)); this.translogSize = componentSettings.getAsBytesSize("translog_size", settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(512, ByteSizeUnit.KB))); this.compress = componentSettings.getAsBoolean("compress", true); + this.retryDelay = componentSettings.getAsTime("retry_delay", TimeValue.timeValueMillis(500)); this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3)); this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); @@ -131,6 +135,8 @@ public class RecoverySettings extends AbstractComponent { return rateLimiter; } + public TimeValue retryDelay() { return 
retryDelay; } + class ApplySettings implements NodeSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { @@ -184,6 +190,11 @@ public class RecoverySettings extends AbstractComponent { RecoverySettings.this.concurrentSmallFileStreams = concurrentSmallFileStreams; RecoverySettings.this.concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams); } + final TimeValue retryDelay = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay); + if (retryDelay.equals(RecoverySettings.this.retryDelay) == false) { + logger.info("updating [{}] from [{}] to [{}]", INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay, retryDelay); + RecoverySettings.this.retryDelay = retryDelay; + } } } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 99cb2e7dd6e..e886523632f 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -218,12 +218,12 @@ public class RecoveryTarget extends AbstractComponent { if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) { // if the target is not ready yet, retry - retryRecovery(recoveryStatus.recoveryId(), TimeValue.timeValueMillis(500)); + retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay()); return; } if (cause instanceof DelayRecoveryException) { - retryRecovery(recoveryStatus.recoveryId(), TimeValue.timeValueMillis(500)); + retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay()); return; } diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 67f318cc006..183d887f3f8 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ 
b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -303,10 +303,12 @@ public final class InternalTestCluster extends TestCluster { builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 10, 15)); builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 10, 15)); builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 5, 10)); + builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 10, 25))); // more shared - we need to retry more often } else if (random.nextBoolean()) { builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 3, 6)); builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 3, 6)); builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 2, 5)); + builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 100))); } defaultSettings = builder.build(); executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));