diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2ff8c37db3b..ad0d64766d0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,6 +41,8 @@ import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; @@ -717,11 +719,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett public Engine.Searcher acquireSearcher(String source) { readAllowed(); - Engine engine = getEngine(); + final Engine engine = getEngine(); + final Engine.Searcher searcher = engine.acquireSearcher(source); + boolean success = false; try { - return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source)); + final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(engineConfig, searcher); + assert wrappedSearcher != null; + success = true; + return wrappedSearcher; } catch (IOException ex) { throw new ElasticsearchException("failed to wrap searcher", ex); + } finally { + if (success == false) { + Releasables.close(success, searcher); + } } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f7b14192b49..baf4ee60775 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -95,6 +95,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -911,10 +912,6 @@ public class IndexShardTests extends ESSingleNodeTestCase { search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); assertEquals(search.totalHits, 1); } - - ShardRouting routing = new ShardRouting(shard.routingEntry()); - shard.close("simon says", true); - IndexServicesProvider indexServices = indexService.getIndexServices(); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @Override public DirectoryReader wrap(DirectoryReader reader) throws IOException { @@ -927,16 +924,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }; - IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); - IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); + IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); try { - ShardRoutingHelper.reinit(routing); - newShard.updateRoutingEntry(routing, false); - DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); - assertTrue(newShard.recoverFromStore(routing, localNode)); - routing = new ShardRouting(routing); - ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); assertEquals(search.totalHits, 0); @@ -948,7 +937,34 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); getResult.release(); + } finally { + newShard.close("just do it", randomBoolean()); + } + } + public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService("test"); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get(); + + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new FieldMaskingReader("foo", reader); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + + IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); + try { // test global ordinals are evicted MappedFieldType foo = newShard.mapperService().indexName("foo"); IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); @@ -970,7 +986,53 @@ public class IndexShardTests extends ESSingleNodeTestCase { } finally { newShard.close("just do it", randomBoolean()); } + } + public void testSearchIsReleaseIfWrapperFails() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService("test"); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + throw new RuntimeException("boom"); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + + IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); + try { + newShard.acquireSearcher("test"); + fail("exception expected"); + } catch (RuntimeException ex) { + // + } finally { + newShard.close("just do it", randomBoolean()); + } + // test will fail due to unclosed searchers if the searcher is not released + } + + private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException { + ShardRouting routing = new ShardRouting(shard.routingEntry()); + shard.close("simon says", true); + IndexServicesProvider indexServices = indexService.getIndexServices(); + IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); + IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); + ShardRoutingHelper.reinit(routing); + newShard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + assertTrue(newShard.recoverFromStore(routing, localNode)); + routing = new ShardRouting(routing); + ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + return newShard; } private static class FieldMaskingReader extends FilterDirectoryReader {