diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3cbfbfaff83..c58e13d65de 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2563,12 +2563,19 @@ public class InternalEngine extends Engine { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } + + private final Object refreshIfNeededMutex = new Object(); + /** * Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint. */ protected final void refreshIfNeeded(String source, long requestingSeqNo) { if (lastRefreshedCheckpoint() < requestingSeqNo) { - refresh(source, SearcherScope.INTERNAL); + synchronized (refreshIfNeededMutex) { + if (lastRefreshedCheckpoint() < requestingSeqNo) { + refresh(source, SearcherScope.INTERNAL); + } + } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index deb2f4c5496..379043fa939 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -5049,6 +5048,60 @@ public class InternalEngineTests extends EngineTestCase { assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint())); } + public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception { + final MapperService mapperService = createMapperService("test"); + final long maxSeqNo = randomLongBetween(10, 50); + final AtomicLong refreshCounter = new AtomicLong(); + try (Store store = createStore(); + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), + null, + new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + refreshCounter.incrementAndGet(); + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + + } + }, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) { + for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) { + final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"), + new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean())); + } + + final long initialRefreshCount = refreshCounter.get(); + final Thread[] snapshotThreads = new Thread[between(1, 3)]; + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < snapshotThreads.length; i++) { + final long min = randomLongBetween(0, maxSeqNo - 5); + final long max = randomLongBetween(min, maxSeqNo); + snapshotThreads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() throws Exception { + latch.await(); + Translog.Snapshot changes = engine.newChangesSnapshot("test", mapperService, min, max, true); + changes.close(); + } + }); + snapshotThreads[i].start(); + } + latch.countDown(); + for (Thread thread : snapshotThreads) { + thread.join(); + } + assertThat(refreshCounter.get(), equalTo(initialRefreshCount + 1L)); + assertThat(engine.lastRefreshedCheckpoint(), equalTo(maxSeqNo)); + } + } + public void testAcquireSearcherOnClosingEngine() throws Exception { engine.close(); expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3e563e6d538..bbe4dd268e5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -573,7 +573,14 @@ public abstract class EngineTestCase extends ESTestCase { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { - IndexWriterConfig iwc = newIndexWriterConfig(); + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener externalRefreshListener, + ReferenceManager.RefreshListener internalRefreshListener, + Sort indexSort, LongSupplier globalCheckpointSupplier) { + IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener listener = new Engine.EventListener() { @Override @@ -581,12 +588,14 @@ public abstract class EngineTestCase extends ESTestCase { // we don't need to notify anybody in this test } }; - final List refreshListenerList = - refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); + final List extRefreshListenerList = + externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); + final List intRefreshListenerList = + internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, + TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :