From 28078642b34e1e655c761ae1e0cf37b5449890b8 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 4 Nov 2018 13:43:33 +0100 Subject: [PATCH] Engine.newChangesSnapshot may cause unneeded refreshes if called concurrently (#35169) When the engine is asked for historical operations, we check if some of the requested operations are not yet refreshed and if so we refresh before returning the operations. The refresh check is based on capturing the local checkpoint before each refresh and comparing that value to the one requested when `newChangesSnapshot` was called. If the requested range is above the captured local checkpoint we issue a refresh. This can currently cause unneeded extra refreshes if the method is called concurrently which may cause unwanted degradation in indexing performance. This is especially relevant for CCR where we always ask for a range below the global checkpoint. That range is guaranteed to be below the local checkpoint of the shard and one refresh is enough to serve multiple changes requests. This commit fixes this by introducing a dedicated mutex to make sure the test for whether a refresh is needed actually wait for concurrents for concurrent refreshes that were caused by another change refresh. Note that this is not a big change in semantics as refreshes are serialized by lucene anyway. I also opted not to keep the synchronization to the changes snapshot request only even if in theory we can apply it to all refreshes, not matter where they come from. --- .../index/engine/InternalEngine.java | 9 ++- .../index/engine/InternalEngineTests.java | 55 ++++++++++++++++++- .../index/engine/EngineTestCase.java | 17 ++++-- 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3cbfbfaff83..c58e13d65de 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2563,12 +2563,19 @@ public class InternalEngine extends Engine { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } + + private final Object refreshIfNeededMutex = new Object(); + /** * Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint. */ protected final void refreshIfNeeded(String source, long requestingSeqNo) { if (lastRefreshedCheckpoint() < requestingSeqNo) { - refresh(source, SearcherScope.INTERNAL); + synchronized (refreshIfNeededMutex) { + if (lastRefreshedCheckpoint() < requestingSeqNo) { + refresh(source, SearcherScope.INTERNAL); + } + } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index deb2f4c5496..379043fa939 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -5049,6 +5048,60 @@ public class InternalEngineTests extends EngineTestCase { assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint())); } + public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception { + final MapperService mapperService = createMapperService("test"); + final long maxSeqNo = randomLongBetween(10, 50); + final AtomicLong refreshCounter = new AtomicLong(); + try (Store store = createStore(); + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), + null, + new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + refreshCounter.incrementAndGet(); + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + + } + }, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) { + for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) { + final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"), + new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean())); + } + + final long initialRefreshCount = refreshCounter.get(); + final Thread[] snapshotThreads = new Thread[between(1, 3)]; + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < snapshotThreads.length; i++) { + final long min = randomLongBetween(0, maxSeqNo - 5); + final long max = randomLongBetween(min, maxSeqNo); + snapshotThreads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() throws Exception { + latch.await(); + Translog.Snapshot changes = engine.newChangesSnapshot("test", mapperService, min, max, true); + changes.close(); + } + }); + snapshotThreads[i].start(); + } + latch.countDown(); + for (Thread thread : snapshotThreads) { + thread.join(); + } + assertThat(refreshCounter.get(), equalTo(initialRefreshCount + 1L)); + assertThat(engine.lastRefreshedCheckpoint(), equalTo(maxSeqNo)); + } + } + public void testAcquireSearcherOnClosingEngine() throws Exception { engine.close(); expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3e563e6d538..bbe4dd268e5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -573,7 +573,14 @@ public abstract class EngineTestCase extends ESTestCase { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { - IndexWriterConfig iwc = newIndexWriterConfig(); + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener externalRefreshListener, + ReferenceManager.RefreshListener internalRefreshListener, + Sort indexSort, LongSupplier globalCheckpointSupplier) { + IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener listener = new Engine.EventListener() { @Override @@ -581,12 +588,14 @@ public abstract class EngineTestCase extends ESTestCase { // we don't need to notify anybody in this test } }; - final List refreshListenerList = - refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); + final List extRefreshListenerList = + externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); + final List intRefreshListenerList = + internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, + TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :