Replaced quartz thread pool by an impl that is backed by ES thread pool

Original commit: elastic/x-pack-elasticsearch@24a055dc00
This commit is contained in:
Martijn van Groningen 2014-11-07 23:47:25 +01:00
parent b11f0bf6df
commit 94cf006b8f
4 changed files with 69 additions and 8 deletions

View File

@ -310,12 +310,14 @@ public class AlertActionManager extends AbstractComponent {
logger.debug("Stopping thread to read from the job queue");
return;
}
threadPool.executor(AlertsPlugin.THREAD_POOL_NAME).execute(new AlertHistoryRunnable(entry));
threadPool.executor(AlertsPlugin.ALERT_THREAD_POOL_NAME).execute(new AlertHistoryRunnable(entry));
}
} catch (Exception e) {
logger.error("Error during reader thread, restarting queue reader thread...", e);
if (started()) {
if (started() && !(e instanceof InterruptedException)) {
logger.error("Error during reader thread, restarting queue reader thread...", e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
} else {
logger.error("Error during reader thread", e);
}
}
}

View File

@ -17,10 +17,11 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
public class AlertsPlugin extends AbstractPlugin {
public static final String THREAD_POOL_NAME = "alerts";
public static final String ALERT_THREAD_POOL_NAME = "alerts";
public static final String SCHEDULER_THREAD_POOL_NAME = "alerts";
@Override public String name() {
return THREAD_POOL_NAME;
return ALERT_THREAD_POOL_NAME;
}
@Override public String description() {
@ -37,7 +38,9 @@ public class AlertsPlugin extends AbstractPlugin {
@Override
public Settings additionalSettings() {
return settingsBuilder()
.put("threadpool."+ THREAD_POOL_NAME + ".type","cached")
.put("threadpool." + ALERT_THREAD_POOL_NAME + ".type", "fixed")
.put("threadpool." + ALERT_THREAD_POOL_NAME + ".size", 32) // Executing an alert involves a lot of wait time for networking (search, several index requests + optional trigger logic)
.put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".type", "cached")
.build();
}

View File

@ -8,22 +8,31 @@ package org.elasticsearch.alerts.scheduler;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.util.Properties;
public class AlertScheduler extends AbstractComponent {
// Not happy about it, but otherwise we're stuck with Quartz's SimpleThreadPool
private volatile static ThreadPool threadPool;
private volatile Scheduler scheduler;
private AlertManager alertManager;
@Inject
public AlertScheduler(Settings settings) {
public AlertScheduler(Settings settings, ThreadPool threadPool) {
super(settings);
AlertScheduler.threadPool = threadPool;
}
public void setAlertManager(AlertManager alertManager){
@ -34,7 +43,9 @@ public class AlertScheduler extends AbstractComponent {
try {
logger.info("Starting scheduler");
// Can't start a scheduler that has been shutdown, so we need to re-create each time start() is invoked
SchedulerFactory schFactory = new StdSchedulerFactory();
Properties properties = new Properties();
properties.setProperty("org.quartz.threadPool.class", AlertQuartzThreadPool.class.getName());
SchedulerFactory schFactory = new StdSchedulerFactory(properties);
scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory());
scheduler.start();
@ -93,4 +104,48 @@ public class AlertScheduler extends AbstractComponent {
}
}
// This Quartz thread pool will always accept. On this thread we will only index an alert action and add it to the work queue
public static final class AlertQuartzThreadPool implements org.quartz.spi.ThreadPool {
private final EsThreadPoolExecutor executor;
public AlertQuartzThreadPool() {
this.executor = (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.SCHEDULER_THREAD_POOL_NAME);
}
@Override
public boolean runInThread(Runnable runnable) {
executor.execute(runnable);
return true;
}
@Override
public int blockForAvailableThreads() {
return 1;
}
@Override
public void initialize() throws SchedulerConfigException {
}
@Override
public void shutdown(boolean waitForJobsToComplete) {
}
@Override
public int getPoolSize() {
return 1;
}
@Override
public void setInstanceId(String schedInstId) {
}
@Override
public void setInstanceName(String schedName) {
}
}
}

View File

@ -57,6 +57,7 @@ public class BasicAlertingTest extends AbstractAlertingTests {
assertNotNull(deleteAlertResponse.deleteResponse());
assertTrue(deleteAlertResponse.deleteResponse().isFound());
refresh();
assertHitCount(client().prepareCount(AlertsStore.ALERT_INDEX).get(), 0l);
}