Removed the watcher executor thread pool as it is redundant now with the async indexing.

Original commit: elastic/x-pack-elasticsearch@6afaf0308b
This commit is contained in:
Martijn van Groningen 2015-04-22 22:21:58 +02:00
parent 7851ad6993
commit 34c9d6af62
11 changed files with 25 additions and 141 deletions

View File

@ -16,7 +16,6 @@ import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.history.HistoryModule;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import java.util.Collection;
@ -69,7 +68,6 @@ public class WatcherPlugin extends AbstractPlugin {
return ImmutableSettings.EMPTY;
}
Settings additionalSettings = settingsBuilder()
.put(ScheduleModule.additionalSettings(settings))
.put(HistoryModule.additionalSettings(settings))
.build();

View File

@ -35,9 +35,7 @@ public abstract class CronnableSchedule implements Schedule {
@Override
public long nextScheduledTimeAfter(long startTime, long time) {
if (time <= startTime) {
return startTime;
}
assert time >= startTime;
long nextTime = Long.MAX_VALUE;
for (Cron cron : crons) {
nextTime = Math.min(nextTime, cron.getNextValidTimeAfter(time));

View File

@ -34,8 +34,9 @@ public class IntervalSchedule implements Schedule {
@Override
public long nextScheduledTimeAfter(long startTime, long time) {
if (time <= startTime) {
return startTime;
assert time >= startTime;
if (startTime == time) {
time++;
}
long delta = time - startTime;
return startTime + (delta / interval.millis + 1) * interval.millis;

View File

@ -8,12 +8,10 @@ package org.elasticsearch.watcher.trigger.schedule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.engine.*;
import org.elasticsearch.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import java.util.HashMap;
import java.util.Locale;
@ -24,8 +22,6 @@ import java.util.Map;
*/
public class ScheduleModule extends AbstractModule {
public static final String THREAD_POOL_NAME = "watcher_scheduler";
private final Map<String, Class<? extends Schedule.Parser>> parsers = new HashMap<>();
public ScheduleModule() {
@ -59,20 +55,6 @@ public class ScheduleModule extends AbstractModule {
bind(ScheduleRegistry.class).asEagerSingleton();
}
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// scheduler TP is already configured in the node settings
// no need for additional settings
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(availableProcessors * 2)
.queueSize(1000)
.build();
}
public enum Engine {
SCHEDULER() {

View File

@ -11,10 +11,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
@ -22,7 +18,10 @@ import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
*
@ -32,13 +31,11 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final Clock clock;
private volatile Schedules schedules;
private ScheduledExecutorService scheduler;
private EsThreadPoolExecutor executor;
@Inject
public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock, ThreadPool threadPool) {
public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry);
this.clock = clock;
this.executor = (EsThreadPoolExecutor) threadPool.executor(ScheduleModule.THREAD_POOL_NAME);
}
@Override
@ -66,7 +63,6 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.getQueue().clear();
logger.debug("schedule engine stopped");
}
@ -92,19 +88,7 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [{}] thread pool, rejected tasks [{}] queue capacity [{}]", ScheduleModule.THREAD_POOL_NAME, rejected, queueCapacity);
}
}
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}
}
@ -146,22 +130,6 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
}
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final ScheduleTriggerEvent event;
public ListenerRunnable(Listener listener, ScheduleTriggerEvent event) {
this.listener = listener;
this.event = event;
}
@Override
public void run() {
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}
}
static class Schedules {
private final ActiveSchedule[] schedules;

View File

@ -10,10 +10,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.*;
@ -24,7 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
/**
*
@ -36,15 +31,13 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final TimeValue tickInterval;
private volatile Map<String, ActiveSchedule> schedules;
private Ticker ticker;
private EsThreadPoolExecutor executor;
@Inject
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock, ThreadPool threadPool) {
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry);
this.tickInterval = settings.getAsTime("watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500));
this.schedules = new ConcurrentHashMap<>();
this.clock = clock;
this.executor = (EsThreadPoolExecutor) threadPool.executor(ScheduleModule.THREAD_POOL_NAME);
}
@Override
@ -64,7 +57,6 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
@Override
public void stop() {
ticker.close();
executor.getQueue().clear();
}
@Override
@ -100,19 +92,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
protected void notifyListeners(List<TriggerEvent> events) {
for (Listener listener : listeners) {
try {
executor.execute(new ListenerRunnable(listener, events));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [" + ScheduleModule.THREAD_POOL_NAME + "] thread pool, rejected tasks [" + rejected + "] queue capacity [" + queueCapacity +"]");
}
}
listener.triggered(events);
}
}
@ -146,22 +126,6 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
}
static class ListenerRunnable implements Runnable {
private final Listener listener;
private final List<TriggerEvent> events;
public ListenerRunnable(Listener listener, List<TriggerEvent> events) {
this.listener = listener;
this.events = events;
}
@Override
public void run() {
listener.triggered(events);
}
}
class Ticker extends Thread {
private volatile boolean active = true;

View File

@ -8,7 +8,6 @@ package org.elasticsearch.watcher.test.bench;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
@ -49,14 +48,12 @@ public class ScheduleEngineTriggerBenchmark {
System.out.println("Running benchmark with numWatches=" + numWatches + " benchTime=" + benchTime + " interval=" + interval);
Settings settings = ImmutableSettings.builder()
.put(ScheduleModule.additionalSettings(ImmutableSettings.EMPTY))
.put("name", "test")
.build();
List<TriggerEngine.Job> jobs = new ArrayList<>(numWatches);
for (int i = 0; i < numWatches; i++) {
jobs.add(new SimpleJob("job_" + i, interval(interval + "s")));
}
ThreadPool threadPool = new ThreadPool(settings, null);
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(Collections.<String, Schedule.Parser>emptyMap());
List<String> impls = new ArrayList<>(Arrays.asList(new String[]{"schedule", "ticker"}));
Collections.shuffle(impls);
@ -75,7 +72,7 @@ public class ScheduleEngineTriggerBenchmark {
final ScheduleTriggerEngine scheduler;
switch (impl) {
case "schedule":
scheduler = new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry, SystemClock.INSTANCE, threadPool) {
scheduler = new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, scheduleRegistry, SystemClock.INSTANCE) {
@Override
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
@ -86,7 +83,7 @@ public class ScheduleEngineTriggerBenchmark {
};
break;
case "ticker":
scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, SystemClock.INSTANCE, threadPool) {
scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, SystemClock.INSTANCE) {
@Override
protected void notifyListeners(List<TriggerEvent> events) {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -150,6 +151,7 @@ public class WatcherScheduleEngineBenchmark {
if (actualLoadedWatches != numWatches) {
throw new IllegalStateException("Expected [" + numWatches + "] watched to be loaded, but only [" + actualLoadedWatches + "] watches were actually loaded");
}
long startTime = clock.millis();
System.out.println("==> watcher started, waiting [" + benchTime + "] seconds now...");
final AtomicBoolean start = new AtomicBoolean(true);
@ -170,7 +172,6 @@ public class WatcherScheduleEngineBenchmark {
}
});
sampleThread.start();
long startTime = clock.millis();
Thread.sleep(benchTime);
long endTime = clock.millis();
start.set(false);
@ -179,14 +180,12 @@ public class WatcherScheduleEngineBenchmark {
NodesStatsResponse response = client.admin().cluster().prepareNodesStats().setThreadPool(true).get();
for (NodeStats nodeStats : response) {
for (ThreadPoolStats.Stats threadPoolStats : nodeStats.getThreadPool()) {
if ("watcher_scheduler".equals(threadPoolStats.getName())) {
stats.setWatcherExecutorThreadPoolStats(threadPoolStats);
} else if ("watcher".equals(threadPoolStats.getName())) {
if ("watcher".equals(threadPoolStats.getName())) {
stats.setWatcherThreadPoolStats(threadPoolStats);
}
}
}
client.admin().indices().prepareRefresh("_all").get();
client.admin().indices().prepareRefresh(HistoryStore.INDEX_PREFIX + "*").get();
SearchResponse searchResponse = client.prepareSearch(HistoryStore.INDEX_PREFIX + "*")
.setQuery(QueryBuilders.rangeQuery("trigger_event.schedule.scheduled_time").gte(startTime).lte(endTime))
.addAggregation(terms("state").field("state"))
@ -224,8 +223,8 @@ public class WatcherScheduleEngineBenchmark {
System.out.println();
System.out.println("### Watcher execution and watcher thread pool stats");
System.out.println();
System.out.println(" Name | avg heap used | wetp rejected | wetp completed | wtp rejected | wtp completed");
System.out.println("---------- | ------------- | ------------- | -------------- | ------------ | -------------");
System.out.println(" Name | avg heap used | wtp rejected | wtp completed");
System.out.println("---------- | ------------- | ------------ | -------------");
for (BenchStats benchStats : results.values()) {
benchStats.printThreadStats();
}
@ -252,9 +251,7 @@ public class WatcherScheduleEngineBenchmark {
private final String name;
private final int numWatches;
private ThreadPoolStats.Stats watcherThreadPoolStats;
private ThreadPoolStats.Stats watcherExecutorThreadPoolStats;
private Terms stateStats;
private Histogram delayStats;
@ -284,14 +281,6 @@ public class WatcherScheduleEngineBenchmark {
this.watcherThreadPoolStats = watcherThreadPoolStats;
}
public ThreadPoolStats.Stats getWatcherExecutorThreadPoolStats() {
return watcherExecutorThreadPoolStats;
}
public void setWatcherExecutorThreadPoolStats(ThreadPoolStats.Stats watcherExecutorThreadPoolStats) {
this.watcherExecutorThreadPoolStats = watcherExecutorThreadPoolStats;
}
public Terms getStateStats() {
return stateStats;
}
@ -323,9 +312,8 @@ public class WatcherScheduleEngineBenchmark {
public void printThreadStats() throws IOException {
System.out.printf(
Locale.ENGLISH,
"%10s | %13s | %13d | %14d | %12d | %13d \n",
"%10s | %13s | %12d | %13d \n",
name, new ByteSizeValue(avgHeapUsed),
watcherExecutorThreadPoolStats.getRejected(), watcherExecutorThreadPoolStats.getCompleted(),
watcherThreadPoolStats.getRejected(), watcherThreadPoolStats.getCompleted()
);
}

View File

@ -8,11 +8,7 @@ package org.elasticsearch.watcher.trigger.schedule.engine;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -42,17 +38,10 @@ import static org.hamcrest.Matchers.is;
@Ignore
public abstract class BaseTriggerEngineTests extends ElasticsearchTestCase {
protected ThreadPool threadPool;
private TriggerEngine engine;
@Before
public void init() throws Exception {
WatcherPlugin plugin = new WatcherPlugin(ImmutableSettings.EMPTY);
Settings settings = ImmutableSettings.builder()
.put(plugin.additionalSettings())
.put("name", "test")
.build();
threadPool = new ThreadPool(settings, null);
engine = createEngine();
}
@ -61,7 +50,6 @@ public abstract class BaseTriggerEngineTests extends ElasticsearchTestCase {
@After
public void cleanup() throws Exception {
engine.stop();
threadPool.shutdownNow();
}
@Test

View File

@ -20,7 +20,7 @@ import static org.mockito.Mockito.mock;
public class SchedulerScheduleEngineTests extends BaseTriggerEngineTests {
protected TriggerEngine createEngine() {
return new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, mock(ScheduleRegistry.class), SystemClock.INSTANCE, threadPool);
return new SchedulerScheduleTriggerEngine(ImmutableSettings.EMPTY, mock(ScheduleRegistry.class), SystemClock.INSTANCE);
}
}

View File

@ -18,6 +18,6 @@ public class TickerScheduleEngineTests extends BaseTriggerEngineTests {
@Override
protected TriggerEngine createEngine() {
return new TickerScheduleTriggerEngine(ImmutableSettings.EMPTY, mock(ScheduleRegistry.class), SystemClock.INSTANCE, threadPool);
return new TickerScheduleTriggerEngine(ImmutableSettings.EMPTY, mock(ScheduleRegistry.class), SystemClock.INSTANCE);
}
}