add an estimated time thread and use it where we don't need exact time

This commit is contained in:
kimchy 2011-04-16 16:57:02 +03:00
parent 2cc4a286f8
commit a1796c3408
3 changed files with 65 additions and 13 deletions

View File

@ -36,14 +36,18 @@ public class EsExecutors {
threadFactory); threadFactory);
} }
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { public static String threadName(Settings settings, String namePrefix) {
String name = settings.get("name"); String name = settings.get("name");
if (name == null) { if (name == null) {
name = "elasticsearch"; name = "elasticsearch";
} else { } else {
name = "elasticsearch[" + name + "]"; name = "elasticsearch[" + name + "]";
} }
return daemonThreadFactory(name + namePrefix); return name + namePrefix;
}
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
return daemonThreadFactory(threadName(settings, namePrefix));
} }
/** /**

View File

@ -65,6 +65,8 @@ import static org.elasticsearch.common.unit.TimeValue.*;
*/ */
public class SearchService extends AbstractLifecycleComponent<SearchService> { public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
private final IndicesService indicesService; private final IndicesService indicesService;
@ -94,6 +96,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
@Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, ThreadPool threadPool, @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, ThreadPool threadPool,
ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
super(settings); super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.scriptService = scriptService; this.scriptService = scriptService;
@ -431,7 +434,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} }
private void contextProcessedSuccessfully(SearchContext context) { private void contextProcessedSuccessfully(SearchContext context) {
context.accessed(System.currentTimeMillis()); context.accessed(threadPool.estimatedTimeInMillis());
} }
private void cleanContext(SearchContext context) { private void cleanContext(SearchContext context) {
@ -535,7 +538,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
class Reaper implements Runnable { class Reaper implements Runnable {
@Override public void run() { @Override public void run() {
long time = System.currentTimeMillis(); long time = threadPool.estimatedTimeInMillis();
for (SearchContext context : activeContexts.values()) { for (SearchContext context : activeContexts.values()) {
if (context.lastAccessTime() == -1) { // its being processed or timeout is disabled if (context.lastAccessTime() == -1) { // its being processed or timeout is disabled
continue; continue;

View File

@ -60,6 +60,8 @@ public class ThreadPool extends AbstractComponent {
private final ScheduledExecutorService scheduler; private final ScheduledExecutorService scheduler;
private final EstimatedTimeThread estimatedTimeThread;
public ThreadPool() { public ThreadPool() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS); this(ImmutableSettings.Builder.EMPTY_SETTINGS);
} }
@ -80,6 +82,14 @@ public class ThreadPool extends AbstractComponent {
executors.put(Names.SAME, MoreExecutors.sameThreadExecutor()); executors.put(Names.SAME, MoreExecutors.sameThreadExecutor());
this.executors = ImmutableMap.copyOf(executors); this.executors = ImmutableMap.copyOf(executors);
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]")); this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]"));
TimeValue estimatedTimeInterval = componentSettings.getAsTime("estimated_time_interval", TimeValue.timeValueMillis(200));
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.estimatedTimeThread.start();
}
public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
} }
public Executor cached() { public Executor cached() {
@ -106,6 +116,8 @@ public class ThreadPool extends AbstractComponent {
} }
public void shutdown() { public void shutdown() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
scheduler.shutdown(); scheduler.shutdown();
for (Executor executor : executors.values()) { for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) { if (executor instanceof ThreadPoolExecutor) {
@ -114,6 +126,17 @@ public class ThreadPool extends AbstractComponent {
} }
} }
public void shutdownNow() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
scheduler.shutdownNow();
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).shutdownNow();
}
}
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
boolean result = scheduler.awaitTermination(timeout, unit); boolean result = scheduler.awaitTermination(timeout, unit);
for (Executor executor : executors.values()) { for (Executor executor : executors.values()) {
@ -124,15 +147,6 @@ public class ThreadPool extends AbstractComponent {
return result; return result;
} }
public void shutdownNow() {
scheduler.shutdownNow();
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).shutdownNow();
}
}
}
private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) {
if (settings == null) { if (settings == null) {
settings = ImmutableSettings.Builder.EMPTY_SETTINGS; settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
@ -230,4 +244,35 @@ public class ThreadPool extends AbstractComponent {
return "[threaded] " + runnable.toString(); return "[threaded] " + runnable.toString();
} }
} }
static class EstimatedTimeThread extends Thread {
final long interval;
volatile boolean running = true;
volatile long estimatedTimeInMillis;
EstimatedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
setDaemon(true);
}
public long estimatedTimeInMillis() {
return this.estimatedTimeInMillis;
}
@Override public void run() {
while (running) {
estimatedTimeInMillis = System.currentTimeMillis();
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
running = false;
return;
}
}
}
}
} }