[RECOVERY] Make recovery delay configurable
Today we wait 500ms before we retry a recovery if the target node is not ready. This happens if the source starts the recovery before the target has processed the clusterstate moving the target shard into the right state. This can cause a 500ms delay each time it happens while the shard is ready way earlier on the target node. This commit makes this delay configurable to mainly speed up test processing and shard allocation in tests.
This commit is contained in:
parent
f20f6ffe22
commit
c1edcaf388
|
@ -78,6 +78,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
|
|||
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER);
|
||||
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER);
|
||||
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
|
||||
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, Validator.TIME_NON_NEGATIVE);
|
||||
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
|
||||
clusterDynamicSettings.addDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*");
|
||||
clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -45,6 +46,7 @@ public class RecoverySettings extends AbstractComponent {
|
|||
public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
|
||||
public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
|
||||
public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
|
||||
public static final String INDICES_RECOVERY_RETRY_DELAY = "indices.recovery.retry_delay";
|
||||
|
||||
public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb").bytes();
|
||||
|
||||
|
@ -67,6 +69,7 @@ public class RecoverySettings extends AbstractComponent {
|
|||
|
||||
private volatile ByteSizeValue maxBytesPerSec;
|
||||
private volatile SimpleRateLimiter rateLimiter;
|
||||
private volatile TimeValue retryDelay;
|
||||
|
||||
@Inject
|
||||
public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
|
@ -76,6 +79,7 @@ public class RecoverySettings extends AbstractComponent {
|
|||
this.translogOps = componentSettings.getAsInt("translog_ops", settings.getAsInt("index.shard.recovery.translog_ops", 1000));
|
||||
this.translogSize = componentSettings.getAsBytesSize("translog_size", settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
|
||||
this.compress = componentSettings.getAsBoolean("compress", true);
|
||||
this.retryDelay = componentSettings.getAsTime("retry_delay", TimeValue.timeValueMillis(500));
|
||||
|
||||
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3));
|
||||
this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
|
||||
|
@ -131,6 +135,8 @@ public class RecoverySettings extends AbstractComponent {
|
|||
return rateLimiter;
|
||||
}
|
||||
|
||||
public TimeValue retryDelay() { return retryDelay; }
|
||||
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
|
@ -184,6 +190,11 @@ public class RecoverySettings extends AbstractComponent {
|
|||
RecoverySettings.this.concurrentSmallFileStreams = concurrentSmallFileStreams;
|
||||
RecoverySettings.this.concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams);
|
||||
}
|
||||
final TimeValue retryDelay = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay);
|
||||
if (retryDelay.equals(RecoverySettings.this.retryDelay) == false) {
|
||||
logger.info("updating [] from [{}] to [{}]",INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay, retryDelay);
|
||||
RecoverySettings.this.retryDelay = retryDelay;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -218,12 +218,12 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
|
||||
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
|
||||
// if the target is not ready yet, retry
|
||||
retryRecovery(recoveryStatus.recoveryId(), TimeValue.timeValueMillis(500));
|
||||
retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay());
|
||||
return;
|
||||
}
|
||||
|
||||
if (cause instanceof DelayRecoveryException) {
|
||||
retryRecovery(recoveryStatus.recoveryId(), TimeValue.timeValueMillis(500));
|
||||
retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay());
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -303,10 +303,12 @@ public final class InternalTestCluster extends TestCluster {
|
|||
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 10, 15));
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 10, 15));
|
||||
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 5, 10));
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 10, 25))); // more shared - we need to retry more often
|
||||
} else if (random.nextBoolean()) {
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 3, 6));
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 3, 6));
|
||||
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 2, 5));
|
||||
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 100)));
|
||||
}
|
||||
defaultSettings = builder.build();
|
||||
executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));
|
||||
|
|
Loading…
Reference in New Issue