From 72c4cb51cc9d5768f8a1e40d7acb08f046ac9faf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 8 Sep 2014 15:59:55 +0200 Subject: [PATCH] [CORE] Unify search context cleanup Today there are two different ways to cleanup search contexts which can potentially lead to double releasing of a context. This commit unifies the methods and prevents double closing. Closes #7625 --- .../elasticsearch/search/SearchService.java | 79 +++++++++---------- .../action/SearchServiceTransportAction.java | 2 +- .../search/internal/SearchContext.java | 12 ++- 3 files changed, 46 insertions(+), 47 deletions(-) diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 0573d8e7feb..7795a4e4379 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -175,8 +175,8 @@ public class SearchService extends AbstractLifecycleComponent { @Override protected void doStop() throws ElasticsearchException { - for (SearchContext context : activeContexts.values()) { - freeContext(context); + for (final SearchContext context : activeContexts.values()) { + freeContext(context.id()); } activeContexts.clear(); } @@ -187,7 +187,7 @@ public class SearchService extends AbstractLifecycleComponent { } public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); try { contextProcessing(context); dfsPhase.execute(context); @@ -195,7 +195,7 @@ public class SearchService extends AbstractLifecycleComponent { return context.dfsResult(); } catch (Throwable e) { logger.trace("Dfs phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -203,7 +203,7 @@ public class SearchService extends AbstractLifecycleComponent { } public QuerySearchResult executeScan(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); try { if (context.aggregations() != null) { throw new ElasticsearchIllegalArgumentException("aggregations are not supported with search_type=scan"); @@ -221,7 +221,7 @@ public class SearchService extends AbstractLifecycleComponent { return context.queryResult(); } catch (Throwable e) { logger.trace("Scan phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -229,7 +229,7 @@ public class SearchService extends AbstractLifecycleComponent { } public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { processScroll(request, context); @@ -249,7 +249,7 @@ public class SearchService extends AbstractLifecycleComponent { return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); } catch (Throwable e) { logger.trace("Scan phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -257,7 +257,7 @@ public class SearchService extends AbstractLifecycleComponent { } public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); try { context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); @@ -287,7 +287,7 @@ public class SearchService extends AbstractLifecycleComponent { } context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -295,7 +295,7 @@ public class SearchService extends AbstractLifecycleComponent { } public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); try { context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); @@ -308,7 +308,7 @@ public class SearchService extends AbstractLifecycleComponent { } catch (Throwable e) { context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -316,12 +316,12 @@ public class SearchService extends AbstractLifecycleComponent { } public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity())); } catch (Throwable e) { - freeContext(context); + freeContext(context.id()); cleanContext(context); throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } @@ -335,7 +335,7 @@ public class SearchService extends AbstractLifecycleComponent { } catch (Throwable e) { context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -343,7 +343,7 @@ public class SearchService extends AbstractLifecycleComponent { } public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException { - SearchContext context = createAndPutContext(request); + final SearchContext context = createAndPutContext(request); contextProcessing(context); try { context.indexShard().searchService().onPreQueryPhase(context); @@ -373,7 +373,7 @@ public class SearchService extends AbstractLifecycleComponent { return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -381,12 +381,12 @@ public class SearchService extends AbstractLifecycleComponent { } public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity())); } catch (Throwable e) { - freeContext(context); + freeContext(context.id()); cleanContext(context); throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } @@ -418,7 +418,7 @@ public class SearchService extends AbstractLifecycleComponent { return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -426,7 +426,7 @@ public class SearchService extends AbstractLifecycleComponent { } public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { processScroll(request, context); @@ -457,7 +457,7 @@ public class SearchService extends AbstractLifecycleComponent { return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); } catch (Throwable e) { logger.trace("Fetch phase failed", e); - freeContext(context); + freeContext(context.id()); throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -465,7 +465,7 @@ public class SearchService extends AbstractLifecycleComponent { } public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException { - SearchContext context = findContext(request.id()); + final SearchContext context = findContext(request.id()); contextProcessing(context); try { if (request.lastEmittedDoc() != null) { @@ -485,7 +485,7 @@ public class SearchService extends AbstractLifecycleComponent { } catch (Throwable e) { context.indexShard().searchService().onFailedFetchPhase(context); logger.trace("Fetch phase failed", e); - freeContext(context); // we just try to make sure this is freed - rethrow orig exception. + freeContext(context.id()); // we just try to make sure this is freed - rethrow orig exception. throw ExceptionsHelper.convertToRuntime(e); } finally { cleanContext(context); @@ -511,7 +511,7 @@ public class SearchService extends AbstractLifecycleComponent { return context; } finally { if (!success) { - freeContext(context); + freeContext(context.id()); } } } @@ -561,27 +561,22 @@ public class SearchService extends AbstractLifecycleComponent { } public boolean freeContext(long id) { - SearchContext context = activeContexts.remove(id); - if (context == null) { - return false; + final SearchContext context = activeContexts.remove(id); + if (context != null) { + try { + context.indexShard().searchService().onFreeContext(context); + } finally { + context.close(); + } + return true; } - context.indexShard().searchService().onFreeContext(context); - context.close(); - return true; - } - - private void freeContext(SearchContext context) { - SearchContext removed = activeContexts.remove(context.id()); - if (removed != null) { - removed.indexShard().searchService().onFreeContext(removed); - } - context.close(); + return false; } public void freeAllScrollContexts() { for (SearchContext searchContext : activeContexts.values()) { if (searchContext.scroll() != null) { - freeContext(searchContext); + freeContext(searchContext.id()); } } } @@ -969,7 +964,7 @@ public class SearchService extends AbstractLifecycleComponent { } finally { try { if (context != null) { - freeContext(context); + freeContext(context.id()); cleanContext(context); } } finally { @@ -1002,7 +997,7 @@ public class SearchService extends AbstractLifecycleComponent { } if ((time - lastAccessTime > context.keepAlive())) { logger.debug("freeing search context [{}], time [{}], lastAccessTime [{}], keepAlive [{}]", context.id(), time, lastAccessTime, context.keepAlive()); - freeContext(context); + freeContext(context.id()); } } } diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 8ab244c1000..9fa8623bf81 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -145,7 +145,7 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener actionListener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - boolean freed = searchService.freeContext(contextId); + final boolean freed = searchService.freeContext(contextId); actionListener.onResponse(freed); } else { transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener)); diff --git a/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/src/main/java/org/elasticsearch/search/internal/SearchContext.java index a8967780181..71ea669a659 100644 --- a/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -64,6 +64,7 @@ import org.elasticsearch.search.suggest.SuggestionSearchContext; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** */ @@ -87,12 +88,15 @@ public abstract class SearchContext implements Releasable { } private Multimap clearables = null; + private final AtomicBoolean closed = new AtomicBoolean(false); public final void close() { - try { - clearReleasables(Lifetime.CONTEXT); - } finally { - doClose(); + if (closed.compareAndSet(false, true)) { // prevent double release + try { + clearReleasables(Lifetime.CONTEXT); + } finally { + doClose(); + } } }