Added IndexSearcherWrapper extension point.
This extension point allows one IndexSearcherWrapper instance to intercept the searcher from the Engine before it is used for a opertion.
This commit is contained in:
parent
52113e7527
commit
e997342da4
|
@ -20,7 +20,6 @@
|
||||||
package org.elasticsearch.index.engine;
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import org.apache.lucene.index.*;
|
import org.apache.lucene.index.*;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
|
@ -45,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
|
||||||
import org.elasticsearch.index.mapper.ParseContext.Document;
|
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.mapper.Uid;
|
import org.elasticsearch.index.mapper.Uid;
|
||||||
|
@ -57,11 +55,7 @@ import org.elasticsearch.index.translog.Translog;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.*;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
|
@ -288,7 +282,7 @@ public abstract class Engine implements Closeable {
|
||||||
try {
|
try {
|
||||||
final Searcher retVal = newSearcher(source, searcher, manager);
|
final Searcher retVal = newSearcher(source, searcher, manager);
|
||||||
success = true;
|
success = true;
|
||||||
return retVal;
|
return config().getWrappingService().wrap(engineConfig, retVal);
|
||||||
} finally {
|
} finally {
|
||||||
if (!success) {
|
if (!success) {
|
||||||
manager.release(searcher);
|
manager.release(searcher);
|
||||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.lucene.analysis.Analyzer;
|
||||||
import org.apache.lucene.codecs.Codec;
|
import org.apache.lucene.codecs.Codec;
|
||||||
import org.apache.lucene.index.IndexWriterConfig;
|
import org.apache.lucene.index.IndexWriterConfig;
|
||||||
import org.apache.lucene.index.MergePolicy;
|
import org.apache.lucene.index.MergePolicy;
|
||||||
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
|
||||||
import org.apache.lucene.search.QueryCache;
|
import org.apache.lucene.search.QueryCache;
|
||||||
import org.apache.lucene.search.QueryCachingPolicy;
|
import org.apache.lucene.search.QueryCachingPolicy;
|
||||||
import org.apache.lucene.search.similarities.Similarity;
|
import org.apache.lucene.search.similarities.Similarity;
|
||||||
|
@ -35,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.index.codec.CodecService;
|
import org.elasticsearch.index.codec.CodecService;
|
||||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||||
|
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
|
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
|
@ -77,6 +77,7 @@ public final class EngineConfig {
|
||||||
private final boolean forceNewTranslog;
|
private final boolean forceNewTranslog;
|
||||||
private final QueryCache queryCache;
|
private final QueryCache queryCache;
|
||||||
private final QueryCachingPolicy queryCachingPolicy;
|
private final QueryCachingPolicy queryCachingPolicy;
|
||||||
|
private final IndexSearcherWrappingService wrappingService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Index setting for index concurrency / number of threadstates in the indexwriter.
|
* Index setting for index concurrency / number of threadstates in the indexwriter.
|
||||||
|
@ -143,7 +144,7 @@ public final class EngineConfig {
|
||||||
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
|
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
|
||||||
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
|
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
|
||||||
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
|
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
|
||||||
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
|
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, IndexSearcherWrappingService wrappingService, TranslogConfig translogConfig) {
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
this.indexSettings = indexSettings;
|
this.indexSettings = indexSettings;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
|
@ -157,6 +158,7 @@ public final class EngineConfig {
|
||||||
this.similarity = similarity;
|
this.similarity = similarity;
|
||||||
this.codecService = codecService;
|
this.codecService = codecService;
|
||||||
this.failedEngineListener = failedEngineListener;
|
this.failedEngineListener = failedEngineListener;
|
||||||
|
this.wrappingService = wrappingService;
|
||||||
this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false);
|
this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false);
|
||||||
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
|
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
|
||||||
this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
|
this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
|
||||||
|
@ -421,6 +423,10 @@ public final class EngineConfig {
|
||||||
return queryCachingPolicy;
|
return queryCachingPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IndexSearcherWrappingService getWrappingService() {
|
||||||
|
return wrappingService;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the translog config for this engine
|
* Returns the translog config for this engine
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
|
||||||
|
* and {@link IndexSearcher} managed by the {@link Engine}.
|
||||||
|
*/
|
||||||
|
public interface IndexSearcherWrapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param reader The provided directory reader to be wrapped to add custom functionality
|
||||||
|
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
|
||||||
|
* the provided directory reader
|
||||||
|
*/
|
||||||
|
DirectoryReader wrap(DirectoryReader reader);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param searcher The provided index searcher to be wrapped to add custom functionality
|
||||||
|
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
|
||||||
|
* the provided index searcher
|
||||||
|
*/
|
||||||
|
IndexSearcher wrap(IndexSearcher searcher) throws EngineException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.index.engine.Engine.Searcher;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Service responsible for wrapping the {@link DirectoryReader} and {@link IndexSearcher} of a {@link Searcher} via the
|
||||||
|
* configured {@link IndexSearcherWrapper} instance. This allows custom functionally to be added the {@link Searcher}
|
||||||
|
* before being used to do an operation (search, get, field stats etc.)
|
||||||
|
*/
|
||||||
|
// TODO: This needs extension point is a bit hacky now, because the IndexSearch from the engine can only be wrapped once,
|
||||||
|
// if we allowed the IndexSearcher to be wrapped multiple times then a custom IndexSearcherWrapper needs have good
|
||||||
|
// control over its location in the wrapping chain
|
||||||
|
public final class IndexSearcherWrappingService {
|
||||||
|
|
||||||
|
private final IndexSearcherWrapper wrapper;
|
||||||
|
|
||||||
|
// for unit tests:
|
||||||
|
IndexSearcherWrappingService() {
|
||||||
|
this.wrapper = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
// Use a Set parameter here, because constructor parameter can't be optional
|
||||||
|
// and I prefer to keep the `wrapper` field final.
|
||||||
|
public IndexSearcherWrappingService(Set<IndexSearcherWrapper> wrappers) {
|
||||||
|
if (wrappers.size() > 1) {
|
||||||
|
throw new IllegalStateException("wrapping of the index searcher by more than one wrappers is forbidden, found the following wrappers [" + wrappers + "]");
|
||||||
|
}
|
||||||
|
if (wrappers.isEmpty()) {
|
||||||
|
this.wrapper = null;
|
||||||
|
} else {
|
||||||
|
this.wrapper = wrappers.iterator().next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
|
||||||
|
* gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned.
|
||||||
|
*
|
||||||
|
* This is invoked each time a {@link Searcher} is requested to do an operation. (for example search)
|
||||||
|
*/
|
||||||
|
public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException {
|
||||||
|
if (wrapper == null) {
|
||||||
|
return engineSearcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader());
|
||||||
|
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
|
||||||
|
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
|
||||||
|
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
|
||||||
|
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
|
||||||
|
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
|
||||||
|
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
|
||||||
|
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
|
||||||
|
IndexSearcher indexSearcher = wrapper.wrap(innerIndexSearcher);
|
||||||
|
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
|
||||||
|
return engineSearcher;
|
||||||
|
} else {
|
||||||
|
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws ElasticsearchException {
|
||||||
|
engineSearcher.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -168,6 +168,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
protected volatile IndexShardState state;
|
protected volatile IndexShardState state;
|
||||||
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
|
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
|
||||||
protected final EngineFactory engineFactory;
|
protected final EngineFactory engineFactory;
|
||||||
|
private final IndexSearcherWrappingService wrappingService;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private RecoveryState recoveryState;
|
private RecoveryState recoveryState;
|
||||||
|
@ -197,12 +198,13 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
IndicesQueryCache indicesQueryCache, ShardPercolateService shardPercolateService, CodecService codecService,
|
IndicesQueryCache indicesQueryCache, ShardPercolateService shardPercolateService, CodecService codecService,
|
||||||
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService,
|
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService,
|
||||||
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory,
|
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory,
|
||||||
ClusterService clusterService, ShardPath path, BigArrays bigArrays) {
|
ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
|
||||||
super(shardId, indexSettingsService.getSettings());
|
super(shardId, indexSettingsService.getSettings());
|
||||||
this.codecService = codecService;
|
this.codecService = codecService;
|
||||||
this.warmer = warmer;
|
this.warmer = warmer;
|
||||||
this.deletionPolicy = deletionPolicy;
|
this.deletionPolicy = deletionPolicy;
|
||||||
this.similarityService = similarityService;
|
this.similarityService = similarityService;
|
||||||
|
this.wrappingService = wrappingService;
|
||||||
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
|
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
|
||||||
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
|
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
|
||||||
this.engineFactory = factory;
|
this.engineFactory = factory;
|
||||||
|
@ -1361,7 +1363,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
};
|
};
|
||||||
return new EngineConfig(shardId,
|
return new EngineConfig(shardId,
|
||||||
threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
|
threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
|
||||||
mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
|
mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class IndexShardOperationCounter extends AbstractRefCounted {
|
private static class IndexShardOperationCounter extends AbstractRefCounted {
|
||||||
|
|
|
@ -21,7 +21,10 @@ package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
|
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.engine.IndexSearcherWrapper;
|
||||||
|
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
|
||||||
import org.elasticsearch.index.engine.EngineFactory;
|
import org.elasticsearch.index.engine.EngineFactory;
|
||||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||||
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||||
|
@ -73,6 +76,10 @@ public class IndexShardModule extends AbstractModule {
|
||||||
bind(StoreRecoveryService.class).asEagerSingleton();
|
bind(StoreRecoveryService.class).asEagerSingleton();
|
||||||
bind(ShardPercolateService.class).asEagerSingleton();
|
bind(ShardPercolateService.class).asEagerSingleton();
|
||||||
bind(ShardTermVectorsService.class).asEagerSingleton();
|
bind(ShardTermVectorsService.class).asEagerSingleton();
|
||||||
|
bind(IndexSearcherWrappingService.class).asEagerSingleton();
|
||||||
|
// this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService
|
||||||
|
Multibinder<IndexSearcherWrapper> multibinder
|
||||||
|
= Multibinder.newSetBinder(binder(), IndexSearcherWrapper.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
|
||||||
import org.elasticsearch.index.cache.IndexCache;
|
import org.elasticsearch.index.cache.IndexCache;
|
||||||
import org.elasticsearch.index.codec.CodecService;
|
import org.elasticsearch.index.codec.CodecService;
|
||||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||||
|
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.engine.EngineFactory;
|
import org.elasticsearch.index.engine.EngineFactory;
|
||||||
|
@ -66,13 +67,13 @@ public final class ShadowIndexShard extends IndexShard {
|
||||||
IndexService indexService, @Nullable IndicesWarmer warmer,
|
IndexService indexService, @Nullable IndicesWarmer warmer,
|
||||||
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
|
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
|
||||||
EngineFactory factory, ClusterService clusterService,
|
EngineFactory factory, ClusterService clusterService,
|
||||||
ShardPath path, BigArrays bigArrays) throws IOException {
|
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
|
||||||
super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
|
super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
|
||||||
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
|
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
|
||||||
indicesQueryCache, shardPercolateService, codecService,
|
indicesQueryCache, shardPercolateService, codecService,
|
||||||
termVectorsService, indexFieldDataService, indexService,
|
termVectorsService, indexFieldDataService, indexService,
|
||||||
warmer, deletionPolicy, similarityService,
|
warmer, deletionPolicy, similarityService,
|
||||||
factory, clusterService, path, bigArrays);
|
factory, clusterService, path, bigArrays, wrappingService);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -236,15 +236,15 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected InternalEngine createEngine(Store store, Path translogPath) {
|
protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) {
|
||||||
return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy());
|
return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
|
protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) {
|
||||||
return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false);
|
return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy, wrappers), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
|
public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) {
|
||||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
|
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
|
||||||
|
|
||||||
|
@ -255,7 +255,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
|
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
|
||||||
// we don't need to notify anybody in this test
|
// we don't need to notify anybody in this test
|
||||||
}
|
}
|
||||||
}, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
|
}, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig);
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
@ -493,6 +493,32 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIndexSearcherWrapper() throws Exception {
|
||||||
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
|
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DirectoryReader wrap(DirectoryReader reader) {
|
||||||
|
counter.incrementAndGet();
|
||||||
|
return reader;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
|
||||||
|
counter.incrementAndGet();
|
||||||
|
return searcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Store store = createStore();
|
||||||
|
Path translog = createTempDir("translog-test");
|
||||||
|
InternalEngine engine = createEngine(store, translog, wrapper);
|
||||||
|
Engine.Searcher searcher = engine.acquireSearcher("test");
|
||||||
|
assertThat(counter.get(), equalTo(2));
|
||||||
|
searcher.close();
|
||||||
|
IOUtils.close(store, engine);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleOperations() throws Exception {
|
public void testSimpleOperations() throws Exception {
|
||||||
Engine.Searcher searchResult = engine.acquireSearcher("test");
|
Engine.Searcher searchResult = engine.acquireSearcher("test");
|
||||||
|
@ -1985,7 +2011,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
|
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
|
||||||
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
|
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
|
||||||
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
|
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
|
||||||
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
|
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new InternalEngine(brokenConfig, false);
|
new InternalEngine(brokenConfig, false);
|
||||||
|
|
|
@ -47,7 +47,6 @@ import org.elasticsearch.index.mapper.ParseContext;
|
||||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
|
||||||
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.shard.ShardUtils;
|
import org.elasticsearch.index.shard.ShardUtils;
|
||||||
|
@ -226,7 +225,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
|
||||||
@Override
|
@Override
|
||||||
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
|
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
|
||||||
// we don't need to notify anybody in this test
|
// we don't need to notify anybody in this test
|
||||||
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
|
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig);
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue