Errors (like StackOverflow) can cause a search context to not be released
It will eventually time out (with the default 5 minutes timeout), but we should properly handle it, and also, properly propagate the failure. closes #3513
This commit is contained in:
parent
4a15106d6a
commit
28ae4d6393
|
@ -30,6 +30,20 @@ public final class ExceptionsHelper {
|
|||
|
||||
private static final ESLogger logger = Loggers.getLogger(ExceptionsHelper.class);
|
||||
|
||||
public static RuntimeException convertToRuntime(Throwable t) {
|
||||
if (t instanceof RuntimeException) {
|
||||
return (RuntimeException) t;
|
||||
}
|
||||
return new ElasticSearchException(t.getMessage(), t);
|
||||
}
|
||||
|
||||
public static ElasticSearchException convertToElastic(Throwable t) {
|
||||
if (t instanceof ElasticSearchException) {
|
||||
return (ElasticSearchException) t;
|
||||
}
|
||||
return new ElasticSearchException(t.getMessage(), t);
|
||||
}
|
||||
|
||||
public static RestStatus status(Throwable t) {
|
||||
if (t instanceof ElasticSearchException) {
|
||||
return ((ElasticSearchException) t).status();
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.search;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cache.recycler.CacheRecycler;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -58,7 +59,6 @@ import org.elasticsearch.search.query.*;
|
|||
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
@ -175,10 +175,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
dfsPhase.execute(context);
|
||||
contextProcessedSuccessfully(context);
|
||||
return context.dfsResult();
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
logger.trace("Dfs phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -197,10 +197,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
queryPhase.execute(context);
|
||||
contextProcessedSuccessfully(context);
|
||||
return context.queryResult();
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
logger.trace("Scan phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -225,10 +225,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
logger.trace("Scan phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -248,11 +248,11 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
}
|
||||
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
|
||||
return context.queryResult();
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedQueryPhase(context);
|
||||
logger.trace("Query phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -269,11 +269,11 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
contextProcessedSuccessfully(context);
|
||||
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
|
||||
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedQueryPhase(context);
|
||||
logger.trace("Query phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
contextProcessing(context);
|
||||
try {
|
||||
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity()));
|
||||
} catch (IOException e) {
|
||||
} catch (Throwable e) {
|
||||
freeContext(context);
|
||||
cleanContext(context);
|
||||
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
|
||||
|
@ -296,11 +296,11 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
contextProcessedSuccessfully(context);
|
||||
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
|
||||
return context.queryResult();
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedQueryPhase(context);
|
||||
logger.trace("Query phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -314,9 +314,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
long time = System.nanoTime();
|
||||
try {
|
||||
queryPhase.execute(context);
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedQueryPhase(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
long time2 = System.nanoTime();
|
||||
context.indexShard().searchService().onQueryPhase(context, time2 - time);
|
||||
|
@ -329,16 +329,16 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedFetchPhase(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
|
||||
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
logger.trace("Fetch phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -349,7 +349,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
contextProcessing(context);
|
||||
try {
|
||||
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity()));
|
||||
} catch (IOException e) {
|
||||
} catch (Throwable e) {
|
||||
freeContext(context);
|
||||
cleanContext(context);
|
||||
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
|
||||
|
@ -359,9 +359,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
long time = System.nanoTime();
|
||||
try {
|
||||
queryPhase.execute(context);
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedQueryPhase(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
long time2 = System.nanoTime();
|
||||
context.indexShard().searchService().onQueryPhase(context, time2 - time);
|
||||
|
@ -374,16 +374,16 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedFetchPhase(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
|
||||
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
logger.trace("Fetch phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -398,9 +398,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
long time = System.nanoTime();
|
||||
try {
|
||||
queryPhase.execute(context);
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedQueryPhase(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
long time2 = System.nanoTime();
|
||||
context.indexShard().searchService().onQueryPhase(context, time2 - time);
|
||||
|
@ -413,16 +413,16 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
} else {
|
||||
contextProcessedSuccessfully(context);
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedFetchPhase(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
|
||||
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
logger.trace("Fetch phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -443,11 +443,11 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
}
|
||||
context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time);
|
||||
return context.fetchResult();
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.indexShard().searchService().onFailedFetchPhase(context);
|
||||
logger.trace("Fetch phase failed", e);
|
||||
freeContext(context);
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
} finally {
|
||||
cleanContext(context);
|
||||
}
|
||||
|
@ -507,9 +507,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
keepAlive = request.scroll().keepAlive().millis();
|
||||
}
|
||||
context.keepAlive(keepAlive);
|
||||
} catch (RuntimeException e) {
|
||||
} catch (Throwable e) {
|
||||
context.release();
|
||||
throw e;
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
|
||||
return context;
|
||||
|
@ -567,7 +567,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
break;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
String sSource = "_na_";
|
||||
try {
|
||||
sSource = XContentHelper.convertToJson(source, false);
|
||||
|
|
Loading…
Reference in New Issue