[CORE] Unify search context cleanup

Today there are two different ways to cleanup search contexts which can
potentially lead to double releasing of a context. This commit unifies
the methods and prevents double closing.

Closes #7625
This commit is contained in:
Simon Willnauer 2014-09-08 15:59:55 +02:00
parent 80a3038f83
commit 72c4cb51cc
3 changed files with 46 additions and 47 deletions

View File

@ -175,8 +175,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
@Override
protected void doStop() throws ElasticsearchException {
for (SearchContext context : activeContexts.values()) {
freeContext(context);
for (final SearchContext context : activeContexts.values()) {
freeContext(context.id());
}
activeContexts.clear();
}
@ -187,7 +187,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
try {
contextProcessing(context);
dfsPhase.execute(context);
@ -195,7 +195,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return context.dfsResult();
} catch (Throwable e) {
logger.trace("Dfs phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -203,7 +203,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public QuerySearchResult executeScan(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
try {
if (context.aggregations() != null) {
throw new ElasticsearchIllegalArgumentException("aggregations are not supported with search_type=scan");
@ -221,7 +221,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return context.queryResult();
} catch (Throwable e) {
logger.trace("Scan phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -229,7 +229,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
@ -249,7 +249,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Scan phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -257,7 +257,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
@ -287,7 +287,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -295,7 +295,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
@ -308,7 +308,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -316,12 +316,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity()));
} catch (Throwable e) {
freeContext(context);
freeContext(context.id());
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
@ -335,7 +335,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -343,7 +343,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
final SearchContext context = createAndPutContext(request);
contextProcessing(context);
try {
context.indexShard().searchService().onPreQueryPhase(context);
@ -373,7 +373,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -381,12 +381,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity()));
} catch (Throwable e) {
freeContext(context);
freeContext(context.id());
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
@ -418,7 +418,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -426,7 +426,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
@ -457,7 +457,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context);
freeContext(context.id());
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -465,7 +465,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
SearchContext context = findContext(request.id());
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
if (request.lastEmittedDoc() != null) {
@ -485,7 +485,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} catch (Throwable e) {
context.indexShard().searchService().onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e);
freeContext(context); // we just try to make sure this is freed - rethrow orig exception.
freeContext(context.id()); // we just try to make sure this is freed - rethrow orig exception.
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
@ -511,7 +511,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return context;
} finally {
if (!success) {
freeContext(context);
freeContext(context.id());
}
}
}
@ -561,27 +561,22 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
public boolean freeContext(long id) {
SearchContext context = activeContexts.remove(id);
if (context == null) {
return false;
final SearchContext context = activeContexts.remove(id);
if (context != null) {
try {
context.indexShard().searchService().onFreeContext(context);
} finally {
context.close();
}
return true;
}
context.indexShard().searchService().onFreeContext(context);
context.close();
return true;
}
private void freeContext(SearchContext context) {
SearchContext removed = activeContexts.remove(context.id());
if (removed != null) {
removed.indexShard().searchService().onFreeContext(removed);
}
context.close();
return false;
}
public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scroll() != null) {
freeContext(searchContext);
freeContext(searchContext.id());
}
}
}
@ -969,7 +964,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} finally {
try {
if (context != null) {
freeContext(context);
freeContext(context.id());
cleanContext(context);
}
} finally {
@ -1002,7 +997,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
if ((time - lastAccessTime > context.keepAlive())) {
logger.debug("freeing search context [{}], time [{}], lastAccessTime [{}], keepAlive [{}]", context.id(), time, lastAccessTime, context.keepAlive());
freeContext(context);
freeContext(context.id());
}
}
}

View File

@ -145,7 +145,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<Boolean> actionListener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
boolean freed = searchService.freeContext(contextId);
final boolean freed = searchService.freeContext(contextId);
actionListener.onResponse(freed);
} else {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));

View File

@ -64,6 +64,7 @@ import org.elasticsearch.search.suggest.SuggestionSearchContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
@ -87,12 +88,15 @@ public abstract class SearchContext implements Releasable {
}
private Multimap<Lifetime, Releasable> clearables = null;
private final AtomicBoolean closed = new AtomicBoolean(false);
public final void close() {
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
if (closed.compareAndSet(false, true)) { // prevent double release
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
}
}
}