Ensure searcher is release if wrapping fails

Today we leak an index searcher if we fail to wrap the seacher in
IndexShard. This commit ensures that the seacher is released if the wrapper
throws an exception.

This commit also restructures some test to be more atomic and only test a single
feature / attribute of the wrapper.
This commit is contained in:
Simon Willnauer 2015-10-14 10:44:57 +02:00
parent 9fa4cc92e5
commit c133bec4bd
2 changed files with 88 additions and 15 deletions

View File

@ -41,6 +41,8 @@ import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
@ -717,11 +719,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public Engine.Searcher acquireSearcher(String source) {
readAllowed();
Engine engine = getEngine();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source);
boolean success = false;
try {
return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source));
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(engineConfig, searcher);
assert wrappedSearcher != null;
success = true;
return wrappedSearcher;
} catch (IOException ex) {
throw new ElasticsearchException("failed to wrap searcher", ex);
} finally {
if (success == false) {
Releasables.close(success, searcher);
}
}
}

View File

@ -95,6 +95,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -911,10 +912,6 @@ public class IndexShardTests extends ESSingleNodeTestCase {
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
assertEquals(search.totalHits, 1);
}
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexServicesProvider indexServices = indexService.getIndexServices();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
@ -927,16 +924,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
};
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
assertEquals(search.totalHits, 0);
@ -948,7 +937,34 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
getResult.release();
} finally {
newShard.close("just do it", randomBoolean());
}
}
public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new FieldMaskingReader("foo", reader);
}
@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
// test global ordinals are evicted
MappedFieldType foo = newShard.mapperService().indexName("foo");
IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo);
@ -970,7 +986,53 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} finally {
newShard.close("just do it", randomBoolean());
}
}
public void testSearchIsReleaseIfWrapperFails() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
throw new RuntimeException("boom");
}
@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
newShard.acquireSearcher("test");
fail("exception expected");
} catch (RuntimeException ex) {
//
} finally {
newShard.close("just do it", randomBoolean());
}
// test will fail due to unclosed searchers if the searcher is not released
}
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexServicesProvider indexServices = indexService.getIndexServices();
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
return newShard;
}
private static class FieldMaskingReader extends FilterDirectoryReader {