Fix leaking searcher when shards are removed or relocated (#52099)

We might leak a searcher if the target shard is removed (i.e., its index
is deleted) or relocated while we are creating a SearchContext from a
SearchRewriteContext.

Relates #51708
Closes #52021

I labelled this as a non-issue because it fixes an unreleased bug introduced in #51708.
This commit is contained in:
Nhat Nguyen 2020-02-09 22:03:57 -05:00
parent 79f67e79cf
commit 80a9a08b05
3 changed files with 59 additions and 10 deletions

View File

@ -694,14 +694,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
final ShardSearchRequest request = rewriteContext.request;
final Engine.Searcher searcher = rewriteContext.searcher;
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
boolean success = false;
try {
final ShardSearchRequest request = rewriteContext.request;
final Engine.Searcher searcher = rewriteContext.searcher;
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout,
fetchPhase, clusterService.state().nodes().getMinNodeVersion());
@ -709,8 +709,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
return searchContext;
} finally {
if (success == false) {
// we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise
// leak a searcher and this can have severe implications (unable to obtain shard lock exceptions).
// we handle the case where `IndicesService#indexServiceSafe`, `IndexService#getShard`, or the DefaultSearchContext
// constructor throws an exception since we would otherwise leak a searcher and this can have severe implications
// (unable to obtain shard lock exceptions).
IOUtils.closeWhileHandlingException(rewriteContext.searcher);
}
}

View File

@ -87,6 +87,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -457,7 +458,7 @@ public class RelocationIT extends ESIntegTestCase {
}
}
public void testIndexAndRelocateConcurrently() throws Exception {
public void testIndexSearchAndRelocateConcurrently() throws Exception {
int halfNodes = randomIntBetween(1, 3);
Settings[] nodeSettings = Stream.concat(
Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
@ -474,8 +475,21 @@ public class RelocationIT extends ESIntegTestCase {
.put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1));
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), randomIntBetween(1, 10) + "s");
}
assertAcked(prepareCreate("test", settings));
assertAllShardsOnNodes("test", redNodes);
AtomicBoolean stopped = new AtomicBoolean(false);
Thread[] searchThreads = randomBoolean() ? new Thread[0] : new Thread[randomIntBetween(1, 4)];
for (int i = 0; i < searchThreads.length; i++) {
searchThreads[i] = new Thread(() -> {
while (stopped.get() == false) {
assertNoFailures(client().prepareSearch("test").setRequestCache(false).get());
}
});
searchThreads[i].start();
}
int numDocs = randomIntBetween(100, 150);
ArrayList<String> ids = new ArrayList<>();
logger.info(" --> indexing [{}] docs", numDocs);
@ -513,7 +527,10 @@ public class RelocationIT extends ESIntegTestCase {
assertNoFailures(afterRelocation);
assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()]));
}
stopped.set(true);
for (Thread searchThread : searchThreads) {
searchThread.join();
}
}
public void testRelocateWhileWaitingForRefresh() {

View File

@ -898,4 +898,35 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
latch.await();
}
}
/**
 * Runs uncached searches from several background threads against the "test" index while that
 * index is being deleted.
 *
 * The test contains no explicit assertions; a search failure simply ends the searching thread.
 * NOTE(review): presumably any leaked searcher is detected by the test framework's own
 * teardown checks rather than by this method -- confirm.
 *
 * @throws Exception if waiting on the start latch or joining a searcher thread is interrupted,
 *                   or if one of the client requests issued on the test thread fails
 */
public void testDeleteIndexWhileSearch() throws Exception {
    createIndex("test");
    int numDocs = randomIntBetween(1, 20);
    for (int i = 0; i < numDocs; i++) {
        client().prepareIndex("test", "_doc").setSource("f", "v").get();
    }
    // Refresh so that the documents indexed above are visible to the searches below.
    client().admin().indices().prepareRefresh("test").get();
    AtomicBoolean stopped = new AtomicBoolean(false);
    Thread[] searchers = new Thread[randomIntBetween(1, 4)];
    CountDownLatch latch = new CountDownLatch(searchers.length);
    for (int i = 0; i < searchers.length; i++) {
        searchers[i] = new Thread(() -> {
            // Signal that this thread is running before it issues its first search.
            latch.countDown();
            while (stopped.get() == false) {
                try {
                    client().prepareSearch("test").setRequestCache(false).get();
                } catch (Exception ignored) {
                    // Failures are expected once the index has been deleted; stop searching.
                    return;
                }
            }
        });
        searchers[i].start();
    }
    try {
        // Only delete the index once every searcher thread has started.
        latch.await();
        client().admin().indices().prepareDelete("test").get();
    } finally {
        // Always stop and join the searcher threads, even if the wait or the delete above throws;
        // otherwise they would keep searching the still-existing index after the test has failed.
        stopped.set(true);
        for (Thread searcher : searchers) {
            searcher.join();
        }
    }
}
}