From 84ec47242807f5449edae60591e12c4fd9f55117 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 4 Dec 2017 16:33:47 +0100 Subject: [PATCH] Include internal refreshes in refresh stats (#27615) Today we exclude internal refreshes in the refresh stats. Yet, it's very much confusing to not take these into account. This change includes internal refreshes into the stats until we have a dedicated stats for this. --- .../index/engine/EngineConfig.java | 22 ++++++++++++++----- .../index/engine/InternalEngine.java | 5 ++++- .../elasticsearch/index/shard/IndexShard.java | 5 +++-- .../index/engine/InternalEngineTests.java | 11 +++++----- .../index/shard/IndexShardTests.java | 22 +++++-------------- .../index/shard/RefreshListenersTests.java | 4 ++-- .../index/engine/EngineTestCase.java | 9 ++++---- 7 files changed, 42 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index f923abc1a6c..8c134b140bd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -69,7 +69,9 @@ public final class EngineConfig { private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; @Nullable - private final List refreshListeners; + private final List externalRefreshListener; + @Nullable + private final List internalRefreshListener; @Nullable private final Sort indexSort; private final boolean forceNewHistoryUUID; @@ -120,7 +122,8 @@ public final class EngineConfig { Similarity similarity, CodecService codecService, Engine.EventListener eventListener, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, - List refreshListeners, Sort indexSort, + List externalRefreshListener, + List internalRefreshListener, Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); @@ -147,7 +150,8 @@ public final class EngineConfig { this.flushMergesAfter = flushMergesAfter; this.openMode = openMode; this.forceNewHistoryUUID = forceNewHistoryUUID; - this.refreshListeners = refreshListeners; + this.externalRefreshListener = externalRefreshListener; + this.internalRefreshListener = internalRefreshListener; this.indexSort = indexSort; this.translogRecoveryRunner = translogRecoveryRunner; this.circuitBreakerService = circuitBreakerService; @@ -343,12 +347,18 @@ public final class EngineConfig { } /** - * The refresh listeners to add to Lucene + * The refresh listeners to add to Lucene for externally visible refreshes */ - public List getRefreshListeners() { - return refreshListeners; + public List getExternalRefreshListener() { + return externalRefreshListener; } + /** + * The refresh listeners to add to Lucene for internally visible refreshes. These listeners will also be invoked on external refreshes + */ + public List getInternalRefreshListener() { return internalRefreshListener;} + + /** * returns true if the engine is allowed to optimize indexing operations with an auto-generated ID */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fe8a33f5ecd..53747b063df 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -232,9 +232,12 @@ public class InternalEngine extends Engine { assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); - for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) { + for (ReferenceManager.RefreshListener listener: engineConfig.getExternalRefreshListener()) { this.externalSearcherManager.addListener(listener); } + for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) { + this.internalSearcherManager.addListener(listener); + } success = true; } finally { if (success == false) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 12da974645c..f0246060acf 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2194,8 +2194,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, - this::runTranslogRecovery, circuitBreakerService); + Collections.singletonList(refreshListeners), + Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), + indexSort, this::runTranslogRecovery, circuitBreakerService); } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 026a01a23c3..1b700b80086 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -105,7 +105,6 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -2547,8 +2546,8 @@ public class InternalEngineTests extends EngineTestCase { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); - + config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), + new NoneCircuitBreakerService()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); fail("translog belongs to a different engine"); @@ -2601,7 +2600,8 @@ public class InternalEngineTests extends EngineTestCase { threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); + config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), + new NoneCircuitBreakerService()); engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); @@ -2631,7 +2631,8 @@ public class InternalEngineTests extends EngineTestCase { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); + config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), + new NoneCircuitBreakerService()); engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 96bcb9382ee..dc4294f30f5 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -92,6 +92,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -1215,26 +1216,15 @@ public class IndexShardTests extends IndexShardTestCase { long refreshCount = shard.refreshStats().getTotal(); indexDoc(shard, "test", "test"); try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", - new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { - assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount)); + new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { + assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+1)); } + indexDoc(shard, "test", "test"); + shard.writeIndexingBuffer(); + assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+2)); closeShards(shard); } - private ParsedDocument testParsedDocument(String id, String type, String routing, - ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { - Field idField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); - Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(idField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - return new ParsedDocument(versionField, seqID, id, type, routing, Arrays.asList(document), source, XContentType.JSON, - mappingUpdate); - } - public void testIndexingOperationsListeners() throws IOException { IndexShard shard = newStartedShard(true); indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index e3158a21853..125f45fd007 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -55,7 +55,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.Scheduler.Cancellable; @@ -123,7 +122,8 @@ public class RefreshListenersTests extends ESTestCase { EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null, new NoneCircuitBreakerService()); + TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, null, + new NoneCircuitBreakerService()); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 9ba6f64d74c..bfc44f71d80 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -162,8 +162,9 @@ public abstract class EngineTestCase extends ESTestCase { return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService()); + config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), + config.getCircuitBreakerService()); } @Override @@ -402,8 +403,8 @@ public abstract class EngineTestCase extends ESTestCase { EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler, new NoneCircuitBreakerService()); - + TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, + new NoneCircuitBreakerService()); return config; }