Core: fix index throttling to notice a change to max_merge_count on the next merge start/finish
Previously such changes were never noticed, and index throttling kept kicking in at the original setting from startup. Closes #8136 Closes #8132
This commit is contained in:
parent
37e606819c
commit
f444678dba
|
@ -272,7 +272,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
try {
|
try {
|
||||||
this.indexWriter = createWriter();
|
this.indexWriter = createWriter();
|
||||||
mergeScheduler.removeListener(this.throttle);
|
mergeScheduler.removeListener(this.throttle);
|
||||||
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), logger);
|
this.throttle = new IndexThrottle(mergeScheduler, logger);
|
||||||
mergeScheduler.addListener(throttle);
|
mergeScheduler.addListener(throttle);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
maybeFailEngine(e, "start");
|
maybeFailEngine(e, "start");
|
||||||
|
@ -844,7 +844,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
currentIndexWriter().close(false);
|
currentIndexWriter().close(false);
|
||||||
indexWriter = createWriter();
|
indexWriter = createWriter();
|
||||||
mergeScheduler.removeListener(this.throttle);
|
mergeScheduler.removeListener(this.throttle);
|
||||||
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), this.logger);
|
this.throttle = new IndexThrottle(mergeScheduler, this.logger);
|
||||||
mergeScheduler.addListener(throttle);
|
mergeScheduler.addListener(throttle);
|
||||||
// commit on a just opened writer will commit even if there are no changes done to it
|
// commit on a just opened writer will commit even if there are no changes done to it
|
||||||
// we rely on that for the commit data translog id key
|
// we rely on that for the commit data translog id key
|
||||||
|
@ -1722,13 +1722,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
|
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
|
||||||
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
|
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
|
||||||
private final AtomicBoolean isThrottling = new AtomicBoolean();
|
private final AtomicBoolean isThrottling = new AtomicBoolean();
|
||||||
private final int maxNumMerges;
|
private final MergeSchedulerProvider mergeScheduler;
|
||||||
private final ESLogger logger;
|
private final ESLogger logger;
|
||||||
|
|
||||||
private volatile InternalLock lock = NOOP_LOCK;
|
private volatile InternalLock lock = NOOP_LOCK;
|
||||||
|
|
||||||
public IndexThrottle(int maxNumMerges, ESLogger logger) {
|
public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger) {
|
||||||
this.maxNumMerges = maxNumMerges;
|
this.mergeScheduler = mergeScheduler;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1738,6 +1738,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void beforeMerge(OnGoingMerge merge) {
|
public synchronized void beforeMerge(OnGoingMerge merge) {
|
||||||
|
int maxNumMerges = mergeScheduler.getMaxMerges();
|
||||||
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
|
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
|
||||||
if (isThrottling.getAndSet(true) == false) {
|
if (isThrottling.getAndSet(true) == false) {
|
||||||
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
|
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
|
||||||
|
@ -1748,6 +1749,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void afterMerge(OnGoingMerge merge) {
|
public synchronized void afterMerge(OnGoingMerge merge) {
|
||||||
|
int maxNumMerges = mergeScheduler.getMaxMerges();
|
||||||
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
|
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
|
||||||
if (isThrottling.getAndSet(false)) {
|
if (isThrottling.getAndSet(false)) {
|
||||||
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
|
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
|
||||||
|
|
Loading…
Reference in New Issue