Added unit test
Renamed AlertBootstrap to AlertsLifeCycleService Original commit: elastic/x-pack-elasticsearch@5dedefe196
This commit is contained in:
parent
bebfbf9664
commit
127aee514c
|
@ -18,16 +18,17 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class AlertBootstrap extends AbstractComponent implements ClusterStateListener {
|
public class AlertsLifeCycleService extends AbstractComponent implements ClusterStateListener {
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final AlertsService alertsService;
|
private final AlertsService alertsService;
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
|
// Maybe this should be a setting in the cluster settings?
|
||||||
private volatile boolean manuallyStopped;
|
private volatile boolean manuallyStopped;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public AlertBootstrap(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, AlertsService alertsService) {
|
public AlertsLifeCycleService(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, AlertsService alertsService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
|
@ -38,18 +39,18 @@ public class AlertBootstrap extends AbstractComponent implements ClusterStateLis
|
||||||
indicesService.addLifecycleListener(new LifecycleListener() {
|
indicesService.addLifecycleListener(new LifecycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void beforeStop() {
|
public void beforeStop() {
|
||||||
AlertBootstrap.this.alertsService.stop();
|
AlertsLifeCycleService.this.alertsService.stop();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
|
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public synchronized void start() {
|
||||||
manuallyStopped = false;
|
manuallyStopped = false;
|
||||||
alertsService.start(clusterService.state());
|
alertsService.start(clusterService.state());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public synchronized void stop() {
|
||||||
manuallyStopped = true;
|
manuallyStopped = true;
|
||||||
alertsService.stop();
|
alertsService.stop();
|
||||||
}
|
}
|
|
@ -47,7 +47,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
|
||||||
|
|
||||||
bind(Alert.Parser.class).asEagerSingleton();
|
bind(Alert.Parser.class).asEagerSingleton();
|
||||||
bind(AlertLockService.class).asEagerSingleton();
|
bind(AlertLockService.class).asEagerSingleton();
|
||||||
bind(AlertBootstrap.class).asEagerSingleton();
|
bind(AlertsLifeCycleService.class).asEagerSingleton();
|
||||||
bind(AlertsService.class).asEagerSingleton();
|
bind(AlertsService.class).asEagerSingleton();
|
||||||
bind(AlertsStore.class).asEagerSingleton();
|
bind(AlertsStore.class).asEagerSingleton();
|
||||||
bind(TemplateUtils.class).asEagerSingleton();
|
bind(TemplateUtils.class).asEagerSingleton();
|
||||||
|
|
|
@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||||
import org.elasticsearch.alerts.AlertBootstrap;
|
import org.elasticsearch.alerts.AlertsLifeCycleService;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
|
@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService;
|
||||||
*/
|
*/
|
||||||
public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction<AlertsServiceRequest, AlertsServiceResponse> {
|
public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction<AlertsServiceRequest, AlertsServiceResponse> {
|
||||||
|
|
||||||
private final AlertBootstrap alertBootstrap;
|
private final AlertsLifeCycleService alertsLifeCycleService;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertBootstrap alertBootstrap) {
|
public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertsLifeCycleService alertsLifeCycleService) {
|
||||||
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
|
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
|
||||||
this.alertBootstrap = alertBootstrap;
|
this.alertsLifeCycleService = alertsLifeCycleService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -51,14 +51,14 @@ public class TransportAlertsServiceAction extends TransportMasterNodeOperationAc
|
||||||
protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener<AlertsServiceResponse> listener) throws ElasticsearchException {
|
protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener<AlertsServiceResponse> listener) throws ElasticsearchException {
|
||||||
switch (request.getCommand()) {
|
switch (request.getCommand()) {
|
||||||
case "start":
|
case "start":
|
||||||
alertBootstrap.start();
|
alertsLifeCycleService.start();
|
||||||
break;
|
break;
|
||||||
case "stop":
|
case "stop":
|
||||||
alertBootstrap.stop();
|
alertsLifeCycleService.stop();
|
||||||
break;
|
break;
|
||||||
case "restart":
|
case "restart":
|
||||||
alertBootstrap.start();
|
alertsLifeCycleService.start();
|
||||||
alertBootstrap.stop();
|
alertsLifeCycleService.stop();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined"));
|
listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined"));
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.alerts;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class AlertsLifeCycleServiceTest extends ElasticsearchTestCase {
|
||||||
|
|
||||||
|
private ThreadPool threadPool;
|
||||||
|
private AlertsService alertsService;
|
||||||
|
private ClusterService clusterService;
|
||||||
|
private IndicesService indicesService;
|
||||||
|
private AlertsLifeCycleService lifeCycleService;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void prepareServices() {
|
||||||
|
threadPool = mock(ThreadPool.class);
|
||||||
|
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.sameThreadExecutor());
|
||||||
|
alertsService = mock(AlertsService.class);
|
||||||
|
clusterService = mock(ClusterService.class);
|
||||||
|
indicesService = mock(IndicesService.class);
|
||||||
|
lifeCycleService = new AlertsLifeCycleService(ImmutableSettings.EMPTY, clusterService, indicesService, threadPool, alertsService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStartAndStopCausedByClusterState() throws Exception {
|
||||||
|
// starting... local node is master node
|
||||||
|
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
|
||||||
|
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||||
|
.nodes(nodes).build();
|
||||||
|
when(alertsService.state()).thenReturn(AlertsService.State.STOPPED);
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||||
|
verify(alertsService, times(1)).start(clusterState);
|
||||||
|
verify(alertsService, never()).stop();
|
||||||
|
|
||||||
|
// Trying to start a second time, but that should have no affect.
|
||||||
|
when(alertsService.state()).thenReturn(AlertsService.State.STARTED);
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||||
|
verify(alertsService, times(1)).start(clusterState);
|
||||||
|
verify(alertsService, never()).stop();
|
||||||
|
|
||||||
|
// Stopping because local node is no longer master node
|
||||||
|
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2");
|
||||||
|
ClusterState noMasterClusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||||
|
.nodes(nodes).build();
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", noMasterClusterState, noMasterClusterState));
|
||||||
|
verify(alertsService, times(1)).stop();
|
||||||
|
verify(alertsService, times(1)).start(clusterState);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStartWithStateNotRecoveredBlock() throws Exception {
|
||||||
|
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
|
||||||
|
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||||
|
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
|
||||||
|
.nodes(nodes).build();
|
||||||
|
when(alertsService.state()).thenReturn(AlertsService.State.STOPPED);
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||||
|
verify(alertsService, never()).start(any(ClusterState.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testManualStartStop() {
|
||||||
|
lifeCycleService.start();
|
||||||
|
verify(alertsService, times(1)).start(any(ClusterState.class));
|
||||||
|
verify(alertsService, never()).stop();
|
||||||
|
|
||||||
|
lifeCycleService.stop();
|
||||||
|
verify(alertsService, times(1)).start(any(ClusterState.class));
|
||||||
|
verify(alertsService, times(1)).stop();
|
||||||
|
|
||||||
|
// Starting via cluster state update, we shouldn't start because we have been stopped manually.
|
||||||
|
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
|
||||||
|
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||||
|
.nodes(nodes).build();
|
||||||
|
when(alertsService.state()).thenReturn(AlertsService.State.STOPPED);
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||||
|
verify(alertsService, times(1)).start(any(ClusterState.class));
|
||||||
|
verify(alertsService, times(1)).stop();
|
||||||
|
|
||||||
|
// we can only start, if we start manually
|
||||||
|
lifeCycleService.start();
|
||||||
|
verify(alertsService, times(2)).start(any(ClusterState.class));
|
||||||
|
verify(alertsService, times(1)).stop();
|
||||||
|
|
||||||
|
// stop alerting via cluster state update
|
||||||
|
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2");
|
||||||
|
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||||
|
.nodes(nodes).build();
|
||||||
|
when(alertsService.state()).thenReturn(AlertsService.State.STOPPED);
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||||
|
verify(alertsService, times(2)).start(any(ClusterState.class));
|
||||||
|
verify(alertsService, times(2)).stop();
|
||||||
|
|
||||||
|
// starting alerting via cluster state update, which should work, because we manually started before
|
||||||
|
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
|
||||||
|
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||||
|
.nodes(nodes).build();
|
||||||
|
when(alertsService.state()).thenReturn(AlertsService.State.STOPPED);
|
||||||
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||||
|
verify(alertsService, times(3)).start(any(ClusterState.class));
|
||||||
|
verify(alertsService, times(2)).stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue