Fix concurrent search and index delete (#42621)

Changed order of listener invocation so that we notify before
registering search context and notify after unregistering same.

This ensures that count up/down like what we do in ShardSearchStats
works. Otherwise, we risk notifying onFreeScrollContext before notifying
onNewScrollContext (same for onFreeContext/onNewContext, but we
currently have no assertions failing in those).

Closes #28053
This commit is contained in:
Henning Andersen 2019-06-06 19:48:59 +02:00 committed by Henning Andersen
parent 2de919e3a8
commit ca5dbf93a5
2 changed files with 48 additions and 12 deletions

View File

@ -548,19 +548,35 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
SearchContext context = createContext(request);
onNewContext(context);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
success = true;
return context;
} finally {
if (success == false) {
freeContext(context.id());
}
}
}
private void onNewContext(SearchContext context) {
boolean success = false;
try {
if (context.scrollContext() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
success = true;
return context;
} finally {
if (!success) {
freeContext(context.id());
// currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the
// right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future.
if (success == false) {
try (SearchContext dummy = context) {
onFreeContext(context);
}
}
}
}
@ -648,16 +664,21 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public boolean freeContext(long id) {
try (SearchContext context = removeContext(id)) {
if (context != null) {
onFreeContext(context);
return true;
}
return false;
}
}
private void onFreeContext(SearchContext context) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
assert activeContexts.containsKey(context.id()) == false;
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
return true;
}
return false;
}
}
public void freeAllScrollContexts() {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
@ -51,6 +50,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
@ -226,6 +226,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
AtomicBoolean running = new AtomicBoolean(true);
CountDownLatch startGun = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
final Thread thread = new Thread() {
@Override
public void run() {
@ -261,12 +262,16 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
try {
final int rounds = scaledRandomIntBetween(100, 10000);
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));
for (int i = 0; i < rounds; i++) {
try {
try {
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
final boolean useScroll = randomBoolean();
service.executeQueryPhase(
new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1,
new ShardSearchLocalRequest(useScroll ? scrollSearchRequest : searchRequest,
indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
SearchPhaseResult searchPhaseResult = result.get();
@ -276,6 +281,9 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
service.freeContext(searchPhaseResult.getRequestId());
}
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
throw ((RuntimeException)ex.getCause());
@ -293,6 +301,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
thread.join();
semaphore.acquire(Integer.MAX_VALUE);
}
assertEquals(0, service.getActiveContexts());
SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}
public void testTimeout() throws IOException {