Fix to actually throttle indexing when throttling is activated (#61768)
In #22721, the decision to throttle indexing was inadvertently flipped, so that we until this commit throttle indexing during recovery but never throttle user initiated indexing requests. This commit fixes that to throttle user initiated indexing requests and never throttle recovery requests. Closes #61959
This commit is contained in:
parent
322a6b3655
commit
c9baadd19b
|
@ -283,6 +283,13 @@ public abstract class Engine implements Closeable {
|
||||||
boolean isThrottled() {
|
boolean isThrottled() {
|
||||||
return lock != NOOP_LOCK;
|
return lock != NOOP_LOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
|
||||||
|
if(isThrottled()) {
|
||||||
|
return lock.isHeldByCurrentThread();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -880,7 +880,7 @@ public class InternalEngine extends Engine {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
|
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
|
||||||
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
|
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
|
||||||
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
|
Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) {
|
||||||
lastWriteNanos = index.startTime();
|
lastWriteNanos = index.startTime();
|
||||||
/* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
|
/* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
|
||||||
* if we have an autoGeneratedID that comes into the engine we can potentially optimize
|
* if we have an autoGeneratedID that comes into the engine we can potentially optimize
|
||||||
|
@ -2308,6 +2308,10 @@ public class InternalEngine extends Engine {
|
||||||
return throttle.isThrottled();
|
return throttle.isThrottled();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
|
||||||
|
return throttle.throttleLockIsHeldByCurrentThread();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getIndexThrottleTimeInMillis() {
|
public long getIndexThrottleTimeInMillis() {
|
||||||
return throttle.getThrottleTimeInMillis();
|
return throttle.getThrottleTimeInMillis();
|
||||||
|
|
|
@ -210,7 +210,10 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class InternalEngineTests extends EngineTestCase {
|
public class InternalEngineTests extends EngineTestCase {
|
||||||
|
@ -6153,6 +6156,33 @@ public class InternalEngineTests extends EngineTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIndexThrottling() throws Exception {
|
||||||
|
final Engine.Index indexWithThrottlingCheck = spy(indexForDoc(createParsedDoc("1", null)));
|
||||||
|
final Engine.Index indexWithoutThrottlingCheck = spy(indexForDoc(createParsedDoc("2", null)));
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
try {
|
||||||
|
assertTrue(engine.throttleLockIsHeldByCurrentThread());
|
||||||
|
return invocation.callRealMethod();
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}).when(indexWithThrottlingCheck).startTime();
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
try {
|
||||||
|
assertFalse(engine.throttleLockIsHeldByCurrentThread());
|
||||||
|
return invocation.callRealMethod();
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}).when(indexWithoutThrottlingCheck).startTime();
|
||||||
|
engine.activateThrottling();
|
||||||
|
engine.index(indexWithThrottlingCheck);
|
||||||
|
engine.deactivateThrottling();
|
||||||
|
engine.index(indexWithoutThrottlingCheck);
|
||||||
|
verify(indexWithThrottlingCheck, atLeastOnce()).startTime();
|
||||||
|
verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime();
|
||||||
|
}
|
||||||
|
|
||||||
public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
|
public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
|
||||||
final AtomicInteger refreshCount = new AtomicInteger();
|
final AtomicInteger refreshCount = new AtomicInteger();
|
||||||
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
|
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
|
||||||
|
|
Loading…
Reference in New Issue