diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index a8bdfbf6d77..60c5d4d9e86 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -272,7 +272,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin try { this.indexWriter = createWriter(); mergeScheduler.removeListener(this.throttle); - this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), logger); + this.throttle = new IndexThrottle(mergeScheduler, logger); mergeScheduler.addListener(throttle); } catch (IOException e) { maybeFailEngine(e, "start"); @@ -844,7 +844,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin currentIndexWriter().close(false); indexWriter = createWriter(); mergeScheduler.removeListener(this.throttle); - this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), this.logger); + this.throttle = new IndexThrottle(mergeScheduler, this.logger); mergeScheduler.addListener(throttle); // commit on a just opened writer will commit even if there are no changes done to it // we rely on that for the commit data translog id key @@ -1722,13 +1722,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin private final InternalLock lockReference = new InternalLock(new ReentrantLock()); private final AtomicInteger numMergesInFlight = new AtomicInteger(0); private final AtomicBoolean isThrottling = new AtomicBoolean(); - private final int maxNumMerges; + private final MergeSchedulerProvider mergeScheduler; private final ESLogger logger; private volatile InternalLock lock = NOOP_LOCK; - public IndexThrottle(int maxNumMerges, ESLogger logger) { - this.maxNumMerges = maxNumMerges; + public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger) { + this.mergeScheduler = mergeScheduler; this.logger = logger; } @@ -1738,6 +1738,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public synchronized void beforeMerge(OnGoingMerge merge) { + int maxNumMerges = mergeScheduler.getMaxMerges(); if (numMergesInFlight.incrementAndGet() > maxNumMerges) { if (isThrottling.getAndSet(true) == false) { logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); @@ -1748,6 +1749,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public synchronized void afterMerge(OnGoingMerge merge) { + int maxNumMerges = mergeScheduler.getMaxMerges(); if (numMergesInFlight.decrementAndGet() < maxNumMerges) { if (isThrottling.getAndSet(false)) { logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);