From e997342da48fe23fac8f4eb4982ea1d650b4ec38 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 20 Jul 2015 10:05:44 +0200 Subject: [PATCH] Added IndexSearcherWrapper extension point. This extension point allows one IndexSearcherWrapper instance to intercept the searcher from the Engine before it is used for an operation. --- .../elasticsearch/index/engine/Engine.java | 10 +- .../index/engine/EngineConfig.java | 10 +- .../index/engine/IndexSearcherWrapper.java | 45 +++++++++ .../engine/IndexSearcherWrappingService.java | 94 +++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 6 +- .../index/shard/IndexShardModule.java | 7 ++ .../index/shard/ShadowIndexShard.java | 5 +- .../index/engine/InternalEngineTests.java | 40 ++++++-- .../index/engine/ShadowEngineTests.java | 3 +- 9 files changed, 197 insertions(+), 23 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java create mode 100644 core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 33ba72e65ca..14181cc4c31 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.engine; import com.google.common.base.Preconditions; - import org.apache.lucene.index.*; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -45,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; -import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import
org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; @@ -57,11 +55,7 @@ import org.elasticsearch.index.translog.Translog; import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -288,7 +282,7 @@ public abstract class Engine implements Closeable { try { final Searcher retVal = newSearcher(source, searcher, manager); success = true; - return retVal; + return config().getWrappingService().wrap(engineConfig, retVal); } finally { if (!success) { manager.release(searcher); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 576e6dadb0d..778509a97dd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -22,7 +22,6 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.MergePolicy; -import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.similarities.Similarity; @@ -35,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.indexing.ShardIndexingService; +import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import 
org.elasticsearch.index.store.Store; @@ -77,6 +77,7 @@ public final class EngineConfig { private final boolean forceNewTranslog; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; + private final IndexSearcherWrappingService wrappingService; /** * Index setting for index concurrency / number of threadstates in the indexwriter. @@ -143,7 +144,7 @@ public final class EngineConfig { Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) { + TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, IndexSearcherWrappingService wrappingService, TranslogConfig translogConfig) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -157,6 +158,7 @@ public final class EngineConfig { this.similarity = similarity; this.codecService = codecService; this.failedEngineListener = failedEngineListener; + this.wrappingService = wrappingService; this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false); this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65))); @@ -421,6 +423,10 @@ public final class EngineConfig { return queryCachingPolicy; } + public IndexSearcherWrappingService getWrappingService() { + return wrappingService; + } + /** * Returns the translog 
config for this engine */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java new file mode 100644 index 00000000000..c8a75f447b7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; + +/** + * Extension point to add custom functionality at request time to the {@link DirectoryReader} + * and {@link IndexSearcher} managed by the {@link Engine}. 
+ */ +public interface IndexSearcherWrapper { + + /** + * @param reader The provided directory reader to be wrapped to add custom functionality + * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed + * the provided directory reader + */ + DirectoryReader wrap(DirectoryReader reader); + + /** + * @param searcher The provided index searcher to be wrapped to add custom functionality + * @return a new index searcher wrapping the provided index searcher or if no wrapping was performed + * the provided index searcher + */ + IndexSearcher wrap(IndexSearcher searcher) throws EngineException; + +} diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java new file mode 100644 index 00000000000..a0ea90e024e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.engine.Engine.Searcher; + +import java.util.Set; + +/** + * Service responsible for wrapping the {@link DirectoryReader} and {@link IndexSearcher} of a {@link Searcher} via the + * configured {@link IndexSearcherWrapper} instance. This allows custom functionality to be added to the {@link Searcher} + * before being used to do an operation (search, get, field stats etc.) + */ +// TODO: This extension point is a bit hacky now, because the IndexSearcher from the engine can only be wrapped once, +// if we allowed the IndexSearcher to be wrapped multiple times then a custom IndexSearcherWrapper needs to have good +// control over its location in the wrapping chain +public final class IndexSearcherWrappingService { + + private final IndexSearcherWrapper wrapper; + + // for unit tests: + IndexSearcherWrappingService() { + this.wrapper = null; + } + + @Inject + // Use a Set parameter here, because constructor parameter can't be optional + // and I prefer to keep the `wrapper` field final. + public IndexSearcherWrappingService(Set wrappers) { + if (wrappers.size() > 1) { + throw new IllegalStateException("wrapping of the index searcher by more than one wrappers is forbidden, found the following wrappers [" + wrappers + "]"); + } + if (wrappers.isEmpty()) { + this.wrapper = null; + } else { + this.wrapper = wrappers.iterator().next(); + } + } + + /** + * If there is a configured {@link IndexSearcherWrapper} instance, the {@link IndexSearcher} of the provided engine searcher + * gets wrapped and a new {@link Searcher} instance is returned, otherwise the provided {@link Searcher} is returned. + * + * This is invoked each time a {@link Searcher} is requested to do an operation.
(for example search) + */ + public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException { + if (wrapper == null) { + return engineSearcher; + } + + DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader()); + IndexSearcher innerIndexSearcher = new IndexSearcher(reader); + innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); + innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); + innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); + // TODO: Right now IndexSearcher isn't wrapper friendly; when it becomes wrapper friendly we should revise this extension point. + // For example, if IndexSearcher#rewrite() is overridden then IndexSearcher#createNormalizedWeight also needs to be overridden. + // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times + IndexSearcher indexSearcher = wrapper.wrap(innerIndexSearcher); + if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { + return engineSearcher; + } else { + return new Engine.Searcher(engineSearcher.source(), indexSearcher) { + + @Override + public void close() throws ElasticsearchException { + engineSearcher.close(); + } + }; + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fa75975b981..6ee6504bf42 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -168,6 +168,7 @@ public class IndexShard extends AbstractIndexShardComponent { protected volatile IndexShardState state; protected final AtomicReference currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; + private final IndexSearcherWrappingService wrappingService; @Nullable private RecoveryState recoveryState; @@ -197,12 +198,13 @@
public class IndexShard extends AbstractIndexShardComponent { IndicesQueryCache indicesQueryCache, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, - ClusterService clusterService, ShardPath path, BigArrays bigArrays) { + ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { super(shardId, indexSettingsService.getSettings()); this.codecService = codecService; this.warmer = warmer; this.deletionPolicy = deletionPolicy; this.similarityService = similarityService; + this.wrappingService = wrappingService; Preconditions.checkNotNull(store, "Store must be provided to the index shard"); Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard"); this.engineFactory = factory; @@ -1361,7 +1363,7 @@ public class IndexShard extends AbstractIndexShardComponent { }; return new EngineConfig(shardId, threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig); + mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig); } private static class IndexShardOperationCounter extends AbstractRefCounted { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index c445ce442da..870cc4eee99 100644 --- 
a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -21,7 +21,10 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.IndexSearcherWrapper; +import org.elasticsearch.index.engine.IndexSearcherWrappingService; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.percolator.stats.ShardPercolateService; @@ -73,6 +76,10 @@ public class IndexShardModule extends AbstractModule { bind(StoreRecoveryService.class).asEagerSingleton(); bind(ShardPercolateService.class).asEagerSingleton(); bind(ShardTermVectorsService.class).asEagerSingleton(); + bind(IndexSearcherWrappingService.class).asEagerSingleton(); + // this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService + Multibinder multibinder + = Multibinder.newSetBinder(binder(), IndexSearcherWrapper.class); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 82eb458f6cb..7224e701751 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; +import org.elasticsearch.index.engine.IndexSearcherWrappingService; import org.elasticsearch.index.engine.Engine; import 
org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; @@ -66,13 +67,13 @@ public final class ShadowIndexShard extends IndexShard { IndexService indexService, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, ClusterService clusterService, - ShardPath path, BigArrays bigArrays) throws IOException { + ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException { super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indicesQueryCache, shardPercolateService, codecService, termVectorsService, indexFieldDataService, indexService, warmer, deletionPolicy, similarityService, - factory, clusterService, path, bigArrays); + factory, clusterService, path, bigArrays, wrappingService); } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 45a730a50f0..a64fddb503b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -236,15 +236,15 @@ public class InternalEngineTests extends ElasticsearchTestCase { } - protected InternalEngine createEngine(Store store, Path translogPath) { - return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy()); + protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... 
wrappers) { + return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers); } - protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false); + protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) { + return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy, wrappers), false); } - public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { + public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... 
wrappers) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); @@ -255,7 +255,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test } - }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig); return config; } @@ -493,6 +493,32 @@ public class InternalEngineTests extends ElasticsearchTestCase { ; } + @Test + public void testIndexSearcherWrapper() throws Exception { + final AtomicInteger counter = new AtomicInteger(); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + + @Override + public DirectoryReader wrap(DirectoryReader reader) { + counter.incrementAndGet(); + return reader; + } + + @Override + public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { + counter.incrementAndGet(); + return searcher; + } + }; + Store store = createStore(); + Path translog = createTempDir("translog-test"); + InternalEngine engine = createEngine(store, translog, wrapper); + Engine.Searcher searcher = engine.acquireSearcher("test"); + assertThat(counter.get(), equalTo(2)); + searcher.close(); + IOUtils.close(store, engine); + } + @Test public void testSimpleOperations() throws Exception { Engine.Searcher searchResult = engine.acquireSearcher("test"); @@ -1985,7 +2011,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), 
config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener() - , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); try { new InternalEngine(brokenConfig, false); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 08210004591..bf89a5a7baf 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -47,7 +47,6 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; @@ -226,7 +225,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase { @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test - }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); return config; }