mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Internal: Only clear the alert execution threadpool's queue when alerts plugin stops.
Previously we shut down the alert execution threadpool, which forced us to use a hacky workaround to get the thread pool started again when alerts was going to run again. Clearing the threadpool's queue is sufficient to stop queued fired alerts from being run. Only fired alerts that are already being executed by the thread pool won't be stopped. Also removed the volatile previousFiredAlerts field: because executing a fired alert doesn't need the AlertService anymore, this field no longer serves a purpose. Original commit: elastic/x-pack-elasticsearch@6a622b5579
This commit is contained in:
parent
469acfa551
commit
4363acb09b
@ -18,17 +18,16 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -46,9 +45,6 @@ public class HistoryService extends AbstractComponent {
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
private final AtomicInteger initializationRetries = new AtomicInteger();
|
||||
|
||||
// Holds fired alerts that were fired before on a different elected master node, but never had the chance to run.
|
||||
private volatile ImmutableList<FiredAlert> previousFiredAlerts = ImmutableList.of();
|
||||
|
||||
@Inject
|
||||
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool,
|
||||
AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler,
|
||||
@ -74,36 +70,11 @@ public class HistoryService extends AbstractComponent {
|
||||
retry(callback);
|
||||
return;
|
||||
}
|
||||
this.previousFiredAlerts = ImmutableList.copyOf(loadResult);
|
||||
if (!previousFiredAlerts.isEmpty()) {
|
||||
logger.debug("loaded [{}] actions from the alert history index into actions queue", previousFiredAlerts.size());
|
||||
}
|
||||
logger.debug("starting history service");
|
||||
if (started.compareAndSet(false, true)) {
|
||||
if (alertsThreadPool().isShutdown()) {
|
||||
logger.info("Restarting thread pool that had been shutdown");
|
||||
// this update thread pool settings work around is for restarting the alerts thread pool,
|
||||
// that creates a new alerts thread pool and cleans up the existing one that has previously been shutdown.
|
||||
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
|
||||
/***
|
||||
*TODO Horrible horrible hack to make sure that settings are always different from the previous settings
|
||||
*
|
||||
* THIS NEEDS TO CHANGE ASAP
|
||||
*/
|
||||
int queueSize = alertsThreadPool().getQueue().remainingCapacity();
|
||||
if (queueSize % 2 == 0){
|
||||
queueSize = queueSize + 1;
|
||||
} else {
|
||||
queueSize = queueSize - 1;
|
||||
}
|
||||
//TODO END HORRIBLE HACK
|
||||
|
||||
threadPool.updateSettings(AlertsPlugin.alertThreadPoolSettings(availableProcessors, queueSize));
|
||||
assert !alertsThreadPool().isShutdown();
|
||||
}
|
||||
logger.debug("starting history service");
|
||||
executePreviouslyFiredAlerts(loadResult);
|
||||
logger.debug("started history service");
|
||||
}
|
||||
executePreviouslyFiredAlerts();
|
||||
callback.onSuccess(state);
|
||||
}
|
||||
|
||||
@ -112,7 +83,8 @@ public class HistoryService extends AbstractComponent {
|
||||
logger.debug("stopping history service");
|
||||
// We could also rely on the shutdown in #updateSettings call, but
|
||||
// this is a forceful shutdown that also interrupts the worker threads in the threadpool
|
||||
List<Runnable> cancelledTasks = alertsThreadPool().shutdownNow();
|
||||
List<Runnable> cancelledTasks = new ArrayList<>();
|
||||
alertsThreadPool().getQueue().drainTo(cancelledTasks);
|
||||
logger.debug("cancelled [{}] queued tasks", cancelledTasks.size());
|
||||
logger.debug("stopped history service");
|
||||
}
|
||||
@ -144,9 +116,6 @@ public class HistoryService extends AbstractComponent {
|
||||
|
||||
void execute(FiredAlert firedAlert, Alert alert) {
|
||||
try {
|
||||
if (alertsThreadPool().isShutdown()) {
|
||||
throw new AlertsException("attempting to add to a shutdown thread pool");
|
||||
}
|
||||
alertsThreadPool().execute(new AlertExecutionTask(firedAlert, alert));
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
logger.debug("[{}] failed to execute fired alert", firedAlert.name());
|
||||
@ -155,18 +124,19 @@ public class HistoryService extends AbstractComponent {
|
||||
}
|
||||
}
|
||||
|
||||
void executePreviouslyFiredAlerts() {
|
||||
ImmutableList<FiredAlert> firedAlerts = this.previousFiredAlerts;
|
||||
if (firedAlerts != null) {
|
||||
this.previousFiredAlerts = ImmutableList.of();
|
||||
for (FiredAlert firedAlert : firedAlerts) {
|
||||
void executePreviouslyFiredAlerts(HistoryStore.LoadResult loadResult) {
|
||||
if (loadResult != null) {
|
||||
int counter = 0;
|
||||
for (FiredAlert firedAlert : loadResult) {
|
||||
Alert alert = alertsStore.getAlert(firedAlert.name());
|
||||
if (alert == null) {
|
||||
logger.warn("unable to find alert [{}] in alert store, perhaps it has been deleted. skipping...", firedAlert.name());
|
||||
continue;
|
||||
}
|
||||
execute(firedAlert, alert);
|
||||
counter++;
|
||||
}
|
||||
logger.debug("executed [{}] not executed previous fired alerts from the alert history index ", counter);
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,7 +183,8 @@ public class HistoryService extends AbstractComponent {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!started.get()) {
|
||||
throw new ElasticsearchIllegalStateException("not started");
|
||||
logger.debug("can't run alert execution, because history service is not started, ignoring it...");
|
||||
return;
|
||||
}
|
||||
AlertLockService.Lock lock = alertLockService.acquire(alert.name());
|
||||
try {
|
||||
|
@ -107,6 +107,9 @@ public class BootStrapTest extends AbstractAlertingTests {
|
||||
.get();
|
||||
assertTrue(indexResponse.isCreated());
|
||||
|
||||
// The executor doesn't get shutdown when alerts stops, so the largest queue size don't get reset.
|
||||
long previousLargestQueueSize = response.getAlertActionManagerLargestQueueSize();
|
||||
|
||||
stopAlerting();
|
||||
startAlerting();
|
||||
|
||||
@ -114,7 +117,7 @@ public class BootStrapTest extends AbstractAlertingTests {
|
||||
assertTrue(response.isAlertActionManagerStarted());
|
||||
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
|
||||
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
|
||||
assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L));
|
||||
assertThat(response.getAlertActionManagerLargestQueueSize() - previousLargestQueueSize, equalTo(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -123,6 +123,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
|
||||
String alertName = "alert" + i;
|
||||
assertAlertWithMinimumPerformedActionsCount(alertName, i);
|
||||
}
|
||||
ensureGreen();
|
||||
stopElectedMasterNodeAndWait();
|
||||
startElectedMasterNodeAndWait();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user