diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 51f3c4359b9..449ea21b95a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -36,14 +36,18 @@ public class EsExecutors { threadFactory); } - public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { + public static String threadName(Settings settings, String namePrefix) { String name = settings.get("name"); if (name == null) { name = "elasticsearch"; } else { name = "elasticsearch[" + name + "]"; } - return daemonThreadFactory(name + namePrefix); + return name + namePrefix; + } + + public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { + return daemonThreadFactory(threadName(settings, namePrefix)); } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 461f87cf677..91d3adc7c33 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -65,6 +65,8 @@ import static org.elasticsearch.common.unit.TimeValue.*; */ public class SearchService extends AbstractLifecycleComponent { + private final ThreadPool threadPool; + private final ClusterService clusterService; private final IndicesService indicesService; @@ -94,6 +96,7 @@ public class SearchService extends AbstractLifecycleComponent { @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, ThreadPool threadPool, ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { super(settings); + this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.scriptService = scriptService; @@ -431,7 +434,7 @@ public class SearchService extends AbstractLifecycleComponent { } private void contextProcessedSuccessfully(SearchContext context) { - context.accessed(System.currentTimeMillis()); + context.accessed(threadPool.estimatedTimeInMillis()); } private void cleanContext(SearchContext context) { @@ -535,7 +538,7 @@ public class SearchService extends AbstractLifecycleComponent { class Reaper implements Runnable { @Override public void run() { - long time = System.currentTimeMillis(); + long time = threadPool.estimatedTimeInMillis(); for (SearchContext context : activeContexts.values()) { if (context.lastAccessTime() == -1) { // its being processed or timeout is disabled continue; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f2cda80f923..fc35bdeb231 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -60,6 +60,8 @@ public class ThreadPool extends AbstractComponent { private final ScheduledExecutorService scheduler; + private final EstimatedTimeThread estimatedTimeThread; + public ThreadPool() { this(ImmutableSettings.Builder.EMPTY_SETTINGS); } @@ -80,6 +82,14 @@ public class ThreadPool extends AbstractComponent { executors.put(Names.SAME, MoreExecutors.sameThreadExecutor()); this.executors = ImmutableMap.copyOf(executors); this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]")); + + TimeValue estimatedTimeInterval = componentSettings.getAsTime("estimated_time_interval", TimeValue.timeValueMillis(200)); + this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); + this.estimatedTimeThread.start(); + } + + public long estimatedTimeInMillis() { + return estimatedTimeThread.estimatedTimeInMillis(); } public Executor cached() { @@ -106,6 +116,8 @@ public class ThreadPool extends AbstractComponent { } public void shutdown() { + estimatedTimeThread.running = false; + estimatedTimeThread.interrupt(); scheduler.shutdown(); for (Executor executor : executors.values()) { if (executor instanceof ThreadPoolExecutor) { @@ -114,6 +126,17 @@ public class ThreadPool extends AbstractComponent { } } + public void shutdownNow() { + estimatedTimeThread.running = false; + estimatedTimeThread.interrupt(); + scheduler.shutdownNow(); + for (Executor executor : executors.values()) { + if (executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor).shutdownNow(); + } + } + } + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { boolean result = scheduler.awaitTermination(timeout, unit); for (Executor executor : executors.values()) { @@ -124,15 +147,6 @@ public class ThreadPool extends AbstractComponent { return result; } - public void shutdownNow() { - scheduler.shutdownNow(); - for (Executor executor : executors.values()) { - if (executor instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor).shutdownNow(); - } - } - } - private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { if (settings == null) { settings = ImmutableSettings.Builder.EMPTY_SETTINGS; @@ -230,4 +244,35 @@ public class ThreadPool extends AbstractComponent { return "[threaded] " + runnable.toString(); } } + + static class EstimatedTimeThread extends Thread { + + final long interval; + + volatile boolean running = true; + + volatile long estimatedTimeInMillis; + + EstimatedTimeThread(String name, long interval) { + super(name); + this.interval = interval; + setDaemon(true); + } + + public long estimatedTimeInMillis() { + return this.estimatedTimeInMillis; + } + + @Override public void run() { + while (running) { + estimatedTimeInMillis = System.currentTimeMillis(); + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + running = false; + return; + } + } + } + } }