From 113ea1bb1b48ca38aa53def934fad43db5794cd6 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 21 Aug 2010 15:37:12 +0300 Subject: [PATCH] don't throttle gateway recovery based on peer recoveries (gateway recoveries are important to do as fast as possible). still throttle peer recoveries based on both. --- .../gateway/IndexShardGatewayService.java | 4 +- .../index/shard/recovery/RecoverySource.java | 4 +- .../index/shard/recovery/RecoveryTarget.java | 4 +- .../recovery/throttler/RecoveryThrottler.java | 67 ++++++++++++++----- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 4a74435f19b..ae257684134 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -147,7 +147,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem recoveryStatus.updateStage(RecoveryStatus.Stage.INIT); // we know we are on a thread, we can spin till we can engage in recovery - while (!recoveryThrottler.tryRecovery(shardId, "gateway")) { + while (!recoveryThrottler.tryGatewayRecovery(shardId, "gateway")) { if (indexShard.state() == IndexShardState.CLOSED) { listener.onIgnoreRecovery("ignoring recovery while waiting on retry, closed"); return; @@ -215,7 +215,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem } catch (Exception e) { listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e)); } finally { - recoveryThrottler.recoveryDone(shardId, "gateway"); + recoveryThrottler.recoveryGatewayDone(shardId, "gateway"); } } }); diff --git 
a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index ab9fe251ca7..0099a4951d7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -94,7 +94,7 @@ public class RecoverySource extends AbstractComponent { } private RecoveryResponse recover(final StartRecoveryRequest request) { - if (!recoveryThrottler.tryRecovery(request.shardId(), "peer recovery source")) { + if (!recoveryThrottler.tryPeerRecovery(request.shardId(), "peer recovery source")) { RecoveryResponse retry = new RecoveryResponse(); retry.retry = true; return retry; @@ -282,7 +282,7 @@ public class RecoverySource extends AbstractComponent { }); return response; } finally { - recoveryThrottler.recoveryDone(request.shardId(), "peer recovery source"); + recoveryThrottler.recoveryPeerDone(request.shardId(), "peer recovery source"); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index 1429276fcd5..9160a269bc0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -167,7 +167,7 @@ public class RecoveryTarget extends AbstractComponent { onGoingRecoveries.put(request.shardId(), recovery); } - if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) { + if (!recoveryThrottler.tryPeerRecovery(shard.shardId(), "peer recovery target")) { recovery.stage = RecoveryStatus.Stage.THROTTLE; recovery.retryTime = System.currentTimeMillis() - recovery.startTime; 
listener.onRetryRecovery(recoveryThrottler.throttleInterval()); @@ -252,7 +252,7 @@ public class RecoveryTarget extends AbstractComponent { listener.onRecoveryFailure(new RecoveryFailedException(request, e), true); } finally { - recoveryThrottler.recoveryDone(shard.shardId(), "peer recovery target"); + recoveryThrottler.recoveryPeerDone(shard.shardId(), "peer recovery target"); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java index ed79041da41..fda7729a155 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; /** + * Recovery Throttler allows to throttle recoveries (both gateway and peer). + * * @author kimchy (shay.banon) */ public class RecoveryThrottler extends AbstractComponent { @@ -36,7 +38,9 @@ public class RecoveryThrottler extends AbstractComponent { private final TimeValue throttleInterval; - private volatile int onGoingRecoveries = 0; + private volatile int onGoingGatewayRecoveries = 0; + + private volatile int onGoingPeerRecoveries = 0; private final int concurrentStreams; @@ -47,41 +51,72 @@ public class RecoveryThrottler extends AbstractComponent { @Inject public RecoveryThrottler(Settings settings) { super(settings); - int defaultConcurrent = Runtime.getRuntime().availableProcessors() + 1; + int defaultConcurrentRecoveries = Runtime.getRuntime().availableProcessors() + 1; // tap it at 10 (is it a good number?) 
- if (defaultConcurrent > 10) { - defaultConcurrent = 10; - } else if (defaultConcurrent < 3) { - defaultConcurrent = 3; + if (defaultConcurrentRecoveries > 10) { + defaultConcurrentRecoveries = 10; + } else if (defaultConcurrentRecoveries < 3) { + defaultConcurrentRecoveries = 3; } - concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", defaultConcurrent); - concurrentStreams = componentSettings.getAsInt("concurrent_streams", defaultConcurrent); + concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", defaultConcurrentRecoveries); + concurrentStreams = componentSettings.getAsInt("concurrent_streams", defaultConcurrentRecoveries * 2); throttleInterval = componentSettings.getAsTime("interval", TimeValue.timeValueMillis(100)); logger.debug("concurrent_recoveries [{}], concurrent_streams [{}] interval [{}]", concurrentRecoveries, concurrentStreams, throttleInterval); } - public boolean tryRecovery(ShardId shardId, String reason) { + /** + * Try and check if gateway recovery is allowed. Only takes the on going gateway recoveries into account. Ignore + * on going peer recoveries so peer recovery will not block a much more important gateway recovery. + */ + public boolean tryGatewayRecovery(ShardId shardId, String reason) { synchronized (concurrentRecoveryMutex) { - if (onGoingRecoveries + 1 > concurrentRecoveries) { + if ((onGoingGatewayRecoveries + 1) > concurrentRecoveries) { return false; } - onGoingRecoveries++; - logger.trace("Recovery allowed for [{}], on going [{}], allowed [{}], reason [{}]", shardId, onGoingRecoveries, concurrentRecoveries, reason); + onGoingGatewayRecoveries++; + logger.trace("Recovery (gateway) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); return true; } } - public void recoveryDone(ShardId shardId, String reason) { + /** + * Mark gateway recovery as done. 
+ */ + public void recoveryGatewayDone(ShardId shardId, String reason) { synchronized (concurrentRecoveryMutex) { - --onGoingRecoveries; - logger.trace("Recovery done for [{}], on going [{}], allowed [{}], reason [{}]", shardId, onGoingRecoveries, concurrentRecoveries, reason); + --onGoingGatewayRecoveries; + logger.trace("Recovery (gateway) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); + } + } + + /** + * Try and check if peer recovery is allowed. Takes into account both on going gateway recovery and peer recovery. + */ + public boolean tryPeerRecovery(ShardId shardId, String reason) { + synchronized (concurrentRecoveryMutex) { + if ((onGoingGatewayRecoveries + onGoingPeerRecoveries + 1) > concurrentRecoveries) { + return false; + } + onGoingPeerRecoveries++; + logger.trace("Recovery (peer) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); + return true; + } + } + + /** + * Mark peer recovery as done. + */ + public void recoveryPeerDone(ShardId shardId, String reason) { + synchronized (concurrentRecoveryMutex) { + --onGoingPeerRecoveries; + logger.trace("Recovery (peer) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); } } public int onGoingRecoveries() { - return onGoingRecoveries; + return onGoingGatewayRecoveries + onGoingPeerRecoveries; } public boolean tryStream(ShardId shardId, String streamName) {