diff --git a/src/main/java/org/elasticsearch/alerts/AlertBootstrap.java b/src/main/java/org/elasticsearch/alerts/AlertBootstrap.java new file mode 100644 index 00000000000..151fe0c414e --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/AlertBootstrap.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; + +/** + */ +public class AlertBootstrap extends AbstractComponent implements ClusterStateListener { + + private final ThreadPool threadPool; + private final AlertsService alertsService; + private final ClusterService clusterService; + + private volatile boolean manuallyStopped; + + @Inject + public AlertBootstrap(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, AlertsService alertsService) { + super(settings); + this.clusterService = clusterService; + this.threadPool = threadPool; + this.alertsService = alertsService; + clusterService.add(this); + // Close if the indices service is being stopped, so we don't run into search failures (locally) that will + // happen because we're shutting down and an alert is scheduled. + indicesService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeStop() { + AlertBootstrap.this.alertsService.stop(); + } + }); + manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); + } + + public void start() { + manuallyStopped = false; + alertsService.start(clusterService.state()); + } + + public void stop() { + manuallyStopped = true; + alertsService.stop(); + } + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + if (!event.localNodeMaster()) { + // We're no longer the master so we need to stop alerting. + // Stopping alerting may take a while since it will wait on the scheduler to complete shutdown, + // so we fork here so that we don't wait too long. Other events may need to be processed and + // other cluster state listeners may need to be executed as well for this event. + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + alertsService.stop(); + } + }); + } else { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have .alerts and + // a .alerts_history index, but they may not have been restored from the cluster state on disk + return; + } + if (alertsService.state() == AlertsService.State.STOPPED && !manuallyStopped) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + alertsService.start(event.state()); + } + }); + } + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/AlertsModule.java b/src/main/java/org/elasticsearch/alerts/AlertsModule.java index 3014274c401..7efed13c42d 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsModule.java @@ -47,6 +47,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules { bind(Alert.Parser.class).asEagerSingleton(); bind(AlertLockService.class).asEagerSingleton(); + bind(AlertBootstrap.class).asEagerSingleton(); bind(AlertsService.class).asEagerSingleton(); bind(AlertsStore.class).asEagerSingleton(); bind(TemplateUtils.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsService.java b/src/main/java/org/elasticsearch/alerts/AlertsService.java index 11abe5a3eec..4fc6712bb4e 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsService.java @@ -11,18 +11,11 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.scheduler.Scheduler; import org.elasticsearch.alerts.support.Callback; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -32,76 +25,21 @@ public class AlertsService extends AbstractComponent { private final Scheduler scheduler; private final AlertsStore alertsStore; - private final ThreadPool threadPool; - private final ClusterService clusterService; private final AlertLockService alertLockService; private final HistoryService historyService; private final AtomicReference state = new AtomicReference<>(State.STOPPED); - private volatile boolean manuallyStopped; - @Inject - public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore, - IndicesService indicesService, HistoryService historyService, ThreadPool threadPool, + public AlertsService(Settings settings, Scheduler scheduler, AlertsStore alertsStore, HistoryService historyService, AlertLockService alertLockService) { super(settings); this.scheduler = scheduler; - this.threadPool = threadPool; this.alertsStore = alertsStore; - this.clusterService = clusterService; this.alertLockService = alertLockService; this.historyService = historyService; - - clusterService.add(new AlertsClusterStateListener()); - // Close if the indices service is being stopped, so we don't run into search failures (locally) that will - // happen because we're shutting down and an alert is scheduled. - indicesService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeStop() { - stop(); - } - }); - manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); } - // TODO: consider making this adding start/stop lock - // The currently mechanism isn't broken, but in tests it is annoying that if stop has been invoked concurrently - // and the first invocation sets the state to STOPPING then the second invocation will just return, because the - // first invocation will do the work to stop alerts plugin. If the second invocation was caused by a test teardown - // then the thread lead detection will fail, because it assumes that everything should have been stopped & closed. - // This isn't the case in the situation, although this will happen, but to late for the thread leak detection to - // not see it as a failure. - - /** - * Manually starts alerting if not already started - */ - public void start() { - manuallyStopped = false; - ClusterState state = clusterService.state(); - internalStart(state); - } - - /** - * Manually stops alerting if not already stopped. - */ - public void stop() { - manuallyStopped = true; - internalStop(); - } - - private void internalStop() { - if (state.compareAndSet(State.STARTED, State.STOPPING)) { - logger.info("stopping alert service..."); - alertLockService.stop(); - historyService.stop(); - scheduler.stop(); - alertsStore.stop(); - state.set(State.STOPPED); - logger.info("alert service has stopped"); - } - } - - private void internalStart(ClusterState clusterState) { + public void start(ClusterState clusterState) { if (state.compareAndSet(State.STOPPED, State.STARTING)) { logger.info("starting alert service..."); alertLockService.start(); @@ -135,6 +73,18 @@ public class AlertsService extends AbstractComponent { } } + public void stop() { + if (state.compareAndSet(State.STARTED, State.STOPPING)) { + logger.info("stopping alert service..."); + alertLockService.stop(); + historyService.stop(); + scheduler.stop(); + alertsStore.stop(); + state.set(State.STOPPED); + logger.info("alert service has stopped"); + } + } + public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException { ensureStarted(); AlertLockService.Lock lock = alertLockService.acquire(name); @@ -209,40 +159,6 @@ public class AlertsService extends AbstractComponent { } } - private final class AlertsClusterStateListener implements ClusterStateListener { - - @Override - public void clusterChanged(final ClusterChangedEvent event) { - if (!event.localNodeMaster()) { - // We're no longer the master so we need to stop alerting. - // Stopping alerting may take a while since it will wait on the scheduler to complete shutdown, - // so we fork here so that we don't wait too long. Other events may need to be processed and - // other cluster state listeners may need to be executed as well for this event. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - internalStop(); - } - }); - } else { - if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - // wait until the gateway has recovered from disk, otherwise we think may not have .alerts and - // a .alerts_history index, but they may not have been restored from the cluster state on disk - return; - } - if (state.get() == State.STOPPED && !manuallyStopped) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - internalStart(event.state()); - } - }); - } - } - } - - } - /** * Encapsulates the state of the alerts plugin. */ diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java index 70e78bdcecb..3c8c2f0638c 100644 --- a/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java @@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; -import org.elasticsearch.alerts.AlertsService; +import org.elasticsearch.alerts.AlertBootstrap; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService; */ public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction { - private final AlertsService alertsService; + private final AlertBootstrap alertBootstrap; @Inject - public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertsService alertsService) { + public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertBootstrap alertBootstrap) { super(settings, actionName, transportService, clusterService, threadPool, actionFilters); - this.alertsService = alertsService; + this.alertBootstrap = alertBootstrap; } @Override @@ -51,14 +51,14 @@ public class TransportAlertsServiceAction extends TransportMasterNodeOperationAc protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { switch (request.getCommand()) { case "start": - alertsService.start(); + alertBootstrap.start(); break; case "stop": - alertsService.stop(); + alertBootstrap.stop(); break; case "restart": - alertsService.start(); - alertsService.stop(); + alertBootstrap.start(); + alertBootstrap.stop(); break; default: listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined"));