From d6b1f4ce6cf91da27d175827959574307d6de80a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 2 Oct 2015 15:14:06 +0200 Subject: [PATCH] Make Percolator a first class citizen in IndexShard and prevent premature index searcher access --- .../cluster/stats/ClusterStatsIndices.java | 2 +- .../admin/indices/stats/CommonStats.java | 4 +- .../org/elasticsearch/index/IndexService.java | 4 +- .../{stats => }/PercolateStats.java | 2 +- .../percolator/PercolatorQueriesRegistry.java | 132 +++++++++--------- .../stats/ShardPercolateService.java | 93 ------------ .../elasticsearch/index/shard/IndexShard.java | 26 ++-- .../indices/NodeIndicesStats.java | 2 +- .../percolator/PercolateContext.java | 7 +- .../percolator/PercolatorService.java | 10 +- .../rest/action/cat/RestNodesAction.java | 2 +- .../test/InternalTestCluster.java | 6 +- 12 files changed, 97 insertions(+), 193 deletions(-) rename core/src/main/java/org/elasticsearch/index/percolator/{stats => }/PercolateStats.java (99%) delete mode 100644 core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java index be7a3f0d4de..ff754be2a20 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -31,7 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; -import org.elasticsearch.index.percolator.stats.PercolateStats; +import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.search.suggest.completion.CompletionStats; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index b2f0deeceff..53c07114f96 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -34,7 +34,7 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.percolator.stats.PercolateStats; +import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; @@ -167,7 +167,7 @@ public class CommonStats implements Streamable, ToXContent { segments = indexShard.segmentStats(); break; case Percolate: - percolate = indexShard.shardPercolateService().stats(); + percolate = indexShard.percolateStats(); break; case Translog: translog = indexShard.translogStats(); diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 22419ba23cf..574e4551ba6 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -324,9 +324,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); indicesLifecycle.afterIndexShardCreated(indexShard); - - shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); settingsService.addListener(indexShard); + shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; return indexShard; } catch (IOException e) { @@ -347,7 +346,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone public synchronized void removeShard(int shardId, String reason) { final ShardId sId = new ShardId(index, shardId); - final Injector shardInjector; final IndexShard indexShard; if (shards.containsKey(shardId) == false) { return; diff --git a/core/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolateStats.java similarity index 99% rename from core/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java rename to core/src/main/java/org/elasticsearch/index/percolator/PercolateStats.java index 49f2375a03a..f927a42761f 100644 --- a/core/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java +++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolateStats.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.index.percolator.stats; +package org.elasticsearch.index.percolator; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java index 22f2b3cbe44..d811f1f6e71 100644 --- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java +++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.percolator; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -27,6 +28,8 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -41,20 +44,18 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentTypeListener; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.internal.TypeFieldMapper; -import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.percolator.PercolatorService; import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -64,39 +65,35 @@ import java.util.concurrent.atomic.AtomicBoolean; * Once a document type has been created, the real-time percolator will start to listen to write events and update the * this registry with queries in real time. */ -public class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable{ +public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable { public final String MAP_UNMAPPED_FIELDS_AS_STRING = "index.percolator.map_unmapped_fields_as_string"; // This is a shard level service, but these below are index level service: private final IndexQueryParserService queryParserService; private final MapperService mapperService; - private final IndicesLifecycle indicesLifecycle; private final IndexFieldDataService indexFieldDataService; private final ShardIndexingService indexingService; - private final ShardPercolateService shardPercolateService; private final ConcurrentMap percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - private final ShardLifecycleListener shardLifecycleListener = new ShardLifecycleListener(); private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener(); private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener(); private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false); private boolean mapUnmappedFieldsAsString; + private final MeanMetric percolateMetric = new MeanMetric(); + private final CounterMetric currentMetric = new CounterMetric(); + private final CounterMetric numberOfQueries = new CounterMetric(); public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService, - ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService, - IndexFieldDataService indexFieldDataService, ShardPercolateService shardPercolateService) { + ShardIndexingService indexingService, MapperService mapperService, + IndexFieldDataService indexFieldDataService) { super(shardId, indexSettings); this.queryParserService = queryParserService; this.mapperService = mapperService; - this.indicesLifecycle = indicesLifecycle; this.indexingService = indexingService; this.indexFieldDataService = indexFieldDataService; - this.shardPercolateService = shardPercolateService; this.mapUnmappedFieldsAsString = indexSettings.getAsBoolean(MAP_UNMAPPED_FIELDS_AS_STRING, false); - - indicesLifecycle.addListener(shardLifecycleListener); mapperService.addTypeListener(percolateTypeListener); } @@ -107,7 +104,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple @Override public void close() { mapperService.removeTypeListener(percolateTypeListener); - indicesLifecycle.removeListener(shardLifecycleListener); indexingService.removeListener(realTimePercolatorOperationListener); clear(); } @@ -116,30 +112,25 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple percolateQueries.clear(); } - void enableRealTimePercolator() { + public void enableRealTimePercolator() { if (realTimePercolatorEnabled.compareAndSet(false, true)) { indexingService.addListener(realTimePercolatorOperationListener); } } - void disableRealTimePercolator() { - if (realTimePercolatorEnabled.compareAndSet(true, false)) { - indexingService.removeListener(realTimePercolatorOperationListener); - } - } - public void addPercolateQuery(String idAsString, BytesReference source) { Query newquery = parsePercolatorDocument(idAsString, source); BytesRef id = new BytesRef(idAsString); - Query previousQuery = percolateQueries.put(id, newquery); - shardPercolateService.addedQuery(id, previousQuery, newquery); + percolateQueries.put(id, newquery); + numberOfQueries.inc(); + } public void removePercolateQuery(String idAsString) { BytesRef id = new BytesRef(idAsString); Query query = percolateQueries.remove(id); if (query != null) { - shardPercolateService.removedQuery(id, query); + numberOfQueries.dec(); } } @@ -225,55 +216,27 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple enableRealTimePercolator(); } } - } - private class ShardLifecycleListener extends IndicesLifecycle.Listener { - - @Override - public void afterIndexShardCreated(IndexShard indexShard) { - if (hasPercolatorType(indexShard)) { - enableRealTimePercolator(); + public void loadQueries(IndexReader reader) { + logger.trace("loading percolator queries..."); + final int loadedQueries; + try { + Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME)); + QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService); + IndexSearcher indexSearcher = new IndexSearcher(reader); + indexSearcher.setQueryCache(null); + indexSearcher.search(query, queryCollector); + Map queries = queryCollector.queries(); + for (Map.Entry entry : queries.entrySet()) { + percolateQueries.put(entry.getKey(), entry.getValue()); + numberOfQueries.inc(); } + loadedQueries = queries.size(); + } catch (Exception e) { + throw new PercolatorException(shardId.index(), "failed to load queries from percolator index", e); } - - @Override - public void beforeIndexShardPostRecovery(IndexShard indexShard) { - if (hasPercolatorType(indexShard)) { - // percolator index has started, fetch what we can from it and initialize the indices - // we have - logger.trace("loading percolator queries for [{}]...", shardId); - int loadedQueries = loadQueries(indexShard); - logger.debug("done loading [{}] percolator queries for [{}]", loadedQueries, shardId); - } - } - - private boolean hasPercolatorType(IndexShard indexShard) { - ShardId otherShardId = indexShard.shardId(); - return shardId.equals(otherShardId) && mapperService.hasMapping(PercolatorService.TYPE_NAME); - } - - private int loadQueries(IndexShard shard) { - shard.refresh("percolator_load_queries"); - // NOTE: we acquire the searcher via the engine directly here since this is executed right - // before the shard is marked as POST_RECOVERY - try (Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries")) { - Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME)); - QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService); - IndexSearcher indexSearcher = new IndexSearcher(searcher.reader()); - indexSearcher.setQueryCache(null); - indexSearcher.search(query, queryCollector); - Map queries = queryCollector.queries(); - for (Map.Entry entry : queries.entrySet()) { - Query previousQuery = percolateQueries.put(entry.getKey(), entry.getValue()); - shardPercolateService.addedQuery(entry.getKey(), previousQuery, entry.getValue()); - } - return queries.size(); - } catch (Exception e) { - throw new PercolatorException(shardId.index(), "failed to load queries from percolator index", e); - } - } - + logger.debug("done loading [{}] percolator queries", loadedQueries); } private class RealTimePercolatorOperationListener extends IndexingOperationListener { @@ -320,4 +283,35 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple } } } + + public void prePercolate() { + currentMetric.inc(); + } + + public void postPercolate(long tookInNanos) { + currentMetric.dec(); + percolateMetric.inc(tookInNanos); + } + + /** + * @return The current metrics + */ + public PercolateStats stats() { + return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count(), -1, numberOfQueries.count()); + } + + // Enable when a more efficient manner is found for estimating the size of a Lucene query. + /*private static long computeSizeInMemory(HashedBytesRef id, Query query) { + long size = (3 * RamUsageEstimator.NUM_BYTES_INT) + RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + id.bytes.bytes.length; + size += RamEstimator.sizeOf(query); + return size; + } + + private static final class RamEstimator { + // we move this into it's own class to exclude it from the forbidden API checks + // it's fine to use here! + static long sizeOf(Query query) { + return RamUsageEstimator.sizeOf(query); + } + }*/ } diff --git a/core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java b/core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java deleted file mode 100644 index 80f6bd9be38..00000000000 --- a/core/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.percolator.stats; - -import org.apache.lucene.search.Query; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; - -import java.util.concurrent.TimeUnit; - -/** - * Shard level percolator service that maintains percolator metrics: - *
    - *
  • total time spent in percolate api - *
  • the current number of percolate requests - *
  • number of registered percolate queries - *
- */ -public class ShardPercolateService extends AbstractIndexShardComponent { - - @Inject - public ShardPercolateService(ShardId shardId, @IndexSettings Settings indexSettings) { - super(shardId, indexSettings); - } - - private final MeanMetric percolateMetric = new MeanMetric(); - private final CounterMetric currentMetric = new CounterMetric(); - - private final CounterMetric numberOfQueries = new CounterMetric(); - - public void prePercolate() { - currentMetric.inc(); - } - - public void postPercolate(long tookInNanos) { - currentMetric.dec(); - percolateMetric.inc(tookInNanos); - } - - public void addedQuery(BytesRef id, Query previousQuery, Query newQuery) { - numberOfQueries.inc(); - } - - public void removedQuery(BytesRef id, Query query) { - numberOfQueries.dec(); - } - - /** - * @return The current metrics - */ - public PercolateStats stats() { - return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count(), -1, numberOfQueries.count()); - } - - // Enable when a more efficient manner is found for estimating the size of a Lucene query. - /*private static long computeSizeInMemory(HashedBytesRef id, Query query) { - long size = (3 * RamUsageEstimator.NUM_BYTES_INT) + RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + id.bytes.bytes.length; - size += RamEstimator.sizeOf(query); - return size; - } - - private static final class RamEstimator { - // we move this into it's own class to exclude it from the forbidden API checks - // it's fine to use here! - static long sizeOf(Query query) { - return RamUsageEstimator.sizeOf(query); - } - }*/ - -} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ec5db1db46a..3270463ed9c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.shard; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.*; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; @@ -73,8 +72,8 @@ import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; -import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; @@ -102,6 +101,7 @@ import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.percolator.PercolatorService; import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; @@ -134,7 +134,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private final ShardRequestCache shardQueryCache; private final ShardFieldData shardFieldData; private final PercolatorQueriesRegistry percolatorQueriesRegistry; - private final ShardPercolateService shardPercolateService; private final TermVectorsService termVectorsService; private final IndexFieldDataService indexFieldDataService; private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric(); @@ -215,9 +214,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett this.indicesQueryCache = provider.getIndicesQueryCache(); this.shardQueryCache = new ShardRequestCache(shardId, indexSettings); this.shardFieldData = new ShardFieldData(); - this.shardPercolateService = new ShardPercolateService(shardId, indexSettings); this.indexFieldDataService = provider.getIndexFieldDataService(); - this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService); this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); @@ -245,6 +242,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false); this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); this.searcherWrapper = provider.getIndexSearcherWrapper(); + this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService); + if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { + percolatorQueriesRegistry.enableRealTimePercolator(); + } } public Store store() { @@ -614,10 +615,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return percolatorQueriesRegistry; } - public ShardPercolateService shardPercolateService() { - return shardPercolateService; - } - public TranslogStats translogStats() { return engine().getTranslog().stats(); } @@ -768,8 +765,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett } } + public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { indicesLifecycle.beforeIndexShardPostRecovery(this); + if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { + refresh("percolator_load_queries"); + try (Engine.Searcher searcher = engine().acquireSearcher("percolator_load_queries")) { + this.percolatorQueriesRegistry.loadQueries(searcher.reader()); + } + } synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); @@ -1187,6 +1191,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return engine().getTranslog(); } + public PercolateStats percolateStats() { + return percolatorQueriesRegistry.stats(); + } + class EngineRefresher implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index 747d15a01f9..c8142f3d37a 100644 --- a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -38,7 +38,7 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.percolator.stats.PercolateStats; +import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; diff --git a/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java index 190ffc99293..8cb797cdce0 100644 --- a/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java +++ b/core/src/main/java/org/elasticsearch/percolator/PercolateContext.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.object.ObjectMapper; +import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.shard.IndexShard; @@ -89,6 +90,7 @@ import java.util.concurrent.ConcurrentMap; */ public class PercolateContext extends SearchContext { + private final PercolatorQueriesRegistry percolateQueryRegistry; public boolean limit; private int size; public boolean doSort; @@ -102,7 +104,6 @@ public class PercolateContext extends SearchContext { private final PageCacheRecycler pageCacheRecycler; private final BigArrays bigArrays; private final ScriptService scriptService; - private final ConcurrentMap percolateQueries; private final int numberOfShards; private final Query aliasFilter; private final long originNanoTime = System.nanoTime(); @@ -133,7 +134,7 @@ public class PercolateContext extends SearchContext { this.indexService = indexService; this.fieldDataService = indexService.fieldData(); this.searchShardTarget = searchShardTarget; - this.percolateQueries = indexShard.percolateRegistry().percolateQueries(); + this.percolateQueryRegistry = indexShard.percolateRegistry(); this.types = new String[]{request.documentType()}; this.pageCacheRecycler = pageCacheRecycler; this.bigArrays = bigArrays.withCircuitBreaking(); @@ -179,7 +180,7 @@ public class PercolateContext extends SearchContext { } public ConcurrentMap percolateQueries() { - return percolateQueries; + return percolateQueryRegistry.percolateQueries(); } public Query percolateQuery() { diff --git a/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java index ba4ccaeb25e..ef33e10a810 100644 --- a/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/core/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -71,7 +71,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.percolator.stats.ShardPercolateService; +import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -86,7 +86,6 @@ import org.elasticsearch.search.aggregations.AggregationPhase; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.highlight.HighlightField; import org.elasticsearch.search.highlight.HighlightPhase; @@ -179,9 +178,8 @@ public class PercolatorService extends AbstractComponent { IndexService percolateIndexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = percolateIndexService.shardSafe(request.shardId().id()); indexShard.readAllowed(); // check if we can read the shard... - - ShardPercolateService shardPercolateService = indexShard.shardPercolateService(); - shardPercolateService.prePercolate(); + PercolatorQueriesRegistry percolateQueryRegistry = indexShard.percolateRegistry(); + percolateQueryRegistry.prePercolate(); long startTime = System.nanoTime(); // TODO: The filteringAliases should be looked up at the coordinating node and serialized with all shard request, @@ -255,7 +253,7 @@ public class PercolatorService extends AbstractComponent { } finally { SearchContext.removeCurrent(); context.close(); - shardPercolateService.postPercolate(System.nanoTime() - startTime); + percolateQueryRegistry.postPercolate(System.nanoTime() - startTime); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index 8ccf2017a81..337dd41b403 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -43,7 +43,7 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.percolator.stats.PercolateStats; +import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.suggest.stats.SuggestStats; diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index ab273000ed6..db11c97bb19 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -1047,8 +1047,8 @@ public final class InternalTestCluster extends TestCluster { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - try { - CommitStats commitStats = indexShard.commitStats(); + CommitStats commitStats = indexShard.commitStats(); + if (commitStats != null) { // null if the engine is closed or if the shard is recovering String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); if (syncId != null) { long liveDocsOnShard = commitStats.getNumDocs(); @@ -1058,8 +1058,6 @@ public final class InternalTestCluster extends TestCluster { docsOnShards.put(syncId, liveDocsOnShard); } } - } catch (EngineClosedException e) { - // nothing to do, shard is closed } } }