From 56329d0f53e944cee1dcb6085fe6e0b9d69935c5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 6 Jan 2016 10:59:33 +0100 Subject: [PATCH] Never call a listerner under lock in InternalEngine We has a postIndex|DeleteUnderLock listener callback to load percolator queries which is entirely private to the index shard in the meanwhile. Yet, it still calls an external callback while holding an indexing lock which is scary since we have no control over how long the operation could possibly take. This commit decouples the percolator registry entirely from the ShardIndexingService by pessimistically fetching percolator documents from the the engine using realtime get. Even in situations where the same document is changed concurrently we will eventually end up in the correct state without loosing an update. This also moves the index throtteling stats directly into the engine to entirely remove the need for the dependency between InternalEngine and ShardIndexingService. --- .../elasticsearch/index/engine/Engine.java | 52 +++++++++++++- .../index/engine/EngineConfig.java | 16 +---- .../index/engine/InternalEngine.java | 12 ++-- .../indexing/IndexingOperationListener.java | 19 ----- .../index/indexing/ShardIndexingService.java | 65 ++--------------- .../percolator/PercolatorQueriesRegistry.java | 69 ++++++------------- .../elasticsearch/index/shard/IndexShard.java | 35 +++++++--- .../index/engine/InternalEngineTests.java | 6 +- .../index/engine/ShadowEngineTests.java | 3 +- 9 files changed, 110 insertions(+), 167 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 4dccd3c6f5e..c013a134e03 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -50,6 +50,8 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.ParseContext.Document; @@ -176,10 +178,10 @@ public abstract class Engine implements Closeable { * is enabled */ protected static final class IndexThrottle { - + private final CounterMetric throttleTimeMillisMetric = new CounterMetric(); + private volatile long startOfThrottleNS; private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock()); private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock()); - private volatile ReleasableLock lock = NOOP_LOCK; public Releasable acquireThrottle() { @@ -189,6 +191,7 @@ public abstract class Engine implements Closeable { /** Activate throttling, which switches the lock to be a real lock */ public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; + startOfThrottleNS = System.nanoTime(); lock = lockReference; } @@ -196,7 +199,45 @@ public abstract class Engine implements Closeable { public void deactivate() { assert lock != NOOP_LOCK : "throttling deactivated but not active"; lock = NOOP_LOCK; + + assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS"; + long throttleTimeNS = System.nanoTime() - startOfThrottleNS; + if (throttleTimeNS >= 0) { + // Paranoia (System.nanoTime() is supposed to be monotonic): time slip may have occurred but never want to add a negative number + throttleTimeMillisMetric.inc(TimeValue.nsecToMSec(throttleTimeNS)); + } } + + long getThrottleTimeInMillis() { + long currentThrottleNS = 0; + if (isThrottled() && startOfThrottleNS != 0) { + currentThrottleNS += System.nanoTime() - startOfThrottleNS; + if (currentThrottleNS < 0) { + // Paranoia (System.nanoTime() is supposed to be monotonic): time slip must have happened, have to ignore this value + currentThrottleNS = 0; + } + } + return throttleTimeMillisMetric.count() + TimeValue.nsecToMSec(currentThrottleNS); + } + + boolean isThrottled() { + return lock != NOOP_LOCK; + } + } + + /** + * Returns the number of milliseconds this engine was under index throttling. + */ + public long getIndexThrottleTimeInMillis() { + return 0; + } + + /** + * Returns the true iff this engine is currently under index throttling. + * @see #getIndexThrottleTimeInMillis() + */ + public boolean isThrottled() { + return false; } /** A Lock implementation that always allows the lock to be acquired */ @@ -916,7 +957,7 @@ public abstract class Engine implements Closeable { } } - public static class GetResult { + public static class GetResult implements Releasable { private final boolean exists; private final long version; private final Translog.Source source; @@ -962,6 +1003,11 @@ public abstract class Engine implements Closeable { return docIdAndVersion; } + @Override + public void close() { + release(); + } + public void release() { if (searcher != null) { searcher.close(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index b44265f37ad..35f1e066d3a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -58,7 +58,6 @@ public final class EngineConfig { private final TimeValue flushMergesAfter; private final String codecName; private final ThreadPool threadPool; - private final ShardIndexingService indexingService; private final Engine.Warmer warmer; private final Store store; private final SnapshotDeletionPolicy deletionPolicy; @@ -107,7 +106,7 @@ public final class EngineConfig { /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ - public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, + public EngineConfig(ShardId shardId, ThreadPool threadPool, IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, @@ -116,7 +115,6 @@ public final class EngineConfig { final Settings settings = indexSettings.getSettings(); this.indexSettings = indexSettings; this.threadPool = threadPool; - this.indexingService = indexingService; this.warmer = warmer == null ? (a,b) -> {} : warmer; this.store = store; this.deletionPolicy = deletionPolicy; @@ -239,18 +237,6 @@ public final class EngineConfig { return threadPool; } - /** - * Returns a {@link org.elasticsearch.index.indexing.ShardIndexingService} used inside the engine to inform about - * pre and post index. The operations are used for statistic purposes etc. - * - * @see org.elasticsearch.index.indexing.ShardIndexingService#postIndex(Engine.Index) - * @see org.elasticsearch.index.indexing.ShardIndexingService#preIndex(Engine.Index) - * - */ - public ShardIndexingService getIndexingService() { - return indexingService; - } - /** * Returns an {@link org.elasticsearch.index.engine.Engine.Warmer} used to warm new searchers before they are used for searching. */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index be2b73ea6c7..1fcbbee5414 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.math.MathUtils; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -95,7 +96,6 @@ public class InternalEngine extends Engine { */ private volatile long lastDeleteVersionPruneTimeMSec; - private final ShardIndexingService indexingService; private final Engine.Warmer warmer; private final Translog translog; private final ElasticsearchConcurrentMergeScheduler mergeScheduler; @@ -131,7 +131,6 @@ public class InternalEngine extends Engine { boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); - this.indexingService = engineConfig.getIndexingService(); this.warmer = engineConfig.getWarmer(); mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings(), engineConfig.getMergeSchedulerConfig()); this.dirtyLocks = new Object[Runtime.getRuntime().availableProcessors() * 10]; // we multiply it to have enough... @@ -422,8 +421,6 @@ public class InternalEngine extends Engine { versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); index.setTranslogLocation(translogLocation); - - indexingService.postIndexUnderLock(index); return created; } } @@ -524,7 +521,6 @@ public class InternalEngine extends Engine { Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation)); delete.setTranslogLocation(translogLocation); - indexingService.postDeleteUnderLock(delete); } } @@ -1059,6 +1055,10 @@ public class InternalEngine extends Engine { throttle.deactivate(); } + public long getIndexThrottleTimeInMillis() { + return throttle.getThrottleTimeInMillis(); + } + long getGcDeletesInMillis() { return engineConfig.getGcDeletesInMillis(); } @@ -1081,7 +1081,6 @@ public class InternalEngine extends Engine { if (numMergesInFlight.incrementAndGet() > maxNumMerges) { if (isThrottling.getAndSet(true) == false) { logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - indexingService.throttlingActivated(); activateThrottling(); } } @@ -1093,7 +1092,6 @@ public class InternalEngine extends Engine { if (numMergesInFlight.decrementAndGet() < maxNumMerges) { if (isThrottling.getAndSet(false)) { logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - indexingService.throttlingDeactivated(); deactivateThrottling(); } } diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java index 651bc405a84..39f3dd602fb 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java @@ -32,16 +32,6 @@ public abstract class IndexingOperationListener { return operation; } - /** - * Called after the indexing occurs, under a locking scheme to maintain - * concurrent updates to the same doc. - *

- * Note, long operations should not occur under this callback. - */ - public void postIndexUnderLock(Engine.Index index) { - - } - /** * Called after the indexing operation occurred. */ @@ -63,15 +53,6 @@ public abstract class IndexingOperationListener { return delete; } - /** - * Called after the delete occurs, under a locking scheme to maintain - * concurrent updates to the same doc. - *

- * Note, long operations should not occur under this callback. - */ - public void postDeleteUnderLock(Engine.Delete delete) { - - } /** * Called after the delete operation occurred. diff --git a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java index 5cf180c3a2e..f7175c02c59 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java +++ b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java @@ -59,19 +59,19 @@ public class ShardIndexingService extends AbstractIndexShardComponent { * is returned for them. If they are set, then only types provided will be returned, or * _all for all types. */ - public IndexingStats stats(String... types) { - IndexingStats.Stats total = totalStats.stats(); + public IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) { + IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis); Map typesSt = null; if (types != null && types.length > 0) { typesSt = new HashMap<>(typesStats.size()); if (types.length == 1 && types[0].equals("_all")) { for (Map.Entry entry : typesStats.entrySet()) { - typesSt.put(entry.getKey(), entry.getValue().stats()); + typesSt.put(entry.getKey(), entry.getValue().stats(isThrottled, currentThrottleInMillis)); } } else { for (Map.Entry entry : typesStats.entrySet()) { if (Regex.simpleMatch(types, entry.getKey())) { - typesSt.put(entry.getKey(), entry.getValue().stats()); + typesSt.put(entry.getKey(), entry.getValue().stats(isThrottled, currentThrottleInMillis)); } } } @@ -87,14 +87,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent { listeners.remove(listener); } - public void throttlingActivated() { - totalStats.setThrottled(true); - } - - public void throttlingDeactivated() { - totalStats.setThrottled(false); - } - public Engine.Index preIndex(Engine.Index operation) { totalStats.indexCurrent.inc(); typeStats(operation.type()).indexCurrent.inc(); @@ -104,16 +96,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return operation; } - public void postIndexUnderLock(Engine.Index index) { - for (IndexingOperationListener listener : listeners) { - try { - listener.postIndexUnderLock(index); - } catch (Exception e) { - logger.warn("postIndexUnderLock listener [{}] failed", e, listener); - } - } - } - public void postIndex(Engine.Index index) { long took = index.endTime() - index.startTime(); totalStats.indexMetric.inc(took); @@ -154,15 +136,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return delete; } - public void postDeleteUnderLock(Engine.Delete delete) { - for (IndexingOperationListener listener : listeners) { - try { - listener.postDeleteUnderLock(delete); - } catch (Exception e) { - logger.warn("postDeleteUnderLock listener [{}] failed", e, listener); - } - } - } public void postDelete(Engine.Delete delete) { long took = delete.endTime() - delete.startTime(); @@ -238,38 +211,12 @@ public class ShardIndexingService extends AbstractIndexShardComponent { public final CounterMetric indexFailed = new CounterMetric(); public final CounterMetric deleteCurrent = new CounterMetric(); public final CounterMetric noopUpdates = new CounterMetric(); - public final CounterMetric throttleTimeMillisMetric = new CounterMetric(); - volatile boolean isThrottled = false; - volatile long startOfThrottleNS; - public IndexingStats.Stats stats() { - long currentThrottleNS = 0; - if (isThrottled && startOfThrottleNS != 0) { - currentThrottleNS += System.nanoTime() - startOfThrottleNS; - if (currentThrottleNS < 0) { - // Paranoia (System.nanoTime() is supposed to be monotonic): time slip must have happened, have to ignore this value - currentThrottleNS = 0; - } - } + public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) { return new IndexingStats.Stats( indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(), deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), - noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(throttleTimeMillisMetric.count() + TimeValue.nsecToMSec(currentThrottleNS))); - } - - - void setThrottled(boolean isThrottled) { - if (!this.isThrottled && isThrottled) { - startOfThrottleNS = System.nanoTime(); - } else if (this.isThrottled && !isThrottled) { - assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS"; - long throttleTimeNS = System.nanoTime() - startOfThrottleNS; - if (throttleTimeNS >= 0) { - // Paranoia (System.nanoTime() is supposed to be monotonic): time slip may have occurred but never want to add a negative number - throttleTimeMillisMetric.inc(TimeValue.nsecToMSec(throttleTimeNS)); - } - } - this.isThrottled = isThrottled; + noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); } public long totalCurrent() { diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java index eaf562e2127..de56e4f3242 100644 --- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java +++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java @@ -43,7 +43,9 @@ import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentTypeListener; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.TypeFieldMapper; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -71,29 +73,21 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent private final MapperService mapperService; private final IndexFieldDataService indexFieldDataService; - private final ShardIndexingService indexingService; private final ConcurrentMap percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener(); - private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener(); - private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false); private final QueryShardContext queryShardContext; private boolean mapUnmappedFieldsAsString; private final MeanMetric percolateMetric = new MeanMetric(); private final CounterMetric currentMetric = new CounterMetric(); private final CounterMetric numberOfQueries = new CounterMetric(); - public PercolatorQueriesRegistry(ShardId shardId, IndexSettings indexSettings, - ShardIndexingService indexingService, MapperService mapperService, - QueryShardContext queryShardContext, - IndexFieldDataService indexFieldDataService) { + public PercolatorQueriesRegistry(ShardId shardId, IndexSettings indexSettings, MapperService mapperService, + QueryShardContext queryShardContext, IndexFieldDataService indexFieldDataService) { super(shardId, indexSettings); this.mapperService = mapperService; - this.indexingService = indexingService; this.queryShardContext = queryShardContext; this.indexFieldDataService = indexFieldDataService; this.mapUnmappedFieldsAsString = this.indexSettings.getSettings().getAsBoolean(MAP_UNMAPPED_FIELDS_AS_STRING, false); - mapperService.addTypeListener(percolateTypeListener); } public ConcurrentMap percolateQueries() { @@ -102,8 +96,6 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent @Override public void close() { - mapperService.removeTypeListener(percolateTypeListener); - indexingService.removeListener(realTimePercolatorOperationListener); clear(); } @@ -111,11 +103,6 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent percolateQueries.clear(); } - public void enableRealTimePercolator() { - if (realTimePercolatorEnabled.compareAndSet(false, true)) { - indexingService.addListener(realTimePercolatorOperationListener); - } - } public void addPercolateQuery(String idAsString, BytesReference source) { Query newquery = parsePercolatorDocument(idAsString, source); @@ -207,16 +194,6 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent } } - private class PercolateTypeListener implements DocumentTypeListener { - - @Override - public void beforeCreate(DocumentMapper mapper) { - if (PercolatorService.TYPE_NAME.equals(mapper.type())) { - enableRealTimePercolator(); - } - } - } - public void loadQueries(IndexReader reader) { logger.trace("loading percolator queries..."); final int loadedQueries; @@ -238,30 +215,26 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent logger.debug("done loading [{}] percolator queries", loadedQueries); } - private class RealTimePercolatorOperationListener extends IndexingOperationListener { - - @Override - public Engine.Index preIndex(Engine.Index operation) { - // validate the query here, before we index - if (PercolatorService.TYPE_NAME.equals(operation.type())) { - parsePercolatorDocument(operation.id(), operation.source()); - } - return operation; + public boolean isPercolatorQuery(Engine.Index operation) { + if (PercolatorService.TYPE_NAME.equals(operation.type())) { + parsePercolatorDocument(operation.id(), operation.source()); + return true; } + return false; + } - @Override - public void postIndexUnderLock(Engine.Index index) { - // add the query under a doc lock - if (PercolatorService.TYPE_NAME.equals(index.type())) { - addPercolateQuery(index.id(), index.source()); - } - } + public boolean isPercolatorQuery(Engine.Delete operation) { + return PercolatorService.TYPE_NAME.equals(operation.type()); + } - @Override - public void postDeleteUnderLock(Engine.Delete delete) { - // remove the query under a lock - if (PercolatorService.TYPE_NAME.equals(delete.type())) { - removePercolateQuery(delete.id()); + public synchronized void updatePercolateQuery(Engine engine, String id) { + // this can be called out of order as long as for every change to a percolator document it's invoked. This will always + // fetch the latest change but might fetch the same change twice if updates / deletes happen concurrently. + try (Engine.GetResult getResult = engine.get(new Engine.Get(true, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(PercolatorService.TYPE_NAME, id))))) { + if (getResult.exists()) { + addPercolateQuery(id, getResult.source().source); + } else { + removePercolateQuery(id); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 84752af1e68..f5caa59c79b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -267,12 +267,7 @@ public class IndexShard extends AbstractIndexShardComponent { this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); this.provider = provider; this.searcherWrapper = indexSearcherWrapper; - this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, indexingService, mapperService, newQueryShardContext(), indexFieldDataService); - if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { - percolatorQueriesRegistry.enableRealTimePercolator(); - } - - + this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, mapperService, newQueryShardContext(), indexFieldDataService); // We start up inactive active.set(false); } @@ -500,7 +495,12 @@ public class IndexShard extends AbstractIndexShardComponent { if (logger.isTraceEnabled()) { logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); } - created = getEngine().index(index); + final boolean isPercolatorQuery = percolatorQueriesRegistry.isPercolatorQuery(index); + Engine engine = getEngine(); + created = engine.index(index); + if (isPercolatorQuery) { + percolatorQueriesRegistry.updatePercolateQuery(engine, index.id()); + } index.endTime(System.nanoTime()); } catch (Throwable ex) { indexingService.postIndex(index, ex); @@ -537,7 +537,12 @@ public class IndexShard extends AbstractIndexShardComponent { if (logger.isTraceEnabled()) { logger.trace("delete [{}]", delete.uid().text()); } - getEngine().delete(delete); + final boolean isPercolatorQuery = percolatorQueriesRegistry.isPercolatorQuery(delete); + Engine engine = getEngine(); + engine.delete(delete); + if (isPercolatorQuery) { + percolatorQueriesRegistry.updatePercolateQuery(engine, delete.id()); + } delete.endTime(System.nanoTime()); } catch (Throwable ex) { indexingService.postDelete(delete, ex); @@ -585,7 +590,17 @@ public class IndexShard extends AbstractIndexShardComponent { } public IndexingStats indexingStats(String... types) { - return indexingService.stats(types); + Engine engine = getEngineOrNull(); + final boolean throttled; + final long throttleTimeInMillis; + if (engine == null) { + throttled = false; + throttleTimeInMillis = 0; + } else { + throttled = engine.isThrottled(); + throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); + } + return indexingService.stats(throttled, throttleTimeInMillis, types); } public SearchStats searchStats(String... groups) { @@ -1470,7 +1485,7 @@ public class IndexShard extends AbstractIndexShardComponent { }; final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel); return new EngineConfig(shardId, - threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, + threadPool, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 889cf74a7b4..6905cc015b0 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -71,11 +71,9 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine.Searcher; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; -import org.elasticsearch.index.mapper.DocumentMapperParser; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.MapperBuilders; import org.elasticsearch.index.mapper.MapperService; @@ -272,7 +270,7 @@ public class InternalEngineTests extends ESTestCase { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, INDEX_SETTINGS), indexSettings + EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() { @Override @@ -1977,7 +1975,7 @@ public class InternalEngineTests extends ESTestCase { /* create a TranslogConfig that has been created with a different UUID */ TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE); - EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings() + EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener() , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 214bc343a8a..3e20a1f58d7 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; @@ -224,7 +223,7 @@ public class ShadowEngineTests extends ESTestCase { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings + EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig, iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() { @Override