diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c5e1564b5ee..4b12db960de 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -547,24 +547,27 @@ abstract class AbstractSearchAsyncAction exten } /** - * This method should be called if a search phase failed to ensure all relevant search contexts and resources are released. - * this method will also notify the listener and sends back a failure to the user. + * This method should be called if a search phase failed to ensure all relevant reader contexts are released. + * This method will also notify the listener and sends back a failure to the user. * * @param exception the exception explaining or causing the phase failure */ private void raisePhaseFailure(SearchPhaseExecutionException exception) { - results.getSuccessfulResults().forEach((entry) -> { - if (entry.getContextId() != null) { - try { - SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); - Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); - sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices()); - } catch (Exception inner) { - inner.addSuppressed(exception); - logger.trace("failed to release context", inner); + // we don't release persistent readers (point in time). + if (request.pointInTimeBuilder() == null) { + results.getSuccessfulResults().forEach((entry) -> { + if (entry.getContextId() != null) { + try { + SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); + Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); + sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices()); + } catch (Exception inner) { + inner.addSuppressed(exception); + logger.trace("failed to release context", inner); + } } - } - }); + }); + } listener.onFailure(exception); } diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 82e5d002bee..980049e99af 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -98,11 +98,13 @@ final class DfsQueryPhase extends SearchPhase { progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception); counter.onFailure(shardIndex, searchShardTarget, exception); } finally { - // the query might not have been executed at all (for example because thread pool rejected - // execution) and the search context that was created in dfs phase might not be released. - // release it again to be in the safe side - context.sendReleaseSearchContext( - querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices()); + if (context.getRequest().pointInTimeBuilder() == null) { + // the query might not have been executed at all (for example because thread pool rejected + // execution) and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + context.sendReleaseSearchContext( + querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices()); + } } } }); diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 613761871f4..55d40a023d5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -206,11 +206,11 @@ final class FetchSearchPhase extends SearchPhase { * Releases shard targets that are not used in the docsIdsToLoad. */ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { - // we only release search context that we did not fetch from if we are not scrolling - // and if it has at lease one hit that didn't make it to the global topDocs - if (context.getRequest().scroll() == null && - context.getRequest().pointInTimeBuilder() == null && - queryResult.hasSearchContext()) { + // we only release search context that we did not fetch from, if we are not scrolling + // or using a PIT and if it has at least one hit that didn't make it to the global topDocs + if (queryResult.hasSearchContext() + && context.getRequest().scroll() == null + && context.getRequest().pointInTimeBuilder() == null) { try { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index c0d98b434a3..29b5e11040f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -113,7 +113,7 @@ public interface SearchOperationListener { * @param readerContext The reader context used by this request. * @param transportRequest the request that is going to use the search context */ - default void validateSearchContext(ReaderContext readerContext, TransportRequest transportRequest) {} + default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {} /** * A Composite listener that multiplexes calls to each of the listeners methods. @@ -238,11 +238,11 @@ public interface SearchOperationListener { } @Override - public void validateSearchContext(ReaderContext readerContext, TransportRequest request) { + public void validateReaderContext(ReaderContext readerContext, TransportRequest request) { Exception exception = null; for (SearchOperationListener listener : listeners) { try { - listener.validateSearchContext(readerContext, request); + listener.validateReaderContext(readerContext, request); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 30230d96f9d..341bafb9453 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -118,6 +118,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportRequest; import java.io.IOException; import java.util.Collections; @@ -353,7 +354,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return context.dfsResult(); } catch (Exception e) { logger.trace("Dfs phase failed", e); - processFailure(request, readerContext, e); + processFailure(readerContext, e); throw e; } } @@ -396,12 +397,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv try (Releasable ignored = markAsUsed) { listener.onFailure(exc); } finally { - processFailure(request, readerContext, exc); + processFailure(readerContext, exc); } return; } if (canRewriteToMatchNone(canMatchRequest.source()) - && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { + && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { try (Releasable ignored = markAsUsed) { if (orig.readerId() == null) { try { @@ -420,17 +421,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> { - try (Releasable ignored = markAsUsed) { - return executeQueryPhase(orig, task, readerContext); - } - }, ActionListener.wrap(listener::onResponse, exc -> { - try (Releasable ignored = markAsUsed) { - listener.onFailure(exc); - } finally { - processFailure(request, readerContext, exc); - } - })); + runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, readerContext), + wrapFailureListener(listener, readerContext, markAsUsed)); } @Override @@ -442,7 +434,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private IndexShard getShard(ShardSearchRequest request) { if (request.readerId() != null) { - return findReaderContext(request.readerId()).indexShard(); + return findReaderContext(request.readerId(), request).indexShard(); } else { return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); } @@ -481,7 +473,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv (Exception) e.getCause() : new ElasticsearchException(e.getCause()); } logger.trace("Query phase failed", e); - processFailure(request, readerContext, e); + processFailure(readerContext, e); throw e; } } @@ -501,13 +493,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { - final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); if (request.scroll() != null && request.scroll().keepAlive() != null) { final long keepAlive = request.scroll().keepAlive().millis(); checkKeepAliveLimit(keepAlive); @@ -521,21 +512,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, ActionListener.runAfter(listener, markAsUsed::close)); } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { - final ReaderContext readerContext = findReaderContext(request.contextId()); + final ReaderContext readerContext = findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); readerContext.setAggregatedDfs(request.dfs()); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); searchContext.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(searchContext); if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) { @@ -552,10 +542,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Query phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } private Executor getExecutor(IndexShard indexShard) { @@ -573,13 +563,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { - final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); if (request.scroll() != null && request.scroll().keepAlive() != null) { checkKeepAliveLimit(request.scroll().keepAlive().millis()); readerContext.keepAlive(request.scroll().keepAlive().millis()); @@ -594,19 +583,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Fetch phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, ActionListener.runAfter(listener, markAsUsed::close)); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { - final ReaderContext readerContext = findReaderContext(request.contextId()); + final ReaderContext readerContext = findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } @@ -625,10 +613,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Fetch phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } private ReaderContext getReaderContext(ShardSearchContextId id) { @@ -642,19 +630,24 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return null; } - private ReaderContext findReaderContext(ShardSearchContextId id) throws SearchContextMissingException { + private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException { final ReaderContext reader = getReaderContext(id); if (reader == null) { throw new SearchContextMissingException(id); } + try { + reader.validate(request); + } catch (Exception exc) { + processFailure(reader, exc); + throw exc; + } return reader; } final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) { if (request.readerId() != null) { assert keepStatesInContext == false; - final ReaderContext readerContext = findReaderContext(request.readerId()); - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); + final ReaderContext readerContext = findReaderContext(request.readerId(), request); final long keepAlive = request.keepAlive().millis(); checkKeepAliveLimit(keepAlive); readerContext.keepAlive(keepAlive); @@ -860,17 +853,38 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } } - private void processFailure(ShardSearchRequest request, ReaderContext context, Exception e) { - if (context.singleSession() || request.scroll() != null) { + private ActionListener wrapFailureListener(ActionListener listener, ReaderContext context, Releasable releasable) { + return new ActionListener() { + @Override + public void onResponse(T resp) { + Releasables.close(releasable); + listener.onResponse(resp); + } + + @Override + public void onFailure(Exception exc) { + processFailure(context, exc); + Releasables.close(releasable); + listener.onFailure(exc); + } + }; + } + + private boolean isScrollContext(ReaderContext context) { + return context instanceof LegacyReaderContext && context.singleSession() == false; + } + + private void processFailure(ReaderContext context, Exception exc) { + if (context.singleSession() || isScrollContext(context)) { // we release the reader on failure if the request is a normal search or a scroll freeReaderContext(context.id()); } try { - if (Lucene.isCorruptionException(e)) { - context.indexShard().failShard("search execution corruption failure", e); + if (Lucene.isCorruptionException(exc)) { + context.indexShard().failShard("search execution corruption failure", exc); } } catch (Exception inner) { - inner.addSuppressed(e); + inner.addSuppressed(exc); logger.warn("failed to process shard failure to (potentially) send back shard failure on corruption", inner); } } @@ -1145,42 +1159,42 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv */ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException { assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType(); - final ReaderContext readerContext = request.readerId() != null ? getReaderContext(request.readerId()) : null; - final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed() : null; - final IndexService indexService; - final Engine.Searcher canMatchSearcher; - final boolean hasRefreshPending; - if (readerContext != null) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); - checkKeepAliveLimit(request.keepAlive().millis()); - readerContext.keepAlive(request.keepAlive().millis()); - indexService = readerContext.indexService(); - canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); - hasRefreshPending = false; - } else { - indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.getShard(request.shardId().getId()); - hasRefreshPending = indexShard.hasRefreshPending(); - canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); - } - - try (Releasable ignored = Releasables.wrap(markAsUsed, canMatchSearcher)) { - QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), canMatchSearcher, - request::nowInMillis, request.getClusterAlias()); - Rewriteable.rewrite(request.getRewriteable(), context, false); - final boolean aliasFilterCanMatch = request.getAliasFilter() - .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; - FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); - MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; - final boolean canMatch; - if (canRewriteToMatchNone(request.source())) { - QueryBuilder queryBuilder = request.source().query(); - canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null; + try (Releasable ignored = readerContext != null ? readerContext.markAsUsed() : () -> {}) { + final IndexService indexService; + final Engine.Searcher canMatchSearcher; + final boolean hasRefreshPending; + if (readerContext != null) { + checkKeepAliveLimit(request.keepAlive().millis()); + readerContext.keepAlive(request.keepAlive().millis()); + indexService = readerContext.indexService(); + canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); + hasRefreshPending = false; } else { - // null query means match_all - canMatch = aliasFilterCanMatch; + indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.shardId().getId()); + hasRefreshPending = indexShard.hasRefreshPending(); + canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); + } + + try (Releasable ignored2 = canMatchSearcher) { + QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), canMatchSearcher, + request::nowInMillis, request.getClusterAlias()); + Rewriteable.rewrite(request.getRewriteable(), context, false); + final boolean aliasFilterCanMatch = request.getAliasFilter() + .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; + FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); + MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; + final boolean canMatch; + if (canRewriteToMatchNone(request.source())) { + QueryBuilder queryBuilder = request.source().query(); + canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + } else { + // null query means match_all + canMatch = aliasFilterCanMatch; + } + return new CanMatchResponse(canMatch || hasRefreshPending, minMax); } - return new CanMatchResponse(canMatch || hasRefreshPending, minMax); } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index 1c3c14ab14d..ab188f8ddf2 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.internal; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; @@ -35,8 +33,7 @@ public class LegacyReaderContext extends ReaderContext { private AggregatedDfs aggregatedDfs; private RescoreDocIds rescoreDocIds; - private Engine.Searcher searcher; - private Releasable onClose; + private volatile Engine.Searcher searcher; public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader, ShardSearchRequest shardSearchRequest, long keepAliveInMillis) { @@ -59,8 +56,9 @@ public class LegacyReaderContext extends ReaderContext { // This ensures that we wrap the searcher's reader with the user's permissions // when they are available. if (searcher == null) { - Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); - onClose = delegate::close; + final Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); + addOnClose(delegate); + // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); } @@ -69,12 +67,6 @@ public class LegacyReaderContext extends ReaderContext { return super.acquireSearcher(source); } - - @Override - void doClose() { - Releasables.close(onClose, super::doClose); - } - @Override public ShardSearchRequest getShardSearchRequest(ShardSearchRequest other) { return shardSearchRequest; diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java index 507a2c1fbb4..b2a69d60196 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.search.RescoreDocIds; import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.transport.TransportRequest; import java.util.HashMap; import java.util.List; @@ -84,6 +85,10 @@ public class ReaderContext implements Releasable { }; } + public void validate(TransportRequest request) { + indexShard.getSearchOperationListener().validateReaderContext(this, request); + } + private long nowInMillis() { return indexShard.getThreadPool().relativeTimeInMillis(); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java index edca5df7e4d..38292f61cca 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java @@ -116,7 +116,7 @@ public class SearchOperationListenerTests extends ESTestCase { } @Override - public void validateSearchContext(ReaderContext readerContext, TransportRequest request) { + public void validateReaderContext(ReaderContext readerContext, TransportRequest request) { assertNotNull(readerContext); validateSearchContext.incrementAndGet(); } @@ -271,10 +271,10 @@ public class SearchOperationListenerTests extends ESTestCase { assertEquals(0, validateSearchContext.get()); if (throwingListeners == 0) { - compositeListener.validateSearchContext(mock(ReaderContext.class), Empty.INSTANCE); + compositeListener.validateReaderContext(mock(ReaderContext.class), Empty.INSTANCE); } else { RuntimeException expected = expectThrows(RuntimeException.class, - () -> compositeListener.validateSearchContext(mock(ReaderContext.class), Empty.INSTANCE)); + () -> compositeListener.validateReaderContext(mock(ReaderContext.class), Empty.INSTANCE)); assertNull(expected.getMessage()); assertEquals(throwingListeners - 1, expected.getSuppressed().length); if (throwingListeners > 1) { diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java index 729c969f9c0..ec680509b18 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java @@ -79,7 +79,7 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); listener.onNewScrollContext(readerContext); - listener.validateSearchContext(readerContext, Empty.INSTANCE); + listener.validateReaderContext(readerContext, Empty.INSTANCE); verify(licenseState, times(2)).isSecurityEnabled(); verifyZeroInteractions(auditTrailService, searchContext); } @@ -136,7 +136,7 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { try (StoredContext ignore = threadContext.newStoredContext(false)) { Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null); authentication.writeToContext(threadContext); - listener.validateSearchContext(readerContext, Empty.INSTANCE); + listener.validateReaderContext(readerContext, Empty.INSTANCE); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), is(indicesAccessControl)); verify(licenseState).isSecurityEnabled(); verifyZeroInteractions(auditTrail); @@ -148,7 +148,7 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { Authentication authentication = new Authentication(new User("test", "role"), new RealmRef(realmName, "file", nodeName), null); authentication.writeToContext(threadContext); - listener.validateSearchContext(readerContext, Empty.INSTANCE); + listener.validateReaderContext(readerContext, Empty.INSTANCE); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), is(indicesAccessControl)); verify(licenseState, times(2)).isSecurityEnabled(); verifyZeroInteractions(auditTrail); @@ -166,7 +166,7 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); SearchContextMissingException expected = expectThrows(SearchContextMissingException.class, - () -> listener.validateSearchContext(readerContext, request)); + () -> listener.validateReaderContext(readerContext, request)); assertEquals(readerContext.id(), expected.contextId()); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), nullValue()); verify(licenseState, Mockito.atLeast(3)).isSecurityEnabled(); @@ -185,7 +185,7 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { authentication.writeToContext(threadContext); threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); - listener.validateSearchContext(readerContext, request); + listener.validateReaderContext(readerContext, request); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), is(indicesAccessControl)); verify(licenseState, Mockito.atLeast(4)).isSecurityEnabled(); verifyNoMoreInteractions(auditTrail); @@ -204,7 +204,7 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); SearchContextMissingException expected = expectThrows(SearchContextMissingException.class, - () -> listener.validateSearchContext(readerContext, request)); + () -> listener.validateReaderContext(readerContext, request)); assertEquals(readerContext.id(), expected.contextId()); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), nullValue()); verify(licenseState, Mockito.atLeast(5)).isSecurityEnabled(); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java index eca53bf802a..9e86c59995f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java @@ -69,7 +69,7 @@ public final class SecuritySearchOperationListener implements SearchOperationLis * object from the scroll context with the current authentication context */ @Override - public void validateSearchContext(ReaderContext readerContext, TransportRequest request) { + public void validateReaderContext(ReaderContext readerContext, TransportRequest request) { if (licenseState.isSecurityEnabled()) { if (readerContext.scrollContext() != null) { final Authentication originalAuth = readerContext.getFromContext(AuthenticationField.AUTHENTICATION_KEY);