remove timer service

This commit is contained in:
kimchy 2011-02-09 19:01:05 +02:00
parent 9f2afeb4ca
commit d35f397608
12 changed files with 12 additions and 204 deletions

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
/** /**
@ -45,13 +44,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
private final ClusterName clusterName; private final ClusterName clusterName;
private final TimerService timerService;
@Inject public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, @Inject public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TimerService timerService, ClusterName clusterName) { ClusterName clusterName) {
super(settings, transportService, clusterService, threadPool); super(settings, transportService, clusterService, threadPool);
this.clusterName = clusterName; this.clusterName = clusterName;
this.timerService = timerService;
} }
@Override protected String transportAction() { @Override protected String transportAction() {
@ -161,7 +157,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (waitForCounter == waitFor) { if (waitForCounter == waitFor) {
return response; return response;
} }
if (timerService.estimatedTimeInMillis() > endTime) { if (System.currentTimeMillis() > endTime) {
response.timedOut = true; response.timedOut = true;
return response; return response;
} }

View File

@ -63,8 +63,6 @@ import org.elasticsearch.node.internal.InternalSettingsPerparer;
import org.elasticsearch.search.TransportSearchModule; import org.elasticsearch.search.TransportSearchModule;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.timer.TimerModule;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -154,7 +152,6 @@ public class TransportClient extends AbstractClient {
modules.add(new SettingsModule(settings)); modules.add(new SettingsModule(settings));
modules.add(new NetworkModule()); modules.add(new NetworkModule());
modules.add(new ClusterNameModule(settings)); modules.add(new ClusterNameModule(settings));
modules.add(new TimerModule());
modules.add(new ThreadPoolModule(settings)); modules.add(new ThreadPoolModule(settings));
modules.add(new TransportSearchModule()); modules.add(new TransportSearchModule());
modules.add(new TransportModule(settings)); modules.add(new TransportModule(settings));
@ -220,7 +217,6 @@ public class TransportClient extends AbstractClient {
// ignore, might not be bounded // ignore, might not be bounded
} }
injector.getInstance(TimerService.class).close();
injector.getInstance(ThreadPool.class).shutdown(); injector.getInstance(ThreadPool.class).shutdown();
try { try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.timer.TimerService;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
@ -34,8 +33,6 @@ import java.util.Enumeration;
*/ */
public class NetworkService extends AbstractComponent { public class NetworkService extends AbstractComponent {
private final TimerService timerService;
private final NetworkProbe probe; private final NetworkProbe probe;
private final NetworkInfo info; private final NetworkInfo info;
@ -44,10 +41,9 @@ public class NetworkService extends AbstractComponent {
private NetworkStats cachedStats; private NetworkStats cachedStats;
@Inject public NetworkService(Settings settings, NetworkProbe probe, TimerService timerService) { @Inject public NetworkService(Settings settings, NetworkProbe probe) {
super(settings); super(settings);
this.probe = probe; this.probe = probe;
this.timerService = timerService;
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5)); this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5));
@ -93,7 +89,7 @@ public class NetworkService extends AbstractComponent {
} }
public synchronized NetworkStats stats() { public synchronized NetworkStats stats() {
if ((timerService.estimatedTimeInMillis() - cachedStats.timestamp()) > refreshInterval.millis()) { if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
cachedStats = probe.networkStats(); cachedStats = probe.networkStats();
} }
return cachedStats; return cachedStats;

View File

@ -23,15 +23,12 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.timer.TimerService;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class OsService extends AbstractComponent { public class OsService extends AbstractComponent {
private final TimerService timerService;
private final OsProbe probe; private final OsProbe probe;
private final OsInfo info; private final OsInfo info;
@ -40,10 +37,9 @@ public class OsService extends AbstractComponent {
private OsStats cachedStats; private OsStats cachedStats;
@Inject public OsService(Settings settings, OsProbe probe, TimerService timerService) { @Inject public OsService(Settings settings, OsProbe probe) {
super(settings); super(settings);
this.probe = probe; this.probe = probe;
this.timerService = timerService;
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5)); this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5));
@ -59,7 +55,7 @@ public class OsService extends AbstractComponent {
} }
public synchronized OsStats stats() { public synchronized OsStats stats() {
if ((timerService.estimatedTimeInMillis() - cachedStats.timestamp()) > refreshInterval.millis()) { if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
cachedStats = probe.osStats(); cachedStats = probe.osStats();
} }
return cachedStats; return cachedStats;

View File

@ -23,15 +23,12 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.timer.TimerService;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class ProcessService extends AbstractComponent { public class ProcessService extends AbstractComponent {
private final TimerService timerService;
private final ProcessProbe probe; private final ProcessProbe probe;
private final ProcessInfo info; private final ProcessInfo info;
@ -40,9 +37,8 @@ public class ProcessService extends AbstractComponent {
private ProcessStats cachedStats; private ProcessStats cachedStats;
@Inject public ProcessService(Settings settings, ProcessProbe probe, TimerService timerService) { @Inject public ProcessService(Settings settings, ProcessProbe probe) {
super(settings); super(settings);
this.timerService = timerService;
this.probe = probe; this.probe = probe;
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5)); this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5));
@ -59,7 +55,7 @@ public class ProcessService extends AbstractComponent {
} }
public synchronized ProcessStats stats() { public synchronized ProcessStats stats() {
if ((timerService.estimatedTimeInMillis() - cachedStats.timestamp()) > refreshInterval.millis()) { if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
cachedStats = probe.processStats(); cachedStats = probe.processStats();
} }
return cachedStats; return cachedStats;

View File

@ -75,8 +75,6 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.timer.TimerModule;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -127,7 +125,6 @@ public final class InternalNode implements Node {
modules.add(new NodeEnvironmentModule()); modules.add(new NodeEnvironmentModule());
modules.add(new ClusterNameModule(settings)); modules.add(new ClusterNameModule(settings));
modules.add(new ThreadPoolModule(settings)); modules.add(new ThreadPoolModule(settings));
modules.add(new TimerModule());
modules.add(new DiscoveryModule(settings)); modules.add(new DiscoveryModule(settings));
modules.add(new ClusterModule(settings)); modules.add(new ClusterModule(settings));
modules.add(new RestModule(settings)); modules.add(new RestModule(settings));
@ -286,8 +283,6 @@ public final class InternalNode implements Node {
stopWatch.stop().start("script"); stopWatch.stop().start("script");
injector.getInstance(ScriptService.class).close(); injector.getInstance(ScriptService.class).close();
stopWatch.stop().start("timer");
injector.getInstance(TimerService.class).close();
stopWatch.stop().start("thread_pool"); stopWatch.stop().start("thread_pool");
injector.getInstance(ThreadPool.class).shutdown(); injector.getInstance(ThreadPool.class).shutdown();
try { try {

View File

@ -50,7 +50,6 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.*; import org.elasticsearch.search.query.*;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -69,8 +68,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final IndicesService indicesService; private final IndicesService indicesService;
private final TimerService timerService;
private final ScriptService scriptService; private final ScriptService scriptService;
private final DfsPhase dfsPhase; private final DfsPhase dfsPhase;
@ -93,12 +90,11 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final ImmutableMap<String, SearchParseElement> elementParsers; private final ImmutableMap<String, SearchParseElement> elementParsers;
@Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, ThreadPool threadPool, TimerService timerService, @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, ThreadPool threadPool,
ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { ScriptService scriptService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.timerService = timerService;
this.scriptService = scriptService; this.scriptService = scriptService;
this.dfsPhase = dfsPhase; this.dfsPhase = dfsPhase;
this.queryPhase = queryPhase; this.queryPhase = queryPhase;
@ -381,7 +377,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} }
private void contextProcessedSuccessfully(SearchContext context) { private void contextProcessedSuccessfully(SearchContext context) {
context.accessed(timerService.estimatedTimeInMillis()); context.accessed(System.currentTimeMillis());
} }
private void cleanContext(SearchContext context) { private void cleanContext(SearchContext context) {
@ -471,11 +467,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
class Reaper implements Runnable { class Reaper implements Runnable {
@Override public void run() { @Override public void run() {
long time = System.currentTimeMillis();
for (SearchContext context : activeContexts.values()) { for (SearchContext context : activeContexts.values()) {
if (context.lastAccessTime() == -1) { // its being processed or timeout is disabled if (context.lastAccessTime() == -1) { // its being processed or timeout is disabled
continue; continue;
} }
if ((timerService.estimatedTimeInMillis() - context.lastAccessTime() > context.keepAlive())) { if ((time - context.lastAccessTime() > context.keepAlive())) {
freeContext(context); freeContext(context);
} }
} }

View File

@ -1,32 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.timer;
import org.elasticsearch.common.inject.AbstractModule;
/**
* @author kimchy (Shay Banon)
*/
public class TimerModule extends AbstractModule {
@Override protected void configure() {
bind(TimerService.class).asEagerSingleton();
}
}

View File

@ -1,126 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.timer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.HashedWheelTimer;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.Timer;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/**
* @author kimchy (Shay Banon)
*/
public class TimerService extends AbstractComponent {
public static enum ExecutionType {
DEFAULT,
THREADED
}
private final ThreadPool threadPool;
private final Timer timer;
private final TimeValue tickDuration;
private final int ticksPerWheel;
public TimerService(ThreadPool threadPool) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool);
}
@Inject public TimerService(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.tickDuration = componentSettings.getAsTime("tick_duration", timeValueMillis(100));
this.ticksPerWheel = componentSettings.getAsInt("ticks_per_wheel", 1024);
this.timer = new HashedWheelTimer(logger, daemonThreadFactory(settings, "timer"), tickDuration.millis(), TimeUnit.MILLISECONDS, ticksPerWheel);
}
public void close() {
timer.stop();
}
public long estimatedTimeInMillis() {
// don't use the scheduled estimator so we won't wake up a thread each time
return System.currentTimeMillis();
}
public Timeout newTimeout(TimerTask task, TimeValue delay, ExecutionType executionType) {
return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS, executionType);
}
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit, ExecutionType executionType) {
if (executionType == ExecutionType.THREADED) {
task = new ThreadedTimerTask(threadPool, task);
}
return timer.newTimeout(task, delay, unit);
}
private class ThreadedTimerTask implements TimerTask {
private final ThreadPool threadPool;
private final TimerTask task;
private ThreadedTimerTask(ThreadPool threadPool, TimerTask task) {
this.threadPool = threadPool;
this.task = task;
}
@Override public void run(final Timeout timeout) throws Exception {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
task.run(timeout);
} catch (Exception e) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", e);
}
}
});
}
}
private static class TimeEstimator implements Runnable {
private long time = System.currentTimeMillis();
@Override public void run() {
this.time = System.currentTimeMillis();
}
public long time() {
return this.time;
}
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport; import org.elasticsearch.transport.netty.NettyTransport;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -45,7 +44,6 @@ public class UnicastZenPingTests {
@Test public void testSimplePings() { @Test public void testSimplePings() {
ThreadPool threadPool = new CachedThreadPool(); ThreadPool threadPool = new CachedThreadPool();
TimerService timerService = new TimerService(threadPool);
ClusterName clusterName = new ClusterName("test"); ClusterName clusterName = new ClusterName("test");
NettyTransport transportA = new NettyTransport(threadPool); NettyTransport transportA = new NettyTransport(threadPool);
final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); final TransportService transportServiceA = new TransportService(transportA, threadPool).start();

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -59,7 +58,6 @@ public class BenchmarkNettyClient {
final ThreadPool threadPool = new CachedThreadPool(settings); final ThreadPool threadPool = new CachedThreadPool(settings);
// final ThreadPool threadPool = new ScalingThreadPool(settings); // final ThreadPool threadPool = new ScalingThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999)); final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999));

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.netty.NettyTransport; import org.elasticsearch.transport.netty.NettyTransport;
@ -51,7 +50,6 @@ public class BenchmarkNettyLargeMessages {
.build(); .build();
final ThreadPool threadPool = new CachedThreadPool(settings); final ThreadPool threadPool = new CachedThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();