Time Memory Leak: Search requests don't eagerly clean the search context, closes #153.

This commit is contained in:
kimchy 2010-04-30 01:48:35 +03:00
parent ceb0138aa8
commit 30aae506f3
7 changed files with 39 additions and 12 deletions

View File

@ -69,6 +69,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults(); private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private volatile Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) { private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener); super(request, listener);
@ -169,9 +170,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
private void innerExecuteFetchPhase() { private void innerExecuteFetchPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults.values()); sortedShardList = searchPhaseController.sortDocs(queryResults.values());
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
this.docIdsToLoad = docIdsToLoad;
if (docIdsToLoad.isEmpty()) { if (docIdsToLoad.isEmpty()) {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
finishHim(); finishHim();
} }
@ -219,8 +220,6 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
} }
} }
} }
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
} }
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
@ -259,6 +258,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (request.scroll() != null) { if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults); scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults);
} }
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
searchCache.releaseDfsResults(dfsResults); searchCache.releaseDfsResults(dfsResults);
searchCache.releaseQueryResults(queryResults); searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults); searchCache.releaseFetchResults(fetchResults);

View File

@ -42,7 +42,7 @@ import java.util.Map;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*; import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction { public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction {

View File

@ -63,6 +63,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults(); private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private volatile Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) { private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener); super(request, listener);
@ -83,9 +84,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Override protected void moveToSecondPhase() { @Override protected void moveToSecondPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults.values()); sortedShardList = searchPhaseController.sortDocs(queryResults.values());
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
this.docIdsToLoad = docIdsToLoad;
if (docIdsToLoad.isEmpty()) { if (docIdsToLoad.isEmpty()) {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
finishHim(); finishHim();
} }
@ -134,8 +135,6 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
} }
} }
} }
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
} }
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
@ -174,6 +173,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (request.scroll() != null) { if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values()); scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values());
} }
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
searchCache.releaseQueryResults(queryResults); searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults); searchCache.releaseFetchResults(fetchResults);
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures())); invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));

View File

@ -279,6 +279,9 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
*/ */
protected void releaseIrrelevantSearchContexts(Map<SearchShardTarget, QuerySearchResultProvider> queryResults, protected void releaseIrrelevantSearchContexts(Map<SearchShardTarget, QuerySearchResultProvider> queryResults,
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad) { Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad) {
if (docIdsToLoad == null) {
return;
}
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) { for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
if (!docIdsToLoad.containsKey(entry.getKey())) { if (!docIdsToLoad.containsKey(entry.getKey())) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId()); DiscoveryNode node = nodes.get(entry.getKey().nodeId());

View File

@ -75,12 +75,16 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
private class IndexReaderCleaner implements Runnable { private class IndexReaderCleaner implements Runnable {
@Override public void run() { @Override public void run() {
int totalCount = cache.size();
int cleaned = 0;
for (Iterator<IndexReader> readerIt = cache.keySet().iterator(); readerIt.hasNext();) { for (Iterator<IndexReader> readerIt = cache.keySet().iterator(); readerIt.hasNext();) {
IndexReader reader = readerIt.next(); IndexReader reader = readerIt.next();
if (reader.getRefCount() <= 0) { if (reader.getRefCount() <= 0) {
readerIt.remove(); readerIt.remove();
cleaned++;
} }
} }
logger.trace("Cleaned [{}] out of estimated total [{}]", cleaned, totalCount);
} }
} }

View File

@ -225,8 +225,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
queryPhase.execute(context); queryPhase.execute(context);
shortcutDocIdsToLoad(context); shortcutDocIdsToLoad(context);
fetchPhase.execute(context); fetchPhase.execute(context);
if (context.scroll() != null) { if (context.scroll() == null) {
activeContexts.put(context.id(), context); freeContext(request.id());
} }
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (RuntimeException e) { } catch (RuntimeException e) {

View File

@ -31,9 +31,12 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.io.stream.LongStreamable; import org.elasticsearch.util.io.stream.LongStreamable;
import org.elasticsearch.util.io.stream.VoidStreamable; import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.settings.Settings;
/** /**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@ -41,7 +44,21 @@ import org.elasticsearch.util.io.stream.VoidStreamable;
* *
* @author kimchy (Shay Banon) * @author kimchy (Shay Banon)
*/ */
public class SearchServiceTransportAction { public class SearchServiceTransportAction extends AbstractComponent {
static final class FreeContextResponseHandler extends VoidTransportResponseHandler {
private final ESLogger logger;
FreeContextResponseHandler(ESLogger logger) {
super(false);
this.logger = logger;
}
@Override public void handleException(RemoteTransportException exp) {
logger.warn("Failed to send release search context", exp);
}
}
private final TransportService transportService; private final TransportService transportService;
@ -49,7 +66,10 @@ public class SearchServiceTransportAction {
private final SearchService searchService; private final SearchService searchService;
@Inject public SearchServiceTransportAction(TransportService transportService, ClusterService clusterService, SearchService searchService) { private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger);
@Inject public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) {
super(settings);
this.transportService = transportService; this.transportService = transportService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchService = searchService; this.searchService = searchService;
@ -69,7 +89,7 @@ public class SearchServiceTransportAction {
if (clusterService.state().nodes().localNodeId().equals(node.id())) { if (clusterService.state().nodes().localNodeId().equals(node.id())) {
searchService.freeContext(contextId); searchService.freeContext(contextId);
} else { } else {
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), VoidTransportResponseHandler.INSTANCE_NOSPAWN); transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), freeContextResponseHandler);
} }
} }