diff --git a/src/main/java/org/elasticsearch/alerts/AlertBootstrap.java b/src/main/java/org/elasticsearch/alerts/AlertsLifeCycleService.java similarity index 87% rename from src/main/java/org/elasticsearch/alerts/AlertBootstrap.java rename to src/main/java/org/elasticsearch/alerts/AlertsLifeCycleService.java index 151fe0c414e..a1721e53fec 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertBootstrap.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsLifeCycleService.java @@ -18,16 +18,17 @@ import org.elasticsearch.threadpool.ThreadPool; /** */ -public class AlertBootstrap extends AbstractComponent implements ClusterStateListener { +public class AlertsLifeCycleService extends AbstractComponent implements ClusterStateListener { private final ThreadPool threadPool; private final AlertsService alertsService; private final ClusterService clusterService; + // Maybe this should be a setting in the cluster settings? private volatile boolean manuallyStopped; @Inject - public AlertBootstrap(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, AlertsService alertsService) { + public AlertsLifeCycleService(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, AlertsService alertsService) { super(settings); this.clusterService = clusterService; this.threadPool = threadPool; @@ -38,18 +39,18 @@ public class AlertBootstrap extends AbstractComponent implements ClusterStateLis indicesService.addLifecycleListener(new LifecycleListener() { @Override public void beforeStop() { - AlertBootstrap.this.alertsService.stop(); + AlertsLifeCycleService.this.alertsService.stop(); } }); manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); } - public void start() { + public synchronized void start() { manuallyStopped = false; alertsService.start(clusterService.state()); } - public void stop() { + public synchronized void stop() { manuallyStopped = true; alertsService.stop(); } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsModule.java b/src/main/java/org/elasticsearch/alerts/AlertsModule.java index 7efed13c42d..dc1febc7f52 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsModule.java @@ -47,7 +47,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules { bind(Alert.Parser.class).asEagerSingleton(); bind(AlertLockService.class).asEagerSingleton(); - bind(AlertBootstrap.class).asEagerSingleton(); + bind(AlertsLifeCycleService.class).asEagerSingleton(); bind(AlertsService.class).asEagerSingleton(); bind(AlertsStore.class).asEagerSingleton(); bind(TemplateUtils.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java index 3c8c2f0638c..8bdcd5a5de7 100644 --- a/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java @@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; -import org.elasticsearch.alerts.AlertBootstrap; +import org.elasticsearch.alerts.AlertsLifeCycleService; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService; */ public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction { - private final AlertBootstrap alertBootstrap; + private final AlertsLifeCycleService alertsLifeCycleService; @Inject - public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertBootstrap alertBootstrap) { + public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertsLifeCycleService alertsLifeCycleService) { super(settings, actionName, transportService, clusterService, threadPool, actionFilters); - this.alertBootstrap = alertBootstrap; + this.alertsLifeCycleService = alertsLifeCycleService; } @Override @@ -51,14 +51,14 @@ public class TransportAlertsServiceAction extends TransportMasterNodeOperationAc protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { switch (request.getCommand()) { case "start": - alertBootstrap.start(); + alertsLifeCycleService.start(); break; case "stop": - alertBootstrap.stop(); + alertsLifeCycleService.stop(); break; case "restart": - alertBootstrap.start(); - alertBootstrap.stop(); + alertsLifeCycleService.start(); + alertsLifeCycleService.stop(); break; default: listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined")); diff --git a/src/test/java/org/elasticsearch/alerts/AlertsLifeCycleServiceTest.java b/src/test/java/org/elasticsearch/alerts/AlertsLifeCycleServiceTest.java new file mode 100644 index 00000000000..ff3f3d4269a --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/AlertsLifeCycleServiceTest.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import com.google.common.util.concurrent.MoreExecutors; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; + +/** + */ +public class AlertsLifeCycleServiceTest extends ElasticsearchTestCase { + + private ThreadPool threadPool; + private AlertsService alertsService; + private ClusterService clusterService; + private IndicesService indicesService; + private AlertsLifeCycleService lifeCycleService; + + @Before + public void prepareServices() { + threadPool = mock(ThreadPool.class); + when(threadPool.executor(anyString())).thenReturn(MoreExecutors.sameThreadExecutor()); + alertsService = mock(AlertsService.class); + clusterService = mock(ClusterService.class); + indicesService = mock(IndicesService.class); + lifeCycleService = new AlertsLifeCycleService(ImmutableSettings.EMPTY, clusterService, indicesService, threadPool, alertsService); + } + + @Test + public void testStartAndStopCausedByClusterState() throws Exception { + // starting... local node is master node + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(alertsService.state()).thenReturn(AlertsService.State.STOPPED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); + verify(alertsService, times(1)).start(clusterState); + verify(alertsService, never()).stop(); + + // Trying to start a second time, but that should have no affect. + when(alertsService.state()).thenReturn(AlertsService.State.STARTED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); + verify(alertsService, times(1)).start(clusterState); + verify(alertsService, never()).stop(); + + // Stopping because local node is no longer master node + nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2"); + ClusterState noMasterClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", noMasterClusterState, noMasterClusterState)); + verify(alertsService, times(1)).stop(); + verify(alertsService, times(1)).start(clusterState); + } + + @Test + public void testStartWithStateNotRecoveredBlock() throws Exception { + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) + .nodes(nodes).build(); + when(alertsService.state()).thenReturn(AlertsService.State.STOPPED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); + verify(alertsService, never()).start(any(ClusterState.class)); + } + + @Test + public void testManualStartStop() { + lifeCycleService.start(); + verify(alertsService, times(1)).start(any(ClusterState.class)); + verify(alertsService, never()).stop(); + + lifeCycleService.stop(); + verify(alertsService, times(1)).start(any(ClusterState.class)); + verify(alertsService, times(1)).stop(); + + // Starting via cluster state update, we shouldn't start because we have been stopped manually. + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(alertsService.state()).thenReturn(AlertsService.State.STOPPED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); + verify(alertsService, times(1)).start(any(ClusterState.class)); + verify(alertsService, times(1)).stop(); + + // we can only start, if we start manually + lifeCycleService.start(); + verify(alertsService, times(2)).start(any(ClusterState.class)); + verify(alertsService, times(1)).stop(); + + // stop alerting via cluster state update + nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2"); + clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(alertsService.state()).thenReturn(AlertsService.State.STOPPED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); + verify(alertsService, times(2)).start(any(ClusterState.class)); + verify(alertsService, times(2)).stop(); + + // starting alerting via cluster state update, which should work, because we manually started before + nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); + clusterState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes).build(); + when(alertsService.state()).thenReturn(AlertsService.State.STOPPED); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); + verify(alertsService, times(3)).start(any(ClusterState.class)); + verify(alertsService, times(2)).stop(); + } + +}