Update `IndexShard#refreshMetric` via a `ReferenceManager.RefreshListener` (#25083)

This PR takes a different approach to solving #24806 than the one currently implemented via #25052. The `refreshMetric` that IndexShard maintains is now updated using the refresh listener infrastructure in Lucene. This means that we truly count all refreshes that Lucene makes and do not have to worry about each individual caller (like `IndexShard#refresh` and `Engine#get()`).
This commit is contained in:
Boaz Leskes 2017-06-07 10:54:10 +02:00 committed by GitHub
parent db8aa8e94e
commit 8e15186293
7 changed files with 91 additions and 47 deletions

View File

@ -89,7 +89,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.LongConsumer;
public abstract class Engine implements Closeable {
@ -486,7 +485,7 @@ public abstract class Engine implements Closeable {
}
}
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException;
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;
/**
* Returns a new searcher instance. The consumer of this

View File

@ -41,6 +41,8 @@ import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
/*
* Holds all the configuration that is used to create an {@link Engine}.
* Once {@link Engine} has been created with this object, changes to this
@ -65,7 +67,7 @@ public final class EngineConfig {
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
@Nullable
private final ReferenceManager.RefreshListener refreshListeners;
private final List<ReferenceManager.RefreshListener> refreshListeners;
@Nullable
private final Sort indexSort;
@ -111,7 +113,7 @@ public final class EngineConfig {
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
TranslogConfig translogConfig, TimeValue flushMergesAfter, List<ReferenceManager.RefreshListener> refreshListeners,
Sort indexSort) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
@ -310,9 +312,9 @@ public final class EngineConfig {
}
/**
* {@linkplain ReferenceManager.RefreshListener} instance to configure.
* The refresh listeners to add to Lucene
*/
public ReferenceManager.RefreshListener getRefreshListeners() {
public List<ReferenceManager.RefreshListener> getRefreshListeners() {
return refreshListeners;
}

View File

@ -34,6 +34,7 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
@ -92,7 +93,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
public class InternalEngine extends Engine {
@ -213,8 +213,8 @@ public class InternalEngine extends Engine {
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners());
for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) {
searcherManager.addListener(listener);
}
success = true;
} finally {
@ -405,7 +405,7 @@ public class InternalEngine extends Engine {
}
@Override
public GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException {
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
@ -419,9 +419,7 @@ public class InternalEngine extends Engine {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
long time = System.nanoTime();
refresh("realtime_get");
onRefresh.accept(System.nanoTime() - time);
}
}

View File

@ -26,11 +26,13 @@ import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -123,6 +125,7 @@ import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
@ -660,7 +663,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public Engine.GetResult get(Engine.Get get) {
readAllowed();
return getEngine().get(get, this::acquireSearcher, (timeElapsed) -> refreshMetric.inc(timeElapsed));
return getEngine().get(get, this::acquireSearcher);
}
/**
@ -676,9 +679,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
}
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
if (logger.isTraceEnabled()) {
logger.trace("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
@ -689,9 +690,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [{}]", source);
}
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
}
}
@ -1847,7 +1846,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort);
}
/**
@ -2123,4 +2123,35 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
/**
 * A {@link ReferenceManager.RefreshListener} that times each refresh and records the elapsed
 * time in the supplied {@link MeanMetric}.
 * <p>
 * The start time is captured in {@link #beforeRefresh()} and the difference to the current
 * {@code System.nanoTime()} is added to the metric in {@link #afterRefresh(boolean)}.
 * <p>
 * NOTE(review): the fields are plain (non-volatile, unsynchronized), so this presumably relies on
 * the caller invoking {@code beforeRefresh}/{@code afterRefresh} as a non-overlapping pair on a
 * single thread. The assertions below check exactly that when assertions are enabled; confirm
 * against the Lucene {@code ReferenceManager} contract.
 */
private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {
// metric that receives one sample (elapsed nanoseconds) per beforeRefresh/afterRefresh pair
private final MeanMetric refreshMetric;
// System.nanoTime() value captured by the most recent beforeRefresh() call
private long currentRefreshStartTime;
// only maintained when Assertions.ENABLED: the thread that called beforeRefresh() and has not yet called afterRefresh()
private Thread callingThread = null;
/**
 * @param refreshMetric the metric to increment with the duration of every refresh
 */
private RefreshMetricUpdater(MeanMetric refreshMetric) {
this.refreshMetric = refreshMetric;
}
/**
 * Records the start time of a refresh. When assertions are enabled, also verifies that no
 * previous {@code beforeRefresh} call is still waiting for its {@code afterRefresh} and
 * remembers the calling thread.
 */
@Override
public void beforeRefresh() throws IOException {
if (Assertions.ENABLED) {
// the message expression is only evaluated when the assertion fails, i.e. when callingThread is non-null
assert callingThread == null : "beforeRefresh was called by " + callingThread.getName() +
" without a corresponding call to afterRefresh";
callingThread = Thread.currentThread();
}
currentRefreshStartTime = System.nanoTime();
}
/**
 * Adds the time elapsed since the matching {@link #beforeRefresh()} call to the metric.
 * The metric is incremented unconditionally, i.e. also when {@code didRefresh} is {@code false}.
 * When assertions are enabled, also verifies that {@code beforeRefresh} was called first and
 * by the same thread, then clears the remembered thread.
 *
 * @param didRefresh not consulted by this listener
 */
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (Assertions.ENABLED) {
assert callingThread != null : "afterRefresh called but not beforeRefresh";
assert callingThread == Thread.currentThread() : "beforeRefreshed called by a different thread. current ["
+ Thread.currentThread().getName() + "], thread that called beforeRefresh [" + callingThread.getName() + "]";
callingThread = null;
}
refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
}
}
}

View File

@ -172,6 +172,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.shuffle;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
@ -429,11 +430,13 @@ public class InternalEngineTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListener, indexSort);
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort);
return config;
}
@ -921,9 +924,7 @@ public class InternalEngineTests extends ESTestCase {
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true)));
assertTrue("failed to refresh", refreshed.get());
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory));
final AtomicBoolean flushFinished = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread getThread = new Thread(() -> {
@ -937,7 +938,7 @@ public class InternalEngineTests extends ESTestCase {
if (previousGetResult != null) {
previousGetResult.release();
}
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done")));
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory));
if (latestGetResult.get().exists() == false) {
break;
}
@ -958,7 +959,6 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);
// create a document
Document document = testDocumentWithTextField();
@ -973,13 +973,12 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// but, not there non realtime
Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// but, we can still get it (in realtime)
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true));
assertTrue("failed to refresh", refreshed.getAndSet(false));
getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@ -994,7 +993,7 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// also in non realtime
getResult = engine.get(newGet(false, doc), searcherFactory, (onRefresh) -> fail("shouldn't have a refresh"));
getResult = engine.get(newGet(false, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@ -1014,8 +1013,7 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// but, we can still get it (in realtime)
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> refreshed.set(true));
assertTrue("failed to refresh", refreshed.get());
getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@ -1040,7 +1038,7 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
// but, get should not see it (in realtime)
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
getResult.release();
@ -1080,7 +1078,7 @@ public class InternalEngineTests extends ESTestCase {
engine.flush();
// and, verify get (in real time)
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause a flush is just done"));
getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
@ -1867,7 +1865,6 @@ public class InternalEngineTests extends ESTestCase {
final Term uidTerm = newUid(doc);
engine.index(indexForDoc(doc));
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread(() -> {
startGun.countDown();
@ -1877,7 +1874,7 @@ public class InternalEngineTests extends ESTestCase {
throw new AssertionError(e);
}
for (int op = 0; op < opsPerThread; op++) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) {
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
@ -1905,7 +1902,6 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
assertTrue("failed to refresh", refreshed.getAndSet(false));
List<OpAndVersion> sortedHistory = new ArrayList<>(history);
sortedHistory.sort(Comparator.comparing(o -> o.version));
Set<String> currentValues = new HashSet<>();
@ -1920,8 +1916,7 @@ public class InternalEngineTests extends ESTestCase {
assertTrue(op.added + " should not exist", exists);
}
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory, (onRefresh) -> refreshed.set(true))) {
assertTrue("failed to refresh", refreshed.get());
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) {
FieldsVisitor visitor = new FieldsVisitor(true);
get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
@ -2287,7 +2282,7 @@ public class InternalEngineTests extends ESTestCase {
engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document
Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause the document is deleted"));
Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
// Give the gc pruning logic a chance to kick in
@ -2301,7 +2296,7 @@ public class InternalEngineTests extends ESTestCase {
engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document (we never indexed uid=2):
getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
// Try to index uid=1 with a too-old version, should fail:
@ -2311,7 +2306,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// Get should still not find the document
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
// Try to index uid=2 with a too-old version, should fail:
@ -2321,7 +2316,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// Get should not find the document
getResult = engine.get(newGet(true, doc), searcherFactory, (onRefresh) -> fail("shouldn't have refreshed cause document doesn't exists"));
getResult = engine.get(newGet(true, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
}
}
@ -3654,7 +3649,6 @@ public class InternalEngineTests extends ESTestCase {
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
final Term uid = newUid(doc);
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final AtomicBoolean refreshed = new AtomicBoolean(false);
for (int i = 0; i < numberOfOperations; i++) {
if (randomBoolean()) {
final Engine.Index index = new Engine.Index(
@ -3716,8 +3710,7 @@ public class InternalEngineTests extends ESTestCase {
}
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory, (onRefresh) -> refreshed.set(exists))) {
assertEquals("failed to refresh", exists, refreshed.get());
try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory)) {
assertThat(result.exists(), equalTo(exists));
}
}

View File

@ -139,6 +139,7 @@ import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
@ -880,6 +881,26 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
}
/**
 * Verifies that the shard's refresh stats track refreshes: the refresh count goes up by exactly
 * one for every explicit {@code shard.refresh(...)} call, is left unchanged by indexing alone,
 * the total time never decreases, and a get issued after indexing a document also bumps the count.
 */
public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // one refresh on end of recovery, one on starting shard
long initialTotalTime = shard.refreshStats().getTotalTimeInMillis();
// check time advances
// keep indexing + refreshing until the reported total time (in millis) differs from the initial value;
// presumably a single refresh can be too short to move the millisecond value
for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) {
indexDoc(shard, "test", "test");
// indexing on its own must not change the refresh count (2 initial + i - 1 explicit refreshes so far)
assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1));
shard.refresh("test");
// the explicit refresh is counted exactly once
assertThat(shard.refreshStats().getTotal(), equalTo(2L + i));
assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime));
}
long refreshCount = shard.refreshStats().getTotal();
indexDoc(shard, "test", "test");
// a get for the just-indexed document is expected to add one refresh to the count
// NOTE(review): the first Engine.Get argument (true) is presumably the realtime flag - confirm
try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", new Term("_id", "test")))) {
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount + 1));
}
closeShards(shard);
}
private ParsedDocument testParsedDocument(String id, String type, String routing,
ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
Field idField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);

View File

@ -64,6 +64,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
@ -120,7 +121,7 @@ public class RefreshListenersTests extends ESTestCase {
store, newMergePolicy(), iwc.getAnalyzer(),
iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), listeners, null);
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null);
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
}
@ -298,8 +299,7 @@ public class RefreshListenersTests extends ESTestCase {
listener.assertNoError();
Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId));
try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher,
onRefresh -> fail("shouldn't have a refresh"))) {
try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher)) {
assertTrue("document not found", getResult.exists());
assertEquals(iteration, getResult.version());
SingleFieldsVisitor visitor = new SingleFieldsVisitor("test");