diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 57cdd2b8123..d13f48929a9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -677,7 +677,8 @@ public class InternalEngine extends Engine { trackTranslogLocation.set(true); } } - refresh("realtime_get", SearcherScope.INTERNAL, true); + assert versionValue.seqNo >= 0 : versionValue; + refreshIfNeeded("realtime_get", versionValue.seqNo); } scope = SearcherScope.INTERNAL; } else { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5619b26b7de..7a9d10d3418 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -165,6 +165,7 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -6094,4 +6095,58 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception { + final AtomicInteger refreshCount = new AtomicInteger(); + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() { + + } + + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + refreshCount.incrementAndGet(); + } + } + }; + try (Store store = createStore()) { + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, + refreshListener, null, null, engine.config().getCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + int numDocs = randomIntBetween(10, 100); + Set ids = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + engine.index(indexForDoc(createParsedDoc(id, null))); + ids.add(id); + } + final int refreshCountBeforeGet = refreshCount.get(); + Thread[] getters = new Thread[randomIntBetween(1, 4)]; + Phaser phaser = new Phaser(getters.length + 1); + for (int t = 0; t < getters.length; t++) { + getters[t] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + int iters = randomIntBetween(1, 10); + for (int i = 0; i < iters; i++) { + ParsedDocument doc = createParsedDoc(randomFrom(ids), null); + try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) { + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + } + } + }); + getters[t].start(); + } + phaser.arriveAndAwaitAdvance(); + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc("more-" + i, null))); + } + for (Thread getter : getters) { + getter.join(); + } + assertThat(refreshCount.get(), lessThanOrEqualTo(refreshCountBeforeGet + 1)); + } + } + } }