From c133bec4bd378766837e5c63acc2859a44f2183f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 14 Oct 2015 10:44:57 +0200 Subject: [PATCH] Ensure searcher is release if wrapping fails Today we leak an index searcher if we fail to wrap the seacher in IndexShard. This commit ensures that the seacher is released if the wrapper throws an exception. This commit also restructures some test to be more atomic and only test a single feature / attribute of the wrapper. --- .../elasticsearch/index/shard/IndexShard.java | 15 +++- .../index/shard/IndexShardTests.java | 88 ++++++++++++++++--- 2 files changed, 88 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2ff8c37db3b..ad0d64766d0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,6 +41,8 @@ import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; @@ -717,11 +719,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett public Engine.Searcher acquireSearcher(String source) { readAllowed(); - Engine engine = getEngine(); + final Engine engine = getEngine(); + final Engine.Searcher searcher = engine.acquireSearcher(source); + boolean success = false; try { - return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source)); + final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(engineConfig, searcher); + assert wrappedSearcher != null; + success = true; + return wrappedSearcher; } catch (IOException ex) { throw new ElasticsearchException("failed to wrap searcher", ex); + } finally { + if (success == false) { + Releasables.close(success, searcher); + } } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f7b14192b49..baf4ee60775 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -95,6 +95,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -911,10 +912,6 @@ public class IndexShardTests extends ESSingleNodeTestCase { search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); assertEquals(search.totalHits, 1); } - - ShardRouting routing = new ShardRouting(shard.routingEntry()); - shard.close("simon says", true); - IndexServicesProvider indexServices = indexService.getIndexServices(); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @Override public DirectoryReader wrap(DirectoryReader reader) throws IOException { @@ -927,16 +924,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }; - IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); - IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); + IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); try { - ShardRoutingHelper.reinit(routing); - newShard.updateRoutingEntry(routing, false); - DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); - assertTrue(newShard.recoverFromStore(routing, localNode)); - routing = new ShardRouting(routing); - ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); assertEquals(search.totalHits, 0); @@ -948,7 +937,34 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); getResult.release(); + } finally { + newShard.close("just do it", randomBoolean()); + } + } + public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService("test"); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get(); + + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new FieldMaskingReader("foo", reader); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + + IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); + try { // test global ordinals are evicted MappedFieldType foo = newShard.mapperService().indexName("foo"); IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); @@ -970,7 +986,53 @@ public class IndexShardTests extends ESSingleNodeTestCase { } finally { newShard.close("just do it", randomBoolean()); } + } + public void testSearchIsReleaseIfWrapperFails() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService("test"); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + @Override + public DirectoryReader wrap(DirectoryReader reader) throws IOException { + throw new RuntimeException("boom"); + } + + @Override + public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException { + return searcher; + } + }; + + IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); + try { + newShard.acquireSearcher("test"); + fail("exception expected"); + } catch (RuntimeException ex) { + // + } finally { + newShard.close("just do it", randomBoolean()); + } + // test will fail due to unclosed searchers if the searcher is not released + } + + private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException { + ShardRouting routing = new ShardRouting(shard.routingEntry()); + shard.close("simon says", true); + IndexServicesProvider indexServices = indexService.getIndexServices(); + IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController()); + IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider); + ShardRoutingHelper.reinit(routing); + newShard.updateRoutingEntry(routing, false); + DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); + assertTrue(newShard.recoverFromStore(routing, localNode)); + routing = new ShardRouting(routing); + ShardRoutingHelper.moveToStarted(routing); + newShard.updateRoutingEntry(routing, true); + return newShard; } private static class FieldMaskingReader extends FilterDirectoryReader {