Make Percolator a first class citizen in IndexShard and prevent premature index searcher access

This commit is contained in:
Simon Willnauer 2015-10-02 15:14:06 +02:00
parent a892a35f40
commit d6b1f4ce6c
12 changed files with 97 additions and 193 deletions

View File

@ -31,7 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.percolator.stats.PercolateStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;

View File

@ -34,7 +34,7 @@ import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.stats.PercolateStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
@ -167,7 +167,7 @@ public class CommonStats implements Streamable, ToXContent {
segments = indexShard.segmentStats();
break;
case Percolate:
percolate = indexShard.shardPercolateService().stats();
percolate = indexShard.percolateStats();
break;
case Translog:
translog = indexShard.translogStats();

View File

@ -324,9 +324,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
settingsService.addListener(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true;
return indexShard;
} catch (IOException e) {
@ -347,7 +346,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
public synchronized void removeShard(int shardId, String reason) {
final ShardId sId = new ShardId(index, shardId);
final Injector shardInjector;
final IndexShard indexShard;
if (shards.containsKey(shardId) == false) {
return;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.percolator.stats;
package org.elasticsearch.index.percolator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.percolator;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -27,6 +28,8 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -41,20 +44,18 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentTypeListener;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.percolator.PercolatorService;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -64,39 +65,35 @@ import java.util.concurrent.atomic.AtomicBoolean;
* Once a document type has been created, the real-time percolator will start to listen to write events and update the
* this registry with queries in real time.
*/
public class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable{
public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable {
public final String MAP_UNMAPPED_FIELDS_AS_STRING = "index.percolator.map_unmapped_fields_as_string";
// This is a shard level service, but these below are index level service:
private final IndexQueryParserService queryParserService;
private final MapperService mapperService;
private final IndicesLifecycle indicesLifecycle;
private final IndexFieldDataService indexFieldDataService;
private final ShardIndexingService indexingService;
private final ShardPercolateService shardPercolateService;
private final ConcurrentMap<BytesRef, Query> percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final ShardLifecycleListener shardLifecycleListener = new ShardLifecycleListener();
private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener();
private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false);
private boolean mapUnmappedFieldsAsString;
private final MeanMetric percolateMetric = new MeanMetric();
private final CounterMetric currentMetric = new CounterMetric();
private final CounterMetric numberOfQueries = new CounterMetric();
public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,
ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService,
IndexFieldDataService indexFieldDataService, ShardPercolateService shardPercolateService) {
ShardIndexingService indexingService, MapperService mapperService,
IndexFieldDataService indexFieldDataService) {
super(shardId, indexSettings);
this.queryParserService = queryParserService;
this.mapperService = mapperService;
this.indicesLifecycle = indicesLifecycle;
this.indexingService = indexingService;
this.indexFieldDataService = indexFieldDataService;
this.shardPercolateService = shardPercolateService;
this.mapUnmappedFieldsAsString = indexSettings.getAsBoolean(MAP_UNMAPPED_FIELDS_AS_STRING, false);
indicesLifecycle.addListener(shardLifecycleListener);
mapperService.addTypeListener(percolateTypeListener);
}
@ -107,7 +104,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
@Override
public void close() {
mapperService.removeTypeListener(percolateTypeListener);
indicesLifecycle.removeListener(shardLifecycleListener);
indexingService.removeListener(realTimePercolatorOperationListener);
clear();
}
@ -116,30 +112,25 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
percolateQueries.clear();
}
void enableRealTimePercolator() {
public void enableRealTimePercolator() {
if (realTimePercolatorEnabled.compareAndSet(false, true)) {
indexingService.addListener(realTimePercolatorOperationListener);
}
}
void disableRealTimePercolator() {
if (realTimePercolatorEnabled.compareAndSet(true, false)) {
indexingService.removeListener(realTimePercolatorOperationListener);
}
}
public void addPercolateQuery(String idAsString, BytesReference source) {
Query newquery = parsePercolatorDocument(idAsString, source);
BytesRef id = new BytesRef(idAsString);
Query previousQuery = percolateQueries.put(id, newquery);
shardPercolateService.addedQuery(id, previousQuery, newquery);
percolateQueries.put(id, newquery);
numberOfQueries.inc();
}
public void removePercolateQuery(String idAsString) {
BytesRef id = new BytesRef(idAsString);
Query query = percolateQueries.remove(id);
if (query != null) {
shardPercolateService.removedQuery(id, query);
numberOfQueries.dec();
}
}
@ -225,55 +216,27 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
enableRealTimePercolator();
}
}
}
private class ShardLifecycleListener extends IndicesLifecycle.Listener {
@Override
public void afterIndexShardCreated(IndexShard indexShard) {
if (hasPercolatorType(indexShard)) {
enableRealTimePercolator();
public void loadQueries(IndexReader reader) {
logger.trace("loading percolator queries...");
final int loadedQueries;
try {
Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME));
QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService);
IndexSearcher indexSearcher = new IndexSearcher(reader);
indexSearcher.setQueryCache(null);
indexSearcher.search(query, queryCollector);
Map<BytesRef, Query> queries = queryCollector.queries();
for (Map.Entry<BytesRef, Query> entry : queries.entrySet()) {
percolateQueries.put(entry.getKey(), entry.getValue());
numberOfQueries.inc();
}
loadedQueries = queries.size();
} catch (Exception e) {
throw new PercolatorException(shardId.index(), "failed to load queries from percolator index", e);
}
@Override
public void beforeIndexShardPostRecovery(IndexShard indexShard) {
if (hasPercolatorType(indexShard)) {
// percolator index has started, fetch what we can from it and initialize the indices
// we have
logger.trace("loading percolator queries for [{}]...", shardId);
int loadedQueries = loadQueries(indexShard);
logger.debug("done loading [{}] percolator queries for [{}]", loadedQueries, shardId);
}
}
private boolean hasPercolatorType(IndexShard indexShard) {
ShardId otherShardId = indexShard.shardId();
return shardId.equals(otherShardId) && mapperService.hasMapping(PercolatorService.TYPE_NAME);
}
private int loadQueries(IndexShard shard) {
shard.refresh("percolator_load_queries");
// NOTE: we acquire the searcher via the engine directly here since this is executed right
// before the shard is marked as POST_RECOVERY
try (Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries")) {
Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME));
QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService);
IndexSearcher indexSearcher = new IndexSearcher(searcher.reader());
indexSearcher.setQueryCache(null);
indexSearcher.search(query, queryCollector);
Map<BytesRef, Query> queries = queryCollector.queries();
for (Map.Entry<BytesRef, Query> entry : queries.entrySet()) {
Query previousQuery = percolateQueries.put(entry.getKey(), entry.getValue());
shardPercolateService.addedQuery(entry.getKey(), previousQuery, entry.getValue());
}
return queries.size();
} catch (Exception e) {
throw new PercolatorException(shardId.index(), "failed to load queries from percolator index", e);
}
}
logger.debug("done loading [{}] percolator queries", loadedQueries);
}
private class RealTimePercolatorOperationListener extends IndexingOperationListener {
@ -320,4 +283,35 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
}
}
}
public void prePercolate() {
currentMetric.inc();
}
public void postPercolate(long tookInNanos) {
currentMetric.dec();
percolateMetric.inc(tookInNanos);
}
/**
* @return The current metrics
*/
public PercolateStats stats() {
return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count(), -1, numberOfQueries.count());
}
// Enable when a more efficient manner is found for estimating the size of a Lucene query.
/*private static long computeSizeInMemory(HashedBytesRef id, Query query) {
long size = (3 * RamUsageEstimator.NUM_BYTES_INT) + RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + id.bytes.bytes.length;
size += RamEstimator.sizeOf(query);
return size;
}
private static final class RamEstimator {
// we move this into it's own class to exclude it from the forbidden API checks
// it's fine to use here!
static long sizeOf(Query query) {
return RamUsageEstimator.sizeOf(query);
}
}*/
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.percolator.stats;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.concurrent.TimeUnit;
/**
* Shard level percolator service that maintains percolator metrics:
* <ul>
* <li> total time spent in percolate api
* <li> the current number of percolate requests
* <li> number of registered percolate queries
* </ul>
*/
public class ShardPercolateService extends AbstractIndexShardComponent {
@Inject
public ShardPercolateService(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
private final MeanMetric percolateMetric = new MeanMetric();
private final CounterMetric currentMetric = new CounterMetric();
private final CounterMetric numberOfQueries = new CounterMetric();
public void prePercolate() {
currentMetric.inc();
}
public void postPercolate(long tookInNanos) {
currentMetric.dec();
percolateMetric.inc(tookInNanos);
}
public void addedQuery(BytesRef id, Query previousQuery, Query newQuery) {
numberOfQueries.inc();
}
public void removedQuery(BytesRef id, Query query) {
numberOfQueries.dec();
}
/**
* @return The current metrics
*/
public PercolateStats stats() {
return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count(), -1, numberOfQueries.count());
}
// Enable when a more efficient manner is found for estimating the size of a Lucene query.
/*private static long computeSizeInMemory(HashedBytesRef id, Query query) {
long size = (3 * RamUsageEstimator.NUM_BYTES_INT) + RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + id.bytes.bytes.length;
size += RamEstimator.sizeOf(query);
return size;
}
private static final class RamEstimator {
// we move this into it's own class to exclude it from the forbidden API checks
// it's fine to use here!
static long sizeOf(Query query) {
return RamUsageEstimator.sizeOf(query);
}
}*/
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
@ -73,8 +72,8 @@ import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
@ -102,6 +101,7 @@ import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@ -134,7 +134,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final ShardRequestCache shardQueryCache;
private final ShardFieldData shardFieldData;
private final PercolatorQueriesRegistry percolatorQueriesRegistry;
private final ShardPercolateService shardPercolateService;
private final TermVectorsService termVectorsService;
private final IndexFieldDataService indexFieldDataService;
private final ShardSuggestMetric shardSuggestMetric = new ShardSuggestMetric();
@ -215,9 +214,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.indicesQueryCache = provider.getIndicesQueryCache();
this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
this.shardFieldData = new ShardFieldData();
this.shardPercolateService = new ShardPercolateService(shardId, indexSettings);
this.indexFieldDataService = provider.getIndexFieldDataService();
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService);
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
@ -245,6 +242,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.searcherWrapper = provider.getIndexSearcherWrapper();
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService);
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
percolatorQueriesRegistry.enableRealTimePercolator();
}
}
public Store store() {
@ -614,10 +615,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return percolatorQueriesRegistry;
}
public ShardPercolateService shardPercolateService() {
return shardPercolateService;
}
public TranslogStats translogStats() {
return engine().getTranslog().stats();
}
@ -768,8 +765,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
}
public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
indicesLifecycle.beforeIndexShardPostRecovery(this);
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
refresh("percolator_load_queries");
try (Engine.Searcher searcher = engine().acquireSearcher("percolator_load_queries")) {
this.percolatorQueriesRegistry.loadQueries(searcher.reader());
}
}
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
@ -1187,6 +1191,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return engine().getTranslog();
}
public PercolateStats percolateStats() {
return percolatorQueriesRegistry.stats();
}
class EngineRefresher implements Runnable {
@Override
public void run() {

View File

@ -38,7 +38,7 @@ import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.stats.PercolateStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;

View File

@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.shard.IndexShard;
@ -89,6 +90,7 @@ import java.util.concurrent.ConcurrentMap;
*/
public class PercolateContext extends SearchContext {
private final PercolatorQueriesRegistry percolateQueryRegistry;
public boolean limit;
private int size;
public boolean doSort;
@ -102,7 +104,6 @@ public class PercolateContext extends SearchContext {
private final PageCacheRecycler pageCacheRecycler;
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ConcurrentMap<BytesRef, Query> percolateQueries;
private final int numberOfShards;
private final Query aliasFilter;
private final long originNanoTime = System.nanoTime();
@ -133,7 +134,7 @@ public class PercolateContext extends SearchContext {
this.indexService = indexService;
this.fieldDataService = indexService.fieldData();
this.searchShardTarget = searchShardTarget;
this.percolateQueries = indexShard.percolateRegistry().percolateQueries();
this.percolateQueryRegistry = indexShard.percolateRegistry();
this.types = new String[]{request.documentType()};
this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays.withCircuitBreaking();
@ -179,7 +180,7 @@ public class PercolateContext extends SearchContext {
}
public ConcurrentMap<BytesRef, Query> percolateQueries() {
return percolateQueries;
return percolateQueryRegistry.percolateQueries();
}
public Query percolateQuery() {

View File

@ -71,7 +71,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
@ -86,7 +86,6 @@ import org.elasticsearch.search.aggregations.AggregationPhase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.highlight.HighlightPhase;
@ -179,9 +178,8 @@ public class PercolatorService extends AbstractComponent {
IndexService percolateIndexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId().id());
indexShard.readAllowed(); // check if we can read the shard...
ShardPercolateService shardPercolateService = indexShard.shardPercolateService();
shardPercolateService.prePercolate();
PercolatorQueriesRegistry percolateQueryRegistry = indexShard.percolateRegistry();
percolateQueryRegistry.prePercolate();
long startTime = System.nanoTime();
// TODO: The filteringAliases should be looked up at the coordinating node and serialized with all shard request,
@ -255,7 +253,7 @@ public class PercolatorService extends AbstractComponent {
} finally {
SearchContext.removeCurrent();
context.close();
shardPercolateService.postPercolate(System.nanoTime() - startTime);
percolateQueryRegistry.postPercolate(System.nanoTime() - startTime);
}
}

View File

@ -43,7 +43,7 @@ import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.stats.PercolateStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.suggest.stats.SuggestStats;

View File

@ -1047,8 +1047,8 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
try {
CommitStats commitStats = indexShard.commitStats();
CommitStats commitStats = indexShard.commitStats();
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs();
@ -1058,8 +1058,6 @@ public final class InternalTestCluster extends TestCluster {
docsOnShards.put(syncId, liveDocsOnShard);
}
}
} catch (EngineClosedException e) {
// nothing to do, shard is closed
}
}
}