From 8e151862935d38403c3c7ca4705468830a2bf6be Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 7 Jun 2017 10:54:10 +0200 Subject: [PATCH] Update `IndexShard#refreshMetric` via a `ReferenceManager.RefreshListener` (#25083) The PR takes a different approach to solve #24806 than currently implemented via #25052. The `refreshMetric` that IndexShard maintains is updated using the refresh listeners infrastructure in lucene. This means that we truly count all refreshes that lucene makes and not have to worry about each individual caller (like `IndexShard@refresh` and `Engine#get()`) --- .../elasticsearch/index/engine/Engine.java | 3 +- .../index/engine/EngineConfig.java | 10 +++-- .../index/engine/InternalEngine.java | 10 ++--- .../elasticsearch/index/shard/IndexShard.java | 43 +++++++++++++++--- .../index/engine/InternalEngineTests.java | 45 ++++++++----------- .../index/shard/IndexShardTests.java | 21 +++++++++ .../index/shard/RefreshListenersTests.java | 6 +-- 7 files changed, 91 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index ec893e8abcf..0242445de5d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -89,7 +89,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; -import java.util.function.LongConsumer; public abstract class Engine implements Closeable { @@ -486,7 +485,7 @@ public abstract class Engine implements Closeable { } } - public abstract GetResult get(Get get, Function searcherFactory, LongConsumer onRefresh) throws EngineException; + public abstract GetResult get(Get get, Function searcherFactory) throws EngineException; /** * Returns a new searcher instance. The consumer of this diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 19ec3e036e5..5016c0fcb4f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -41,6 +41,8 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.threadpool.ThreadPool; +import java.util.List; + /* * Holds all the configuration that is used to create an {@link Engine}. * Once {@link Engine} has been created with this object, changes to this @@ -65,7 +67,7 @@ public final class EngineConfig { private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; @Nullable - private final ReferenceManager.RefreshListener refreshListeners; + private final List refreshListeners; @Nullable private final Sort indexSort; @@ -111,7 +113,7 @@ public final class EngineConfig { MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners, + TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, Sort indexSort) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); @@ -310,9 +312,9 @@ public final class EngineConfig { } /** - * {@linkplain ReferenceManager.RefreshListener} instance to configure. + * The refresh listeners to add to Lucene */ - public ReferenceManager.RefreshListener getRefreshListeners() { + public List getRefreshListeners() { return refreshListeners; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f6e0d174057..5db0249320e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.TermQuery; @@ -92,7 +93,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -import java.util.function.LongConsumer; import java.util.function.LongSupplier; public class InternalEngine extends Engine { @@ -213,8 +213,8 @@ public class InternalEngine extends Engine { assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); - if (engineConfig.getRefreshListeners() != null) { - searcherManager.addListener(engineConfig.getRefreshListeners()); + for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) { + searcherManager.addListener(listener); } success = true; } finally { @@ -405,7 +405,7 @@ public class InternalEngine extends Engine { } @Override - public GetResult get(Get get, Function searcherFactory, LongConsumer onRefresh) throws EngineException { + public GetResult get(Get get, Function searcherFactory) throws EngineException { assert Objects.equals(get.uid().field(), uidField) : get.uid().field(); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); @@ -419,9 +419,7 @@ public class InternalEngine extends Engine { throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } - long time = System.nanoTime(); refresh("realtime_get"); - onRefresh.accept(System.nanoTime() - time); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ed08309f9a5..8a733de505e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -26,11 +26,13 @@ import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -123,6 +125,7 @@ import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Locale; @@ -660,7 +663,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public Engine.GetResult get(Engine.Get get) { readAllowed(); - return getEngine().get(get, this::acquireSearcher, (timeElapsed) -> refreshMetric.inc(timeElapsed)); + return getEngine().get(get, this::acquireSearcher); } /** @@ -676,9 +679,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (logger.isTraceEnabled()) { logger.trace("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes)); } - long time = System.nanoTime(); getEngine().refresh(source); - refreshMetric.inc(System.nanoTime() - time); } finally { if (logger.isTraceEnabled()) { logger.trace("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); @@ -689,9 +690,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (logger.isTraceEnabled()) { logger.trace("refresh with source [{}]", source); } - long time = System.nanoTime(); getEngine().refresh(source); - refreshMetric.inc(System.nanoTime() - time); } } @@ -1847,7 +1846,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort); + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), + Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort); } /** @@ -2123,4 +2123,35 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener { + + private final MeanMetric refreshMetric; + private long currentRefreshStartTime; + private Thread callingThread = null; + + private RefreshMetricUpdater(MeanMetric refreshMetric) { + this.refreshMetric = refreshMetric; + } + + @Override + public void beforeRefresh() throws IOException { + if (Assertions.ENABLED) { + assert callingThread == null : "beforeRefresh was called by " + callingThread.getName() + + " without a corresponding call to afterRefresh"; + callingThread = Thread.currentThread(); + } + currentRefreshStartTime = System.nanoTime(); + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (Assertions.ENABLED) { + assert callingThread != null : "afterRefresh called but not beforeRefresh"; + assert callingThread == Thread.currentThread() : "beforeRefreshed called by a different thread. current [" + + Thread.currentThread().getName() + "], thread that called beforeRefresh [" + callingThread.getName() + "]"; + callingThread = null; + } + refreshMetric.inc(System.nanoTime() - currentRefreshStartTime); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 12a6f13b8d5..f363b310447 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -172,6 +172,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.LongStream; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.shuffle; import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; @@ -429,11 +430,13 @@ public class InternalEngineTests extends ESTestCase { // we don't need to notify anybody in this test } }; + final List refreshListenerList = + refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListener, indexSort); + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort); return config; } @@ -921,9 +924,7 @@ public class InternalEngineTests extends ESTestCase { final AtomicReference latestGetResult = new AtomicReference<>(); final Function searcherFactory = engine::acquireSearcher; - final AtomicBoolean refreshed = new AtomicBoolean(false); - latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true))); - assertTrue("failed to refresh", refreshed.get()); + latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); Thread getThread = new Thread(() -> { @@ -937,7 +938,7 @@ public class InternalEngineTests extends ESTestCase { if (previousGetResult != null) { previousGetResult.release(); } - latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"))); + latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); if (latestGetResult.get().exists() == false) { break; } @@ -958,7 +959,6 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); final Function searcherFactory = engine::acquireSearcher; - final AtomicBoolean refreshed = new AtomicBoolean(false); // create a document Document document = testDocumentWithTextField(); @@ -973,13 +973,12 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, not there non realtime - Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh")); + Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory); assertThat(getResult.exists(), equalTo(false)); getResult.release(); // but, we can still get it (in realtime) - getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)); - assertTrue("failed to refresh", refreshed.getAndSet(false)); + getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -994,7 +993,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // also in non realtime - getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh")); + getResult = engine.get(newGet(false, doc), searcherFactory); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1014,8 +1013,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, we can still get it (in realtime) - getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)); - assertTrue("failed to refresh", refreshed.get()); + getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1040,7 +1038,7 @@ public class InternalEngineTests extends ESTestCase { searchResult.close(); // but, get should not see it (in realtime) - getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted")); + getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(false)); getResult.release(); @@ -1080,7 +1078,7 @@ public class InternalEngineTests extends ESTestCase { engine.flush(); // and, verify get (in real time) - getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")); + getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(true)); assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); @@ -1867,7 +1865,6 @@ public class InternalEngineTests extends ESTestCase { final Term uidTerm = newUid(doc); engine.index(indexForDoc(doc)); final Function searcherFactory = engine::acquireSearcher; - final AtomicBoolean refreshed = new AtomicBoolean(false); for (int i = 0; i < thread.length; i++) { thread[i] = new Thread(() -> { startGun.countDown(); @@ -1877,7 +1874,7 @@ public class InternalEngineTests extends ESTestCase { throw new AssertionError(e); } for (int op = 0; op < opsPerThread; op++) { - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) { FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); List values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); @@ -1905,7 +1902,6 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < thread.length; i++) { thread[i].join(); } - assertTrue("failed to refresh", refreshed.getAndSet(false)); List sortedHistory = new ArrayList<>(history); sortedHistory.sort(Comparator.comparing(o -> o.version)); Set currentValues = new HashSet<>(); @@ -1920,8 +1916,7 @@ public class InternalEngineTests extends ESTestCase { assertTrue(op.added + " should not exist", exists); } - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) { - assertTrue("failed to refresh", refreshed.get()); + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) { FieldsVisitor visitor = new FieldsVisitor(true); get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); List values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString())); @@ -2287,7 +2282,7 @@ public class InternalEngineTests extends ESTestCase { engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document - Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted")); + Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(false)); // Give the gc pruning logic a chance to kick in @@ -2301,7 +2296,7 @@ public class InternalEngineTests extends ESTestCase { engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document (we never indexed uid=2): - getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); + getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: @@ -2311,7 +2306,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should still not find the document - getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); + getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=2 with a too-old version, should fail: @@ -2321,7 +2316,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should not find the document - getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists")); + getResult = engine.get(newGet(true, doc), searcherFactory); assertThat(getResult.exists(), equalTo(false)); } } @@ -3654,7 +3649,6 @@ public class InternalEngineTests extends ESTestCase { final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); final Term uid = newUid(doc); final Function searcherFactory = engine::acquireSearcher; - final AtomicBoolean refreshed = new AtomicBoolean(false); for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( @@ -3716,8 +3710,7 @@ public class InternalEngineTests extends ESTestCase { } assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); - try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory, (onRefresh) -> refreshed.set(exists))) { - assertEquals("failed to refresh", exists, refreshed.get()); + try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory)) { assertThat(result.exists(), equalTo(exists)); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 38cac70b5e3..ebde407d33d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -139,6 +139,7 @@ import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; @@ -880,6 +881,26 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(shard); } + public void testRefreshMetric() throws IOException { + IndexShard shard = newStartedShard(); + assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // one refresh on end of recovery, one on starting shard + long initialTotalTime = shard.refreshStats().getTotalTimeInMillis(); + // check time advances + for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) { + indexDoc(shard, "test", "test"); + assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1)); + shard.refresh("test"); + assertThat(shard.refreshStats().getTotal(), equalTo(2L + i)); + assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime)); + } + long refreshCount = shard.refreshStats().getTotal(); + indexDoc(shard, "test", "test"); + try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", new Term("_id", "test")))) { + assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount + 1)); + } + closeShards(shard); + } + private ParsedDocument testParsedDocument(String id, String type, String routing, ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { Field idField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 99114b4819e..7c396fd6693 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -64,6 +64,7 @@ import org.junit.Before; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; @@ -120,7 +121,7 @@ public class RefreshListenersTests extends ESTestCase { store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), listeners, null); + TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } @@ -298,8 +299,7 @@ public class RefreshListenersTests extends ESTestCase { listener.assertNoError(); Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); - try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher, - onRefresh -> fail("shouldn't have a refresh"))) { + try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher)) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); SingleFieldsVisitor visitor = new SingleFieldsVisitor("test");