Engine.newChangesSnapshot may cause unneeded refreshes if called concurrently (#35169)

When the engine is asked for historical operations, we check if some of the requested operations
are not yet refreshed and if so we refresh before returning the operations. The refresh check is
based on capturing the local checkpoint before each refresh and comparing that value to the one
requested when `newChangesSnapshot` was called. If the requested range is above the captured
local checkpoint we issue a refresh.

This can currently cause unneeded extra refreshes if the method is called concurrently which may cause unwanted degradation in indexing performance. This is especially relevant for CCR where we always ask for a range below the global checkpoint. That range is guaranteed to be below the local
checkpoint of the shard and one refresh is enough to serve multiple changes requests.

This commit fixes this by introducing a dedicated mutex to make sure the test for whether a refresh
is needed actually wait for concurrents for concurrent refreshes that were caused by another
change refresh. 

Note that this is not a big change in semantics as refreshes are serialized by lucene anyway. I also
opted not to keep the synchronization to the changes snapshot request only even if in theory we
can apply it to all refreshes, not matter where they come from.
This commit is contained in:
Boaz Leskes 2018-11-04 13:43:33 +01:00 committed by GitHub
parent 54e1231ebd
commit 28078642b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 6 deletions

View File

@ -2563,12 +2563,19 @@ public class InternalEngine extends Engine {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}
private final Object refreshIfNeededMutex = new Object();
/**
* Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint.
*/
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL);
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
}
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.engine;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -5049,6 +5048,60 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint()));
}
public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception {
final MapperService mapperService = createMapperService("test");
final long maxSeqNo = randomLongBetween(10, 50);
final AtomicLong refreshCounter = new AtomicLong();
try (Store store = createStore();
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
null,
new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
refreshCounter.incrementAndGet();
}
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
}
}, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) {
for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) {
final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean()));
}
final long initialRefreshCount = refreshCounter.get();
final Thread[] snapshotThreads = new Thread[between(1, 3)];
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < snapshotThreads.length; i++) {
final long min = randomLongBetween(0, maxSeqNo - 5);
final long max = randomLongBetween(min, maxSeqNo);
snapshotThreads[i] = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", mapperService, min, max, true);
changes.close();
}
});
snapshotThreads[i].start();
}
latch.countDown();
for (Thread thread : snapshotThreads) {
thread.join();
}
assertThat(refreshCounter.get(), equalTo(initialRefreshCount + 1L));
assertThat(engine.lastRefreshedCheckpoint(), equalTo(maxSeqNo));
}
}
public void testAcquireSearcherOnClosingEngine() throws Exception {
engine.close();
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));

View File

@ -573,7 +573,14 @@ public abstract class EngineTestCase extends ESTestCase {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
IndexWriterConfig iwc = newIndexWriterConfig();
return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener externalRefreshListener,
ReferenceManager.RefreshListener internalRefreshListener,
Sort indexSort, LongSupplier globalCheckpointSupplier) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
Engine.EventListener listener = new Engine.EventListener() {
@Override
@ -581,12 +588,14 @@ public abstract class EngineTestCase extends ESTestCase {
// we don't need to notify anybody in this test
}
};
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
final List<ReferenceManager.RefreshListener> extRefreshListenerList =
externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener);
final List<ReferenceManager.RefreshListener> intRefreshListenerList =
internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener);
EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort,
TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort,
new NoneCircuitBreakerService(),
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :