Merge pull request #14303 from s1monw/open_up_extension_points
Open up QueryCache and SearcherWrapper extension points
This commit is contained in:
commit
82ebd28c14
|
@ -19,10 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.index;
|
package org.elasticsearch.index;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
import org.elasticsearch.common.inject.util.Providers;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.ExtensionPoint;
|
|
||||||
import org.elasticsearch.index.cache.IndexCache;
|
import org.elasticsearch.index.cache.IndexCache;
|
||||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||||
import org.elasticsearch.index.cache.query.QueryCache;
|
import org.elasticsearch.index.cache.query.QueryCache;
|
||||||
|
@ -58,7 +57,7 @@ import java.util.function.Consumer;
|
||||||
* <li>Settings update listener - Custom settings update listener can be registered via {@link #addIndexSettingsListener(Consumer)}</li>
|
* <li>Settings update listener - Custom settings update listener can be registered via {@link #addIndexSettingsListener(Consumer)}</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
public class IndexModule extends AbstractModule {
|
public final class IndexModule extends AbstractModule {
|
||||||
|
|
||||||
public static final String STORE_TYPE = "index.store.type";
|
public static final String STORE_TYPE = "index.store.type";
|
||||||
public static final String SIMILARITY_SETTINGS_PREFIX = "index.similarity";
|
public static final String SIMILARITY_SETTINGS_PREFIX = "index.similarity";
|
||||||
|
@ -72,7 +71,7 @@ public class IndexModule extends AbstractModule {
|
||||||
private final IndicesQueryCache indicesQueryCache;
|
private final IndicesQueryCache indicesQueryCache;
|
||||||
// pkg private so tests can mock
|
// pkg private so tests can mock
|
||||||
Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
|
Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
|
||||||
Class<? extends IndexSearcherWrapper> indexSearcherWrapper = null;
|
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
|
||||||
private final Set<Consumer<Settings>> settingsConsumers = new HashSet<>();
|
private final Set<Consumer<Settings>> settingsConsumers = new HashSet<>();
|
||||||
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
|
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
|
||||||
private IndexEventListener listener;
|
private IndexEventListener listener;
|
||||||
|
@ -180,7 +179,7 @@ public class IndexModule extends AbstractModule {
|
||||||
* @param name the providers / caches name
|
* @param name the providers / caches name
|
||||||
* @param provider the provider instance
|
* @param provider the provider instance
|
||||||
*/
|
*/
|
||||||
void registerQueryCache(String name, BiFunction<IndexSettings, IndicesQueryCache, QueryCache> provider) { // pkg private - no need to expose this
|
public void registerQueryCache(String name, BiFunction<IndexSettings, IndicesQueryCache, QueryCache> provider) {
|
||||||
if (provider == null) {
|
if (provider == null) {
|
||||||
throw new IllegalArgumentException("provider must not be null");
|
throw new IllegalArgumentException("provider must not be null");
|
||||||
}
|
}
|
||||||
|
@ -190,6 +189,14 @@ public class IndexModule extends AbstractModule {
|
||||||
queryCaches.put(name, provider);
|
queryCaches.put(name, provider);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets a {@link org.elasticsearch.index.IndexModule.IndexSearcherWrapperFactory} that is called once the IndexService is fully constructed.
|
||||||
|
* Note: this method can only be called once per index. Multiple wrappers are not supported.
|
||||||
|
*/
|
||||||
|
public void setSearcherWrapper(IndexSearcherWrapperFactory indexSearcherWrapperFactory) {
|
||||||
|
this.indexSearcherWrapper.set(indexSearcherWrapperFactory);
|
||||||
|
}
|
||||||
|
|
||||||
public IndexEventListener freeze() {
|
public IndexEventListener freeze() {
|
||||||
// TODO somehow we need to make this pkg private...
|
// TODO somehow we need to make this pkg private...
|
||||||
if (listener == null) {
|
if (listener == null) {
|
||||||
|
@ -210,11 +217,7 @@ public class IndexModule extends AbstractModule {
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
bind(EngineFactory.class).to(engineFactoryImpl).asEagerSingleton();
|
bind(EngineFactory.class).to(engineFactoryImpl).asEagerSingleton();
|
||||||
if (indexSearcherWrapper == null) {
|
bind(IndexSearcherWrapperFactory.class).toInstance(indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get());
|
||||||
bind(IndexSearcherWrapper.class).toProvider(Providers.of(null));
|
|
||||||
} else {
|
|
||||||
bind(IndexSearcherWrapper.class).to(indexSearcherWrapper).asEagerSingleton();
|
|
||||||
}
|
|
||||||
bind(IndexEventListener.class).toInstance(freeze());
|
bind(IndexEventListener.class).toInstance(freeze());
|
||||||
bind(IndexService.class).asEagerSingleton();
|
bind(IndexService.class).asEagerSingleton();
|
||||||
bind(IndexServicesProvider.class).asEagerSingleton();
|
bind(IndexServicesProvider.class).asEagerSingleton();
|
||||||
|
@ -267,4 +270,14 @@ public class IndexModule extends AbstractModule {
|
||||||
return getSettingsKey().equals(setting);
|
return getSettingsKey().equals(setting);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory for creating new {@link IndexSearcherWrapper} instances
|
||||||
|
*/
|
||||||
|
public interface IndexSearcherWrapperFactory {
|
||||||
|
/**
|
||||||
|
* Returns a new IndexSearcherWrapper. This method is called once per index per node
|
||||||
|
*/
|
||||||
|
IndexSearcherWrapper newWrapper(final IndexService indexService);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
||||||
private final IndicesService indicesServices;
|
private final IndicesService indicesServices;
|
||||||
private final IndexServicesProvider indexServicesProvider;
|
private final IndexServicesProvider indexServicesProvider;
|
||||||
private final IndexStore indexStore;
|
private final IndexStore indexStore;
|
||||||
|
private final IndexSearcherWrapper searcherWrapper;
|
||||||
private volatile Map<Integer, IndexShard> shards = emptyMap();
|
private volatile Map<Integer, IndexShard> shards = emptyMap();
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
private final AtomicBoolean deleted = new AtomicBoolean(false);
|
private final AtomicBoolean deleted = new AtomicBoolean(false);
|
||||||
|
@ -88,7 +89,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
||||||
IndicesService indicesServices,
|
IndicesService indicesServices,
|
||||||
IndexServicesProvider indexServicesProvider,
|
IndexServicesProvider indexServicesProvider,
|
||||||
IndexStore indexStore,
|
IndexStore indexStore,
|
||||||
IndexEventListener eventListener) {
|
IndexEventListener eventListener,
|
||||||
|
IndexModule.IndexSearcherWrapperFactory wrapperFactory) {
|
||||||
super(indexSettings);
|
super(indexSettings);
|
||||||
this.indexSettings = indexSettings;
|
this.indexSettings = indexSettings;
|
||||||
this.analysisService = analysisService;
|
this.analysisService = analysisService;
|
||||||
|
@ -101,6 +103,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
||||||
this.indexStore = indexStore;
|
this.indexStore = indexStore;
|
||||||
indexFieldData.setListener(new FieldDataCacheListener(this));
|
indexFieldData.setListener(new FieldDataCacheListener(this));
|
||||||
bitSetFilterCache.setListener(new BitsetCacheListener(this));
|
bitSetFilterCache.setListener(new BitsetCacheListener(this));
|
||||||
|
this.searcherWrapper = wrapperFactory.newWrapper(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int numberOfShards() {
|
public int numberOfShards() {
|
||||||
|
@ -265,9 +268,9 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
||||||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
|
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
|
||||||
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> indexServicesProvider.getIndicesQueryCache().onClose(shardId)));
|
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> indexServicesProvider.getIndicesQueryCache().onClose(shardId)));
|
||||||
if (useShadowEngine(primary, indexSettings)) {
|
if (useShadowEngine(primary, indexSettings)) {
|
||||||
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexServicesProvider);
|
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, searcherWrapper, indexServicesProvider);
|
||||||
} else {
|
} else {
|
||||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexServicesProvider);
|
indexShard = new IndexShard(shardId, this.indexSettings, path, store, searcherWrapper, indexServicesProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||||
|
|
|
@ -56,12 +56,11 @@ public final class IndexServicesProvider {
|
||||||
private final SimilarityService similarityService;
|
private final SimilarityService similarityService;
|
||||||
private final EngineFactory factory;
|
private final EngineFactory factory;
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private final IndexSearcherWrapper indexSearcherWrapper;
|
|
||||||
private final IndexingMemoryController indexingMemoryController;
|
private final IndexingMemoryController indexingMemoryController;
|
||||||
private final IndexEventListener listener;
|
private final IndexEventListener listener;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexServicesProvider(IndexEventListener listener, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) {
|
public IndexServicesProvider(IndexEventListener listener, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, IndexingMemoryController indexingMemoryController) {
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.mapperService = mapperService;
|
this.mapperService = mapperService;
|
||||||
|
@ -75,7 +74,6 @@ public final class IndexServicesProvider {
|
||||||
this.similarityService = similarityService;
|
this.similarityService = similarityService;
|
||||||
this.factory = factory;
|
this.factory = factory;
|
||||||
this.bigArrays = bigArrays;
|
this.bigArrays = bigArrays;
|
||||||
this.indexSearcherWrapper = indexSearcherWrapper;
|
|
||||||
this.indexingMemoryController = indexingMemoryController;
|
this.indexingMemoryController = indexingMemoryController;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,13 +124,7 @@ public final class IndexServicesProvider {
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BigArrays getBigArrays() {
|
public BigArrays getBigArrays() { return bigArrays; }
|
||||||
return bigArrays;
|
|
||||||
}
|
|
||||||
|
|
||||||
public IndexSearcherWrapper getIndexSearcherWrapper() {
|
|
||||||
return indexSearcherWrapper;
|
|
||||||
}
|
|
||||||
|
|
||||||
public IndexingMemoryController getIndexingMemoryController() {
|
public IndexingMemoryController getIndexingMemoryController() {
|
||||||
return indexingMemoryController;
|
return indexingMemoryController;
|
||||||
|
|
|
@ -196,7 +196,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
private final IndexingMemoryController indexingMemoryController;
|
private final IndexingMemoryController indexingMemoryController;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) {
|
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexSearcherWrapper indexSearcherWrapper, IndexServicesProvider provider) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
this.idxSettings = indexSettings;
|
this.idxSettings = indexSettings;
|
||||||
this.codecService = provider.getCodecService();
|
this.codecService = provider.getCodecService();
|
||||||
|
@ -249,7 +249,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
this.disableFlush = this.indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
|
this.disableFlush = this.indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
|
||||||
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
|
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
|
||||||
|
|
||||||
this.searcherWrapper = provider.getIndexSearcherWrapper();
|
this.searcherWrapper = indexSearcherWrapper;
|
||||||
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, provider.getQueryParserService(), indexingService, mapperService, indexFieldDataService);
|
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, provider.getQueryParserService(), indexingService, mapperService, indexFieldDataService);
|
||||||
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
|
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
|
||||||
percolatorQueriesRegistry.enableRealTimePercolator();
|
percolatorQueriesRegistry.enableRealTimePercolator();
|
||||||
|
|
|
@ -37,8 +37,8 @@ import org.elasticsearch.index.translog.TranslogStats;
|
||||||
*/
|
*/
|
||||||
public final class ShadowIndexShard extends IndexShard {
|
public final class ShadowIndexShard extends IndexShard {
|
||||||
|
|
||||||
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) throws IOException {
|
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexSearcherWrapper wrapper, IndexServicesProvider provider) throws IOException {
|
||||||
super(shardId, indexSettings, path, store, provider);
|
super(shardId, indexSettings, path, store, wrapper, provider);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -57,9 +57,9 @@ public class IndexModuleTests extends ModuleTestCase {
|
||||||
final Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
final Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings, Collections.EMPTY_LIST);
|
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings, Collections.EMPTY_LIST);
|
||||||
IndexModule module = new IndexModule(indexSettings, null, null, warmer);
|
IndexModule module = new IndexModule(indexSettings, null, null, warmer);
|
||||||
assertInstanceBinding(module, IndexSearcherWrapper.class, (x) -> x == null);
|
assertInstanceBinding(module, IndexModule.IndexSearcherWrapperFactory.class, (x) -> x.newWrapper(null) == null);
|
||||||
module.indexSearcherWrapper = Wrapper.class;
|
module.setSearcherWrapper((s) -> new Wrapper());
|
||||||
assertBinding(module, IndexSearcherWrapper.class, Wrapper.class);
|
assertInstanceBinding(module, IndexModule.IndexSearcherWrapperFactory.class, (x) -> x.newWrapper(null) instanceof Wrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEngineFactoryBound() {
|
public void testEngineFactoryBound() {
|
||||||
|
|
|
@ -1026,8 +1026,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||||
shard.close("simon says", true);
|
shard.close("simon says", true);
|
||||||
IndexServicesProvider indexServices = indexService.getIndexServices();
|
IndexServicesProvider indexServices = indexService.getIndexServices();
|
||||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndexEventListener(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndexEventListener(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), indexServices.getIndexingMemoryController());
|
||||||
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), newProvider);
|
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), wrapper, newProvider);
|
||||||
ShardRoutingHelper.reinit(routing);
|
ShardRoutingHelper.reinit(routing);
|
||||||
newShard.updateRoutingEntry(routing, false);
|
newShard.updateRoutingEntry(routing, false);
|
||||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||||
|
|
Loading…
Reference in New Issue