From c9baadd19bf8f37db371ce65c9b657a7e910d9a8 Mon Sep 17 00:00:00 2001 From: nitin2goyal Date: Fri, 2 Oct 2020 18:40:35 +0530 Subject: [PATCH] Fix to actually throttle indexing when throttling is activated (#61768) In #22721, the decision to throttle indexing was inadvertently flipped, so that we until this commit throttle indexing during recovery but never throttle user initiated indexing requests. This commit fixes that to throttle user initiated indexing requests and never throttle recovery requests. Closes #61959 --- .../elasticsearch/index/engine/Engine.java | 7 +++++ .../index/engine/InternalEngine.java | 6 +++- .../index/engine/InternalEngineTests.java | 30 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index c9f9ecbe25e..4d5757db52b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -283,6 +283,13 @@ public abstract class Engine implements Closeable { boolean isThrottled() { return lock != NOOP_LOCK; } + + boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only + if(isThrottled()) { + return lock.isHeldByCurrentThread(); + } + return false; + } } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a2d27e14983..3e2c3944245 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -880,7 +880,7 @@ public class InternalEngine extends Engine { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); - Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { + Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) { lastWriteNanos = index.startTime(); /* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS: * if we have an autoGeneratedID that comes into the engine we can potentially optimize @@ -2308,6 +2308,10 @@ public class InternalEngine extends Engine { return throttle.isThrottled(); } + boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only + return throttle.throttleLockIsHeldByCurrentThread(); + } + @Override public long getIndexThrottleTimeInMillis() { return throttle.getThrottleTimeInMillis(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a2483f13857..169ec24caf8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -210,7 +210,10 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class InternalEngineTests extends EngineTestCase { @@ -6153,6 +6156,33 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testIndexThrottling() throws Exception { + final Engine.Index indexWithThrottlingCheck = spy(indexForDoc(createParsedDoc("1", null))); + final Engine.Index indexWithoutThrottlingCheck = spy(indexForDoc(createParsedDoc("2", null))); + doAnswer(invocation -> { + try { + assertTrue(engine.throttleLockIsHeldByCurrentThread()); + return invocation.callRealMethod(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + }).when(indexWithThrottlingCheck).startTime(); + doAnswer(invocation -> { + try { + assertFalse(engine.throttleLockIsHeldByCurrentThread()); + return invocation.callRealMethod(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + }).when(indexWithoutThrottlingCheck).startTime(); + engine.activateThrottling(); + engine.index(indexWithThrottlingCheck); + engine.deactivateThrottling(); + engine.index(indexWithoutThrottlingCheck); + verify(indexWithThrottlingCheck, atLeastOnce()).startTime(); + verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime(); + } + public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception { final AtomicInteger refreshCount = new AtomicInteger(); final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {