Include internal refreshes in refresh stats (#27615)
Today we exclude internal refreshes in the refresh stats. Yet, it's very much confusing to not take these into account. This change includes internal refreshes into the stats until we have a dedicated stats for this.
This commit is contained in:
parent
e213fa033d
commit
84ec472428
|
@ -69,7 +69,9 @@ public final class EngineConfig {
|
|||
private final QueryCache queryCache;
|
||||
private final QueryCachingPolicy queryCachingPolicy;
|
||||
@Nullable
|
||||
private final List<ReferenceManager.RefreshListener> refreshListeners;
|
||||
private final List<ReferenceManager.RefreshListener> externalRefreshListener;
|
||||
@Nullable
|
||||
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
|
||||
@Nullable
|
||||
private final Sort indexSort;
|
||||
private final boolean forceNewHistoryUUID;
|
||||
|
@ -120,7 +122,8 @@ public final class EngineConfig {
|
|||
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
|
||||
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
|
||||
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
|
||||
List<ReferenceManager.RefreshListener> refreshListeners, Sort indexSort,
|
||||
List<ReferenceManager.RefreshListener> externalRefreshListener,
|
||||
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
|
||||
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) {
|
||||
if (openMode == null) {
|
||||
throw new IllegalArgumentException("openMode must not be null");
|
||||
|
@ -147,7 +150,8 @@ public final class EngineConfig {
|
|||
this.flushMergesAfter = flushMergesAfter;
|
||||
this.openMode = openMode;
|
||||
this.forceNewHistoryUUID = forceNewHistoryUUID;
|
||||
this.refreshListeners = refreshListeners;
|
||||
this.externalRefreshListener = externalRefreshListener;
|
||||
this.internalRefreshListener = internalRefreshListener;
|
||||
this.indexSort = indexSort;
|
||||
this.translogRecoveryRunner = translogRecoveryRunner;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
|
@ -343,12 +347,18 @@ public final class EngineConfig {
|
|||
}
|
||||
|
||||
/**
|
||||
* The refresh listeners to add to Lucene
|
||||
* The refresh listeners to add to Lucene for externally visible refreshes
|
||||
*/
|
||||
public List<ReferenceManager.RefreshListener> getRefreshListeners() {
|
||||
return refreshListeners;
|
||||
public List<ReferenceManager.RefreshListener> getExternalRefreshListener() {
|
||||
return externalRefreshListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* The refresh listeners to add to Lucene for internally visible refreshes. These listeners will also be invoked on external refreshes
|
||||
*/
|
||||
public List<ReferenceManager.RefreshListener> getInternalRefreshListener() { return internalRefreshListener;}
|
||||
|
||||
|
||||
/**
|
||||
* returns true if the engine is allowed to optimize indexing operations with an auto-generated ID
|
||||
*/
|
||||
|
|
|
@ -232,9 +232,12 @@ public class InternalEngine extends Engine {
|
|||
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
|
||||
// don't allow commits until we are done with recovering
|
||||
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
|
||||
for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) {
|
||||
for (ReferenceManager.RefreshListener listener: engineConfig.getExternalRefreshListener()) {
|
||||
this.externalSearcherManager.addListener(listener);
|
||||
}
|
||||
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
|
||||
this.internalSearcherManager.addListener(listener);
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
|
|
|
@ -2194,8 +2194,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
|
||||
indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig,
|
||||
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
|
||||
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
|
||||
this::runTranslogRecovery, circuitBreakerService);
|
||||
Collections.singletonList(refreshListeners),
|
||||
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
|
||||
indexSort, this::runTranslogRecovery, circuitBreakerService);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -105,7 +105,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
|||
import org.elasticsearch.index.mapper.RootObjectMapper;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
|
@ -2547,8 +2546,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
|
||||
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
|
||||
IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5),
|
||||
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService());
|
||||
|
||||
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
|
||||
new NoneCircuitBreakerService());
|
||||
try {
|
||||
InternalEngine internalEngine = new InternalEngine(brokenConfig);
|
||||
fail("translog belongs to a different engine");
|
||||
|
@ -2601,7 +2600,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
|
||||
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
|
||||
IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
|
||||
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService());
|
||||
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
|
||||
new NoneCircuitBreakerService());
|
||||
engine = new InternalEngine(newConfig);
|
||||
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
||||
engine.recoverFromTranslog();
|
||||
|
@ -2631,7 +2631,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
|
||||
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
|
||||
IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
|
||||
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService());
|
||||
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
|
||||
new NoneCircuitBreakerService());
|
||||
engine = new InternalEngine(newConfig);
|
||||
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
||||
engine.recoverFromTranslog();
|
||||
|
|
|
@ -92,6 +92,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
|||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.refresh.RefreshStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
|
@ -1215,26 +1216,15 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
long refreshCount = shard.refreshStats().getTotal();
|
||||
indexDoc(shard, "test", "test");
|
||||
try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test",
|
||||
new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) {
|
||||
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount));
|
||||
new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) {
|
||||
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+1));
|
||||
}
|
||||
indexDoc(shard, "test", "test");
|
||||
shard.writeIndexingBuffer();
|
||||
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+2));
|
||||
closeShards(shard);
|
||||
}
|
||||
|
||||
private ParsedDocument testParsedDocument(String id, String type, String routing,
|
||||
ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
|
||||
Field idField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
|
||||
Field versionField = new NumericDocValuesField("_version", 0);
|
||||
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
|
||||
document.add(idField);
|
||||
document.add(versionField);
|
||||
document.add(seqID.seqNo);
|
||||
document.add(seqID.seqNoDocValue);
|
||||
document.add(seqID.primaryTerm);
|
||||
return new ParsedDocument(versionField, seqID, id, type, routing, Arrays.asList(document), source, XContentType.JSON,
|
||||
mappingUpdate);
|
||||
}
|
||||
|
||||
public void testIndexingOperationsListeners() throws IOException {
|
||||
IndexShard shard = newStartedShard(true);
|
||||
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
|
||||
|
|
|
@ -55,7 +55,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.Scheduler.Cancellable;
|
||||
|
@ -123,7 +122,8 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool,
|
||||
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
|
||||
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
|
||||
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null, new NoneCircuitBreakerService());
|
||||
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, null,
|
||||
new NoneCircuitBreakerService());
|
||||
engine = new InternalEngine(config);
|
||||
listeners.setTranslog(engine.getTranslog());
|
||||
}
|
||||
|
|
|
@ -162,8 +162,9 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
|
||||
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
|
||||
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
|
||||
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
|
||||
config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService());
|
||||
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(),
|
||||
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
|
||||
config.getCircuitBreakerService());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -402,8 +403,8 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
|
||||
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
|
||||
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
|
||||
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler, new NoneCircuitBreakerService());
|
||||
|
||||
TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler,
|
||||
new NoneCircuitBreakerService());
|
||||
return config;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue