Make SearchFactory static class in InternalEngine
Now that lucene provides a way to identify if the warming reader is the first initial opened reader we can detach this class from the enclosing and make it static. This is important since it might access not fully initialized members of the enclosing class since it's initialized and used during constructor invocation.
This commit is contained in:
parent
a843008b17
commit
217fd4443a
|
@ -49,6 +49,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.cluster.routing.DjbHashFunction;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.LoggerInfoStream;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
|
@ -64,6 +65,7 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
|
|||
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
|
@ -143,7 +145,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
throttle = new IndexThrottle();
|
||||
this.searcherFactory = new SearchFactory(engineConfig);
|
||||
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
|
||||
final Long committedTranslogId;
|
||||
try {
|
||||
writer = createWriter();
|
||||
|
@ -1050,10 +1052,19 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
|
||||
class SearchFactory extends EngineSearcherFactory {
|
||||
final static class SearchFactory extends EngineSearcherFactory {
|
||||
|
||||
SearchFactory(EngineConfig engineConfig) {
|
||||
private final IndicesWarmer warmer;
|
||||
private final ShardId shardId;
|
||||
private final ESLogger logger;
|
||||
private final AtomicBoolean isEngineClosed;
|
||||
|
||||
SearchFactory(ESLogger logger, AtomicBoolean isEngineClosed, EngineConfig engineConfig) {
|
||||
super(engineConfig);
|
||||
warmer = engineConfig.getWarmer();
|
||||
shardId = engineConfig.getShardId();
|
||||
this.logger = logger;
|
||||
this.isEngineClosed = isEngineClosed;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1065,36 +1076,34 @@ public class InternalEngine extends Engine {
|
|||
IndexSearcher newSearcher = null;
|
||||
boolean closeNewSearcher = false;
|
||||
try {
|
||||
if (searcherManager == null) {
|
||||
if (previousReader == null) {
|
||||
// we are starting up - no writer active so we can't acquire a searcher.
|
||||
newSearcher = searcher;
|
||||
} else {
|
||||
try (final Searcher currentSearcher = acquireSearcher("search_factory")) {
|
||||
// figure out the newSearcher, with only the new readers that are relevant for us
|
||||
List<IndexReader> readers = Lists.newArrayList();
|
||||
for (LeafReaderContext newReaderContext : searcher.getIndexReader().leaves()) {
|
||||
if (isMergedSegment(newReaderContext.reader())) {
|
||||
// merged segments are already handled by IndexWriterConfig.setMergedSegmentWarmer
|
||||
continue;
|
||||
}
|
||||
boolean found = false;
|
||||
for (LeafReaderContext currentReaderContext : currentSearcher.reader().leaves()) {
|
||||
if (currentReaderContext.reader().getCoreCacheKey().equals(newReaderContext.reader().getCoreCacheKey())) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
readers.add(newReaderContext.reader());
|
||||
// figure out the newSearcher, with only the new readers that are relevant for us
|
||||
List<IndexReader> readers = Lists.newArrayList();
|
||||
for (LeafReaderContext newReaderContext : reader.leaves()) {
|
||||
if (isMergedSegment(newReaderContext.reader())) {
|
||||
// merged segments are already handled by IndexWriterConfig.setMergedSegmentWarmer
|
||||
continue;
|
||||
}
|
||||
boolean found = false;
|
||||
for (LeafReaderContext currentReaderContext : previousReader.leaves()) {
|
||||
if (currentReaderContext.reader().getCoreCacheKey().equals(newReaderContext.reader().getCoreCacheKey())) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!readers.isEmpty()) {
|
||||
// we don't want to close the inner readers, just increase ref on them
|
||||
IndexReader newReader = new MultiReader(readers.toArray(new IndexReader[readers.size()]), false);
|
||||
newSearcher = super.newSearcher(newReader, null);
|
||||
closeNewSearcher = true;
|
||||
if (!found) {
|
||||
readers.add(newReaderContext.reader());
|
||||
}
|
||||
}
|
||||
if (!readers.isEmpty()) {
|
||||
// we don't want to close the inner readers, just increase ref on them
|
||||
IndexReader newReader = new MultiReader(readers.toArray(new IndexReader[readers.size()]), false);
|
||||
newSearcher = super.newSearcher(newReader, null);
|
||||
closeNewSearcher = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (newSearcher != null) {
|
||||
|
@ -1103,7 +1112,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
|
||||
} catch (Throwable e) {
|
||||
if (isClosed.get() == false) {
|
||||
if (isEngineClosed.get() == false) {
|
||||
logger.warn("failed to prepare/warm", e);
|
||||
}
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue