update search expiration to work in scheduled reaper mode

This commit is contained in:
kimchy 2010-10-21 16:38:44 +02:00
parent c37a0afbf0
commit 5649df572a
2 changed files with 29 additions and 64 deletions

View File

@ -27,8 +27,6 @@ import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
@ -50,13 +48,14 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.*; import org.elasticsearch.search.query.*;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService; import org.elasticsearch.timer.TimerService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.unit.TimeValue.*;
@ -81,7 +80,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final FetchPhase fetchPhase; private final FetchPhase fetchPhase;
private final TimeValue defaultKeepAlive; private final long defaultKeepAlive;
private final ScheduledFuture keepAliveReaper;
private final AtomicLong idGenerator = new AtomicLong(); private final AtomicLong idGenerator = new AtomicLong();
@ -92,7 +93,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final ImmutableMap<String, SearchParseElement> elementParsers; private final ImmutableMap<String, SearchParseElement> elementParsers;
@Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, TimerService timerService, @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, ThreadPool threadPool, TimerService timerService,
ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
@ -103,8 +104,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
this.queryPhase = queryPhase; this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase; this.fetchPhase = fetchPhase;
TimeValue keepAliveInterval = componentSettings.getAsTime("keep_alive_interval", timeValueMinutes(1));
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
this.defaultKeepAlive = componentSettings.getAsTime("default_keep_alive", timeValueMinutes(5)); this.defaultKeepAlive = componentSettings.getAsTime("default_keep_alive", timeValueMinutes(5)).millis();
Map<String, SearchParseElement> elementParsers = new HashMap<String, SearchParseElement>(); Map<String, SearchParseElement> elementParsers = new HashMap<String, SearchParseElement>();
elementParsers.putAll(dfsPhase.parseElements()); elementParsers.putAll(dfsPhase.parseElements());
@ -112,6 +114,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
elementParsers.putAll(fetchPhase.parseElements()); elementParsers.putAll(fetchPhase.parseElements());
this.elementParsers = ImmutableMap.copyOf(elementParsers); this.elementParsers = ImmutableMap.copyOf(elementParsers);
indicesLifecycle.addListener(indicesLifecycleListener); indicesLifecycle.addListener(indicesLifecycleListener);
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
@ -125,6 +129,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} }
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
keepAliveReaper.cancel(false);
indicesService.indicesLifecycle().removeListener(indicesLifecycleListener); indicesService.indicesLifecycle().removeListener(indicesLifecycleListener);
} }
@ -344,9 +349,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
fetchPhase.preProcess(context); fetchPhase.preProcess(context);
// compute the context keep alive // compute the context keep alive
TimeValue keepAlive = defaultKeepAlive; long keepAlive = defaultKeepAlive;
if (request.scroll() != null && request.scroll().keepAlive() != null) { if (request.scroll() != null && request.scroll().keepAlive() != null) {
keepAlive = request.scroll().keepAlive(); keepAlive = request.scroll().keepAlive().millis();
} }
context.keepAlive(keepAlive); context.keepAlive(keepAlive);
} catch (RuntimeException e) { } catch (RuntimeException e) {
@ -371,18 +376,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} }
private void contextProcessing(SearchContext context) { private void contextProcessing(SearchContext context) {
if (context.keepAliveTimeout() != null) { // disable timeout while executing a search
((KeepAliveTimerTask) context.keepAliveTimeout().getTask()).processing(); context.accessed(-1);
}
} }
private void contextProcessedSuccessfully(SearchContext context) { private void contextProcessedSuccessfully(SearchContext context) {
if (context.keepAliveTimeout() != null) { context.accessed(timerService.estimatedTimeInMillis());
((KeepAliveTimerTask) context.keepAliveTimeout().getTask()).doneProcessing();
} else {
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), context.keepAlive(), TimerService.ExecutionType.DEFAULT));
}
} }
private void cleanContext(SearchContext context) { private void cleanContext(SearchContext context) {
@ -451,7 +450,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.scroll(request.scroll()); context.scroll(request.scroll());
// update the context keep alive based on the new scroll value // update the context keep alive based on the new scroll value
if (request.scroll() != null && request.scroll().keepAlive() != null) { if (request.scroll() != null && request.scroll().keepAlive() != null) {
context.keepAlive(request.scroll().keepAlive()); context.keepAlive(request.scroll().keepAlive().millis());
} }
} }
@ -466,35 +465,15 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} }
} }
class KeepAliveTimerTask implements TimerTask { class Reaper implements Runnable {
@Override public void run() {
private final SearchContext context; for (SearchContext context : activeContexts.values()) {
if (context.lastAccessTime() == -1) { // its being processed or timeout is disabled
KeepAliveTimerTask(SearchContext context) { continue;
this.context = context; }
} if ((timerService.estimatedTimeInMillis() - context.lastAccessTime() > context.keepAlive())) {
freeContext(context);
public void processing() { }
context.keepAliveTimeout().cancel();
}
public void doneProcessing() {
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(this, context.keepAlive(), TimerService.ExecutionType.DEFAULT));
}
@Override public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
long currentTime = timerService.estimatedTimeInMillis();
long nextDelay = context.keepAlive().millis() - (currentTime - context.lastAccessTime());
if (nextDelay <= 0) {
// Time out, free the context (and remove it from the active context)
freeContext(context.id());
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS, TimerService.ExecutionType.DEFAULT));
} }
} }
} }

View File

@ -25,7 +25,6 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.cache.field.data.FieldDataCache; import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.cache.filter.FilterCache; import org.elasticsearch.index.cache.filter.FilterCache;
@ -130,12 +129,10 @@ public class SearchContext implements Releasable {
private boolean queryRewritten; private boolean queryRewritten;
private volatile TimeValue keepAlive; private volatile long keepAlive;
private volatile long lastAccessTime; private volatile long lastAccessTime;
private volatile Timeout keepAliveTimeout;
public SearchContext(long id, SearchShardTarget shardTarget, int numberOfShards, TimeValue timeout, public SearchContext(long id, SearchShardTarget shardTarget, int numberOfShards, TimeValue timeout,
String[] types, Engine.Searcher engineSearcher, IndexService indexService, ScriptService scriptService) { String[] types, Engine.Searcher engineSearcher, IndexService indexService, ScriptService scriptService) {
this.id = id; this.id = id;
@ -161,9 +158,6 @@ public class SearchContext implements Releasable {
// ignore any exception here // ignore any exception here
} }
engineSearcher.release(); engineSearcher.release();
if (keepAliveTimeout != null) {
keepAliveTimeout.cancel();
}
return true; return true;
} }
@ -392,22 +386,14 @@ public class SearchContext implements Releasable {
return this.lastAccessTime; return this.lastAccessTime;
} }
public TimeValue keepAlive() { public long keepAlive() {
return this.keepAlive; return this.keepAlive;
} }
public void keepAlive(TimeValue keepAlive) { public void keepAlive(long keepAlive) {
this.keepAlive = keepAlive; this.keepAlive = keepAlive;
} }
public void keepAliveTimeout(Timeout keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
}
public Timeout keepAliveTimeout() {
return this.keepAliveTimeout;
}
public ScriptSearchLookup scriptSearchLookup() { public ScriptSearchLookup scriptSearchLookup() {
if (scriptSearchLookup == null) { if (scriptSearchLookup == null) {
scriptSearchLookup = new ScriptSearchLookup(mapperService(), fieldDataCache()); scriptSearchLookup = new ScriptSearchLookup(mapperService(), fieldDataCache());