Don't throttle gateway recovery based on peer recoveries (gateway recoveries should complete as fast as possible). Still throttle peer recoveries based on both gateway and peer recoveries.
This commit is contained in:
parent
e12bdd9faf
commit
113ea1bb1b
|
@ -147,7 +147,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
recoveryStatus.updateStage(RecoveryStatus.Stage.INIT);
|
||||
|
||||
// we know we are on a thread, we can spin till we can engage in recovery
|
||||
while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
|
||||
while (!recoveryThrottler.tryGatewayRecovery(shardId, "gateway")) {
|
||||
if (indexShard.state() == IndexShardState.CLOSED) {
|
||||
listener.onIgnoreRecovery("ignoring recovery while waiting on retry, closed");
|
||||
return;
|
||||
|
@ -215,7 +215,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
} catch (Exception e) {
|
||||
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e));
|
||||
} finally {
|
||||
recoveryThrottler.recoveryDone(shardId, "gateway");
|
||||
recoveryThrottler.recoveryGatewayDone(shardId, "gateway");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -94,7 +94,7 @@ public class RecoverySource extends AbstractComponent {
|
|||
}
|
||||
|
||||
private RecoveryResponse recover(final StartRecoveryRequest request) {
|
||||
if (!recoveryThrottler.tryRecovery(request.shardId(), "peer recovery source")) {
|
||||
if (!recoveryThrottler.tryPeerRecovery(request.shardId(), "peer recovery source")) {
|
||||
RecoveryResponse retry = new RecoveryResponse();
|
||||
retry.retry = true;
|
||||
return retry;
|
||||
|
@ -282,7 +282,7 @@ public class RecoverySource extends AbstractComponent {
|
|||
});
|
||||
return response;
|
||||
} finally {
|
||||
recoveryThrottler.recoveryDone(request.shardId(), "peer recovery source");
|
||||
recoveryThrottler.recoveryPeerDone(request.shardId(), "peer recovery source");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -167,7 +167,7 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
onGoingRecoveries.put(request.shardId(), recovery);
|
||||
}
|
||||
|
||||
if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
|
||||
if (!recoveryThrottler.tryPeerRecovery(shard.shardId(), "peer recovery target")) {
|
||||
recovery.stage = RecoveryStatus.Stage.THROTTLE;
|
||||
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
|
||||
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
|
||||
|
@ -252,7 +252,7 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
|
||||
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
|
||||
} finally {
|
||||
recoveryThrottler.recoveryDone(shard.shardId(), "peer recovery target");
|
||||
recoveryThrottler.recoveryPeerDone(shard.shardId(), "peer recovery target");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
/**
|
||||
* Recovery Throttler allows throttling of recoveries (both gateway and peer).
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class RecoveryThrottler extends AbstractComponent {
|
||||
|
@ -36,7 +38,9 @@ public class RecoveryThrottler extends AbstractComponent {
|
|||
|
||||
private final TimeValue throttleInterval;
|
||||
|
||||
private volatile int onGoingRecoveries = 0;
|
||||
private volatile int onGoingGatewayRecoveries = 0;
|
||||
|
||||
private volatile int onGoingPeerRecoveries = 0;
|
||||
|
||||
private final int concurrentStreams;
|
||||
|
||||
|
@ -47,41 +51,72 @@ public class RecoveryThrottler extends AbstractComponent {
|
|||
@Inject public RecoveryThrottler(Settings settings) {
|
||||
super(settings);
|
||||
|
||||
int defaultConcurrent = Runtime.getRuntime().availableProcessors() + 1;
|
||||
int defaultConcurrentRecoveries = Runtime.getRuntime().availableProcessors() + 1;
|
||||
// cap it at 10 (is it a good number?)
|
||||
if (defaultConcurrent > 10) {
|
||||
defaultConcurrent = 10;
|
||||
} else if (defaultConcurrent < 3) {
|
||||
defaultConcurrent = 3;
|
||||
if (defaultConcurrentRecoveries > 10) {
|
||||
defaultConcurrentRecoveries = 10;
|
||||
} else if (defaultConcurrentRecoveries < 3) {
|
||||
defaultConcurrentRecoveries = 3;
|
||||
}
|
||||
|
||||
concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", defaultConcurrent);
|
||||
concurrentStreams = componentSettings.getAsInt("concurrent_streams", defaultConcurrent);
|
||||
concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", defaultConcurrentRecoveries);
|
||||
concurrentStreams = componentSettings.getAsInt("concurrent_streams", defaultConcurrentRecoveries * 2);
|
||||
throttleInterval = componentSettings.getAsTime("interval", TimeValue.timeValueMillis(100));
|
||||
|
||||
logger.debug("concurrent_recoveries [{}], concurrent_streams [{}] interval [{}]", concurrentRecoveries, concurrentStreams, throttleInterval);
|
||||
}
|
||||
|
||||
public boolean tryRecovery(ShardId shardId, String reason) {
|
||||
/**
|
||||
* Try and check if gateway recovery is allowed. Only takes the on going gateway recoveries into account. Ignores
|
||||
* on going peer recoveries so peer recovery will not block a much more important gateway recovery.
|
||||
*/
|
||||
public boolean tryGatewayRecovery(ShardId shardId, String reason) {
|
||||
synchronized (concurrentRecoveryMutex) {
|
||||
if (onGoingRecoveries + 1 > concurrentRecoveries) {
|
||||
if ((onGoingGatewayRecoveries + 1) > concurrentRecoveries) {
|
||||
return false;
|
||||
}
|
||||
onGoingRecoveries++;
|
||||
logger.trace("Recovery allowed for [{}], on going [{}], allowed [{}], reason [{}]", shardId, onGoingRecoveries, concurrentRecoveries, reason);
|
||||
onGoingGatewayRecoveries++;
|
||||
logger.trace("Recovery (gateway) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void recoveryDone(ShardId shardId, String reason) {
|
||||
/**
|
||||
* Mark gateway recovery as done.
|
||||
*/
|
||||
public void recoveryGatewayDone(ShardId shardId, String reason) {
|
||||
synchronized (concurrentRecoveryMutex) {
|
||||
--onGoingRecoveries;
|
||||
logger.trace("Recovery done for [{}], on going [{}], allowed [{}], reason [{}]", shardId, onGoingRecoveries, concurrentRecoveries, reason);
|
||||
--onGoingGatewayRecoveries;
|
||||
logger.trace("Recovery (gateway) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try and check if peer recovery is allowed. Takes into account both on going gateway recoveries and on going peer recoveries.
|
||||
*/
|
||||
public boolean tryPeerRecovery(ShardId shardId, String reason) {
|
||||
synchronized (concurrentRecoveryMutex) {
|
||||
if ((onGoingGatewayRecoveries + onGoingPeerRecoveries + 1) > concurrentRecoveries) {
|
||||
return false;
|
||||
}
|
||||
onGoingPeerRecoveries++;
|
||||
logger.trace("Recovery (peer) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark peer recovery as done.
|
||||
*/
|
||||
public void recoveryPeerDone(ShardId shardId, String reason) {
|
||||
synchronized (concurrentRecoveryMutex) {
|
||||
--onGoingPeerRecoveries;
|
||||
logger.trace("Recovery (peer) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
|
||||
}
|
||||
}
|
||||
|
||||
public int onGoingRecoveries() {
|
||||
return onGoingRecoveries;
|
||||
return onGoingGatewayRecoveries + onGoingPeerRecoveries;
|
||||
}
|
||||
|
||||
public boolean tryStream(ShardId shardId, String streamName) {
|
||||
|
|
Loading…
Reference in New Issue