better handling of search context timeout

This commit is contained in:
kimchy 2010-09-21 23:17:46 +02:00
parent d0b29fe3ef
commit 2cbcc8dd2e
2 changed files with 52 additions and 7 deletions

View File

@ -148,7 +148,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
try {
contextProcessing(context);
dfsPhase.execute(context);
contextProcessingDone(context);
return context.dfsResult();
} catch (RuntimeException e) {
freeContext(context);
@ -160,7 +162,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
try {
contextProcessing(context);
queryPhase.execute(context);
contextProcessingDone(context);
return context.queryResult();
} catch (RuntimeException e) {
freeContext(context);
@ -171,7 +175,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
try {
contextProcessing(context);
processScroll(request, context);
contextProcessingDone(context);
queryPhase.execute(context);
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (RuntimeException e) {
@ -182,6 +188,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
@ -190,6 +197,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
try {
queryPhase.execute(context);
contextProcessingDone(context);
return context.queryResult();
} catch (RuntimeException e) {
freeContext(context);
@ -199,12 +207,16 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public QueryFetchSearchResult executeFetchPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
contextProcessing(context);
try {
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() != null) {
activeContexts.put(context.id(), context);
if (context.scroll() == null) {
freeContext(context.id());
} else {
contextProcessingDone(context);
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (RuntimeException e) {
@ -215,6 +227,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
@ -227,6 +240,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
} else {
contextProcessingDone(context);
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (RuntimeException e) {
@ -237,6 +252,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
queryPhase.execute(context);
@ -244,6 +260,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
} else {
contextProcessingDone(context);
}
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (RuntimeException e) {
@ -254,11 +272,14 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
contextProcessing(context);
try {
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
} else {
contextProcessingDone(context);
}
return context.fetchResult();
} catch (RuntimeException e) {
@ -272,8 +293,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
if (context == null) {
throw new SearchContextMissingException(id);
}
// update the last access time of the context
context.accessed(timerService.estimatedTimeInMillis());
return context;
}
@ -311,8 +330,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
keepAlive = request.scroll().keepAlive();
}
context.keepAlive(keepAlive);
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive, TimerService.ExecutionType.DEFAULT));
} catch (RuntimeException e) {
context.release();
throw e;
@ -334,6 +351,21 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.release();
}
private void contextProcessing(SearchContext context) {
if (context.keepAliveTimeout() != null) {
((KeepAliveTimerTask) context.keepAliveTimeout().getTask()).processing();
}
}
private void contextProcessingDone(SearchContext context) {
if (context.keepAliveTimeout() != null) {
((KeepAliveTimerTask) context.keepAliveTimeout().getTask()).doneProcessing();
} else {
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), context.keepAlive(), TimerService.ExecutionType.DEFAULT));
}
}
private void parseSource(SearchContext context, byte[] source, int offset, int length) throws SearchParseException {
// nothing to parse...
if (source == null || length == 0) {
@ -409,10 +441,19 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final SearchContext context;
private KeepAliveTimerTask(SearchContext context) {
KeepAliveTimerTask(SearchContext context) {
this.context = context;
}
public void processing() {
context.keepAliveTimeout().cancel();
}
public void doneProcessing() {
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(this, context.keepAlive(), TimerService.ExecutionType.DEFAULT));
}
@Override public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;

View File

@ -387,6 +387,10 @@ public class SearchContext implements Releasable {
this.keepAliveTimeout = keepAliveTimeout;
}
public Timeout keepAliveTimeout() {
return this.keepAliveTimeout;
}
public DfsSearchResult dfsResult() {
return dfsResult;
}