diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 06fac046490..8f08fd9e0f7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -69,6 +69,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA private final Map fetchResults = searchCache.obtainFetchResults(); + private volatile Map docIdsToLoad; private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); @@ -169,9 +170,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA private void innerExecuteFetchPhase() { sortedShardList = searchPhaseController.sortDocs(queryResults.values()); final Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); + this.docIdsToLoad = docIdsToLoad; if (docIdsToLoad.isEmpty()) { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); finishHim(); } @@ -219,8 +220,6 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } } } - - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); } private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { @@ -259,6 +258,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (request.scroll() != null) { scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults); } + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); searchCache.releaseDfsResults(dfsResults); searchCache.releaseQueryResults(queryResults); searchCache.releaseFetchResults(fetchResults); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index 8c184f84d04..4498534d18b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -42,7 +42,7 @@ import java.util.Map; import static org.elasticsearch.action.search.type.TransportSearchHelper.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 55ca0068445..0c44168d6e9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -63,6 +63,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi private final Map fetchResults = searchCache.obtainFetchResults(); + private volatile Map docIdsToLoad; private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); @@ -83,9 +84,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi @Override protected void moveToSecondPhase() { sortedShardList = searchPhaseController.sortDocs(queryResults.values()); final Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); + this.docIdsToLoad = docIdsToLoad; if (docIdsToLoad.isEmpty()) { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); finishHim(); } @@ -134,8 +135,6 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi } } } - - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); } private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { @@ -174,6 +173,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi if (request.scroll() != null) { scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values()); } + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); searchCache.releaseQueryResults(queryResults); searchCache.releaseFetchResults(fetchResults); invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures())); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index b186a9e9b5a..b852e3e0d38 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -279,6 +279,9 @@ public abstract class TransportSearchTypeAction extends BaseAction queryResults, Map docIdsToLoad) { + if (docIdsToLoad == null) { + return; + } for (Map.Entry entry : queryResults.entrySet()) { if (!docIdsToLoad.containsKey(entry.getKey())) { DiscoveryNode node = nodes.get(entry.getKey().nodeId()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java index 43c8f0842f0..f44e63d0639 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java @@ -75,12 +75,16 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp private class IndexReaderCleaner implements Runnable { @Override public void run() { + int totalCount = cache.size(); + int cleaned = 0; for (Iterator readerIt = cache.keySet().iterator(); readerIt.hasNext();) { IndexReader reader = readerIt.next(); if (reader.getRefCount() <= 0) { readerIt.remove(); + cleaned++; } } + logger.trace("Cleaned [{}] out of estimated total [{}]", cleaned, totalCount); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 1420767cbee..e3dd980e488 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -225,8 +225,8 @@ public class SearchService extends AbstractLifecycleComponent { queryPhase.execute(context); shortcutDocIdsToLoad(context); fetchPhase.execute(context); - if (context.scroll() != null) { - activeContexts.put(context.id(), context); + if (context.scroll() == null) { + freeContext(request.id()); } return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (RuntimeException e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index ba7d284a8b0..f0ec2f0add0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -31,9 +31,12 @@ import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.transport.*; +import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.util.io.stream.LongStreamable; import org.elasticsearch.util.io.stream.VoidStreamable; +import org.elasticsearch.util.logging.ESLogger; +import org.elasticsearch.util.settings.Settings; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through @@ -41,7 +44,21 @@ import org.elasticsearch.util.io.stream.VoidStreamable; * * @author kimchy (Shay Banon) */ -public class SearchServiceTransportAction { +public class SearchServiceTransportAction extends AbstractComponent { + + static final class FreeContextResponseHandler extends VoidTransportResponseHandler { + + private final ESLogger logger; + + FreeContextResponseHandler(ESLogger logger) { + super(false); + this.logger = logger; + } + + @Override public void handleException(RemoteTransportException exp) { + logger.warn("Failed to send release search context", exp); + } + } private final TransportService transportService; @@ -49,7 +66,10 @@ public class SearchServiceTransportAction { private final SearchService searchService; - @Inject public SearchServiceTransportAction(TransportService transportService, ClusterService clusterService, SearchService searchService) { + private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger); + + @Inject public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) { + super(settings); this.transportService = transportService; this.clusterService = clusterService; this.searchService = searchService; @@ -69,7 +89,7 @@ public class SearchServiceTransportAction { if (clusterService.state().nodes().localNodeId().equals(node.id())) { searchService.freeContext(contextId); } else { - transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), VoidTransportResponseHandler.INSTANCE_NOSPAWN); + transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), freeContextResponseHandler); } }