Merge pull request #14107 from s1monw/release_searcher_on_wrapper_failure
Ensure searcher is release if wrapping fails
This commit is contained in:
commit
461ce98ff5
|
@ -41,6 +41,8 @@ import org.elasticsearch.common.Booleans;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
|
@ -717,11 +719,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
|
||||
public Engine.Searcher acquireSearcher(String source) {
|
||||
readAllowed();
|
||||
Engine engine = getEngine();
|
||||
final Engine engine = getEngine();
|
||||
final Engine.Searcher searcher = engine.acquireSearcher(source);
|
||||
boolean success = false;
|
||||
try {
|
||||
return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source));
|
||||
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(engineConfig, searcher);
|
||||
assert wrappedSearcher != null;
|
||||
success = true;
|
||||
return wrappedSearcher;
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("failed to wrap searcher", ex);
|
||||
} finally {
|
||||
if (success == false) {
|
||||
Releasables.close(success, searcher);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -95,6 +95,7 @@ import java.util.concurrent.BrokenBarrierException;
|
|||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
|
@ -911,10 +912,6 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
|
||||
assertEquals(search.totalHits, 1);
|
||||
}
|
||||
|
||||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||
shard.close("simon says", true);
|
||||
IndexServicesProvider indexServices = indexService.getIndexServices();
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
|
@ -927,16 +924,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
};
|
||||
|
||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
|
||||
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
|
||||
try {
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
assertTrue(newShard.recoverFromStore(routing, localNode));
|
||||
routing = new ShardRouting(routing);
|
||||
ShardRoutingHelper.moveToStarted(routing);
|
||||
newShard.updateRoutingEntry(routing, true);
|
||||
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
|
||||
assertEquals(search.totalHits, 0);
|
||||
|
@ -948,7 +937,34 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
|
||||
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
|
||||
getResult.release();
|
||||
} finally {
|
||||
newShard.close("just do it", randomBoolean());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService indexService = indicesService.indexService("test");
|
||||
IndexShard shard = indexService.getShardOrNull(0);
|
||||
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
|
||||
client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get();
|
||||
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
return new FieldMaskingReader("foo", reader);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
|
||||
return searcher;
|
||||
}
|
||||
};
|
||||
|
||||
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
|
||||
try {
|
||||
// test global ordinals are evicted
|
||||
MappedFieldType foo = newShard.mapperService().indexName("foo");
|
||||
IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo);
|
||||
|
@ -970,7 +986,53 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
} finally {
|
||||
newShard.close("just do it", randomBoolean());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSearchIsReleaseIfWrapperFails() throws IOException {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService indexService = indicesService.indexService("test");
|
||||
IndexShard shard = indexService.getShardOrNull(0);
|
||||
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
throw new RuntimeException("boom");
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
|
||||
return searcher;
|
||||
}
|
||||
};
|
||||
|
||||
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
|
||||
try {
|
||||
newShard.acquireSearcher("test");
|
||||
fail("exception expected");
|
||||
} catch (RuntimeException ex) {
|
||||
//
|
||||
} finally {
|
||||
newShard.close("just do it", randomBoolean());
|
||||
}
|
||||
// test will fail due to unclosed searchers if the searcher is not released
|
||||
}
|
||||
|
||||
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException {
|
||||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||
shard.close("simon says", true);
|
||||
IndexServicesProvider indexServices = indexService.getIndexServices();
|
||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
assertTrue(newShard.recoverFromStore(routing, localNode));
|
||||
routing = new ShardRouting(routing);
|
||||
ShardRoutingHelper.moveToStarted(routing);
|
||||
newShard.updateRoutingEntry(routing, true);
|
||||
return newShard;
|
||||
}
|
||||
|
||||
private static class FieldMaskingReader extends FilterDirectoryReader {
|
||||
|
|
Loading…
Reference in New Issue