Ensure either success or failure path for SearchOperationListener is called (#37467)
Today we have several implementations of executing SearchOperationListener in SearchService. While all of them seem to be safe, at least one, the one that executes scroll searches, can cause illegal execution of SearchOperationListener that can then in turn trigger assertions in ShardSearchStats. This change adds a SearchOperationListenerExecutor that uses try-with-resources blocks to ensure listeners are called in a safe way. Relates to #37185
This commit is contained in:
parent
100537fbc3
commit
4ec3a6d922
|
@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
import org.elasticsearch.action.search.SearchTask;
|
||||
|
@ -329,7 +328,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
} catch (Exception e) {
|
||||
logger.trace("Dfs phase failed", e);
|
||||
processFailure(context, e);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -380,29 +379,24 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
});
|
||||
}
|
||||
|
||||
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
|
||||
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
|
||||
final SearchContext context = createAndPutContext(request);
|
||||
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
|
||||
context.incRef();
|
||||
boolean queryPhaseSuccess = false;
|
||||
try {
|
||||
context.setTask(task);
|
||||
operationListener.onPreQueryPhase(context);
|
||||
long time = System.nanoTime();
|
||||
contextProcessing(context);
|
||||
|
||||
loadOrExecuteQueryPhase(request, context);
|
||||
|
||||
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
|
||||
freeContext(context.id());
|
||||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
final long afterQueryTime;
|
||||
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
|
||||
contextProcessing(context);
|
||||
loadOrExecuteQueryPhase(request, context);
|
||||
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
|
||||
freeContext(context.id());
|
||||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
afterQueryTime = executor.success();
|
||||
}
|
||||
final long afterQueryTime = System.nanoTime();
|
||||
queryPhaseSuccess = true;
|
||||
operationListener.onQueryPhase(context, afterQueryTime - time);
|
||||
if (request.numberOfShards() == 1) {
|
||||
return executeFetchPhase(context, operationListener, afterQueryTime);
|
||||
return executeFetchPhase(context, afterQueryTime);
|
||||
}
|
||||
return context.queryResult();
|
||||
} catch (Exception e) {
|
||||
|
@ -411,21 +405,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
e = (e.getCause() == null || e.getCause() instanceof Exception) ?
|
||||
(Exception) e.getCause() : new ElasticsearchException(e.getCause());
|
||||
}
|
||||
if (!queryPhaseSuccess) {
|
||||
operationListener.onFailedQueryPhase(context);
|
||||
}
|
||||
logger.trace("Query phase failed", e);
|
||||
processFailure(context, e);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
}
|
||||
|
||||
private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener,
|
||||
long afterQueryTime) {
|
||||
operationListener.onPreFetchPhase(context);
|
||||
try {
|
||||
private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) {
|
||||
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
|
||||
shortcutDocIdsToLoad(context);
|
||||
fetchPhase.execute(context);
|
||||
if (fetchPhaseShouldFreeContext(context)) {
|
||||
|
@ -433,34 +422,27 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
operationListener.onFailedFetchPhase(context);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
executor.success();
|
||||
}
|
||||
operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
|
||||
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
|
||||
}
|
||||
|
||||
public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener<ScrollQuerySearchResult> listener) {
|
||||
runAsync(request.id(), () -> {
|
||||
final SearchContext context = findContext(request.id(), request);
|
||||
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
|
||||
context.incRef();
|
||||
try {
|
||||
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
|
||||
context.setTask(task);
|
||||
operationListener.onPreQueryPhase(context);
|
||||
long time = System.nanoTime();
|
||||
contextProcessing(context);
|
||||
processScroll(request, context);
|
||||
queryPhase.execute(context);
|
||||
contextProcessedSuccessfully(context);
|
||||
operationListener.onQueryPhase(context, System.nanoTime() - time);
|
||||
executor.success();
|
||||
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
|
||||
} catch (Exception e) {
|
||||
operationListener.onFailedQueryPhase(context);
|
||||
logger.trace("Query phase failed", e);
|
||||
processFailure(context, e);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -471,15 +453,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
runAsync(request.id(), () -> {
|
||||
final SearchContext context = findContext(request.id(), request);
|
||||
context.setTask(task);
|
||||
IndexShard indexShard = context.indexShard();
|
||||
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
|
||||
context.incRef();
|
||||
try {
|
||||
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
|
||||
contextProcessing(context);
|
||||
context.searcher().setAggregatedDfs(request.dfs());
|
||||
|
||||
operationListener.onPreQueryPhase(context);
|
||||
long time = System.nanoTime();
|
||||
queryPhase.execute(context);
|
||||
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
|
||||
// no hits, we can release the context since there will be no fetch phase
|
||||
|
@ -487,13 +464,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
operationListener.onQueryPhase(context, System.nanoTime() - time);
|
||||
executor.success();
|
||||
return context.queryResult();
|
||||
} catch (Exception e) {
|
||||
operationListener.onFailedQueryPhase(context);
|
||||
logger.trace("Query phase failed", e);
|
||||
processFailure(context, e);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -527,28 +503,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
ActionListener<ScrollQueryFetchSearchResult> listener) {
|
||||
runAsync(request.id(), () -> {
|
||||
final SearchContext context = findContext(request.id(), request);
|
||||
context.setTask(task);
|
||||
context.incRef();
|
||||
try {
|
||||
context.setTask(task);
|
||||
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){
|
||||
contextProcessing(context);
|
||||
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
|
||||
processScroll(request, context);
|
||||
operationListener.onPreQueryPhase(context);
|
||||
final long time = System.nanoTime();
|
||||
try {
|
||||
queryPhase.execute(context);
|
||||
} catch (Exception e) {
|
||||
operationListener.onFailedQueryPhase(context);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
long afterQueryTime = System.nanoTime();
|
||||
operationListener.onQueryPhase(context, afterQueryTime - time);
|
||||
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
|
||||
queryPhase.execute(context);
|
||||
final long afterQueryTime = executor.success();
|
||||
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime);
|
||||
return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget());
|
||||
} catch (Exception e) {
|
||||
logger.trace("Fetch phase failed", e);
|
||||
processFailure(context, e);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -558,7 +525,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener<FetchSearchResult> listener) {
|
||||
runAsync(request.id(), () -> {
|
||||
final SearchContext context = findContext(request.id(), request);
|
||||
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
|
||||
context.incRef();
|
||||
try {
|
||||
context.setTask(task);
|
||||
|
@ -567,21 +533,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
|
||||
}
|
||||
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
|
||||
operationListener.onPreFetchPhase(context);
|
||||
long time = System.nanoTime();
|
||||
fetchPhase.execute(context);
|
||||
if (fetchPhaseShouldFreeContext(context)) {
|
||||
freeContext(request.id());
|
||||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
|
||||
fetchPhase.execute(context);
|
||||
if (fetchPhaseShouldFreeContext(context)) {
|
||||
freeContext(request.id());
|
||||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
executor.success();
|
||||
}
|
||||
operationListener.onFetchPhase(context, System.nanoTime() - time);
|
||||
return context.fetchResult();
|
||||
} catch (Exception e) {
|
||||
operationListener.onFailedFetchPhase(context);
|
||||
logger.trace("Fetch phase failed", e);
|
||||
processFailure(context, e);
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -661,7 +626,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
context.lowLevelCancellation(lowLevelCancellation);
|
||||
} catch (Exception e) {
|
||||
context.close();
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return context;
|
||||
|
@ -733,7 +698,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
}
|
||||
|
||||
private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
|
||||
private void contextScrollKeepAlive(SearchContext context, long keepAlive) {
|
||||
if (keepAlive > maxKeepAlive) {
|
||||
throw new IllegalArgumentException(
|
||||
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " +
|
||||
|
@ -991,7 +956,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
|
||||
}
|
||||
|
||||
private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
|
||||
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
|
||||
// process scroll
|
||||
context.from(context.from() + context.size());
|
||||
context.scrollContext().scroll = request.scroll();
|
||||
|
@ -1147,4 +1112,58 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
return canMatch;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
|
||||
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
|
||||
*/
|
||||
private static final class SearchOperationListenerExecutor implements AutoCloseable {
|
||||
private final SearchOperationListener listener;
|
||||
private final SearchContext context;
|
||||
private final long time;
|
||||
private final boolean fetch;
|
||||
private long afterQueryTime = -1;
|
||||
private boolean closed = false;
|
||||
|
||||
SearchOperationListenerExecutor(SearchContext context) {
|
||||
this(context, false, System.nanoTime());
|
||||
}
|
||||
|
||||
SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
|
||||
this.listener = context.indexShard().getSearchOperationListener();
|
||||
this.context = context;
|
||||
time = startTime;
|
||||
this.fetch = fetch;
|
||||
if (fetch) {
|
||||
listener.onPreFetchPhase(context);
|
||||
} else {
|
||||
listener.onPreQueryPhase(context);
|
||||
}
|
||||
}
|
||||
|
||||
long success() {
|
||||
return afterQueryTime = System.nanoTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
|
||||
if (closed == false) {
|
||||
closed = true;
|
||||
if (afterQueryTime != -1) {
|
||||
if (fetch) {
|
||||
listener.onFetchPhase(context, afterQueryTime - time);
|
||||
} else {
|
||||
listener.onQueryPhase(context, afterQueryTime - time);
|
||||
}
|
||||
} else {
|
||||
if (fetch) {
|
||||
listener.onFailedFetchPhase(context);
|
||||
} else {
|
||||
listener.onFailedQueryPhase(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue