* Log EsRejectedExecutionException better than it is now. (letting it bubble up the the thread that is actually firing triggers, which then prints uncaught exception...)

* if we have EsRejectedExecutionException log on debug level.

Original commit: elastic/x-pack-elasticsearch@e8eb8fcf36
This commit is contained in:
Martijn van Groningen 2015-04-16 12:07:17 +02:00
parent a1b3d41822
commit c8a0c27934
5 changed files with 80 additions and 9 deletions

View File

@ -15,7 +15,9 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
@ -24,6 +26,7 @@ import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
/**
@ -118,7 +121,19 @@ public class HashWheelScheduleTriggerEngine extends ScheduleTriggerEngine {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
try {
executor.execute(new ListenerRunnable(listener, name, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [" + THREAD_POOL_NAME + "] thread pool, rejected tasks [" + rejected + "] queue capacity [" + queueCapacity +"]");
}
}
}
}

View File

@ -11,7 +11,9 @@ import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
@ -23,6 +25,7 @@ import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
/**
*
@ -159,7 +162,19 @@ public class QuartzScheduleTriggerEngine extends ScheduleTriggerEngine {
protected void notifyListeners(String name, JobExecutionContext ctx) {
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(ctx.getFireTime()), new DateTime(ctx.getScheduledFireTime()));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
try {
executor.execute(new ListenerRunnable(listener, name, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [" + THREAD_POOL_NAME + "] thread pool, rejected tasks [" + rejected + "] queue capacity [" + queueCapacity +"]");
}
}
}
}

View File

@ -11,7 +11,9 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
@ -20,10 +22,7 @@ import org.elasticsearch.watcher.trigger.schedule.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
*
@ -109,7 +108,19 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
try {
executor.execute(new ListenerRunnable(listener, name, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [" + THREAD_POOL_NAME + "] thread pool, rejected tasks [" + rejected + "] queue capacity [" + queueCapacity +"]");
}
}
}
}

View File

@ -10,7 +10,9 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
@ -20,6 +22,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
/**
*
@ -102,7 +105,19 @@ public class SimpleTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
try {
executor.execute(new ListenerRunnable(listener, name, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [" + THREAD_POOL_NAME + "] thread pool, rejected tasks [" + rejected + "] queue capacity [" + queueCapacity +"]");
}
}
}
}

View File

@ -10,7 +10,9 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.ThreadPoolSettingsBuilder;
import org.elasticsearch.watcher.support.clock.Clock;
@ -21,6 +23,7 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
/**
*
@ -111,7 +114,19 @@ public class TimerTickerScheduleTriggerEngine extends ScheduleTriggerEngine {
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(triggeredTime), new DateTime(scheduledTime));
for (Listener listener : listeners) {
executor.execute(new ListenerRunnable(listener, name, event));
try {
executor.execute(new ListenerRunnable(listener, name, event));
} catch (EsRejectedExecutionException e) {
if (logger.isDebugEnabled()) {
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
long rejected = -1;
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
int queueCapacity = executor.getQueue().size();
logger.debug("can't execute trigger on the [" + THREAD_POOL_NAME + "] thread pool, rejected tasks [" + rejected + "] queue capacity [" + queueCapacity +"]");
}
}
}
}