Improving the starting and stop logic of the alert manager.

- Remove synchronized
- Add STOPPING state
- Expose alert state in stats api
- Let the test framework disable alerting before closing the test cluster

Original commit: elastic/x-pack-elasticsearch@5794f5fd8f
This commit is contained in:
Martijn van Groningen 2014-11-24 10:22:23 +01:00
parent 4e543ded65
commit 1a4e118d0d
11 changed files with 117 additions and 60 deletions

View File

@ -105,8 +105,8 @@ public class AlertManager extends AbstractComponent {
}
}
public boolean isStarted() {
return state.get() == State.STARTED;
public State getState() {
return state.get();
}
public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
@ -220,31 +220,28 @@ public class AlertManager extends AbstractComponent {
alertsStore.clear();
}
// This is synchronized, because this may first be called from the cluster changed event and then from before close
// when a node closes. The stop also stops the scheduler which has several background threads. If this method is
// invoked in that order that node closes and the test framework complains then about the fact that there are still
// threads alive.
private synchronized void internalStop() {
if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) {
private void internalStop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("Stopping alert manager...");
actionManager.stop();
scheduler.stop();
alertsStore.stop();
state.set(State.STOPPED);
logger.info("Alert manager has stopped");
}
}
private synchronized void internalStart(ClusterState initialState) {
if (state.compareAndSet(State.STOPPED, State.LOADING)) {
private void internalStart(ClusterState initialState) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
ClusterState clusterState = initialState;
while (state.get() == State.LOADING && clusterState != null) {
while (true) {
if (actionManager.start(clusterState)) {
break;
}
clusterState = newClusterState(clusterState);
}
while (state.get() == State.LOADING && clusterState != null) {
while (true) {
if (alertsStore.start(clusterState)) {
break;
}
@ -252,11 +249,8 @@ public class AlertManager extends AbstractComponent {
}
scheduler.start(alertsStore.getAlerts());
if (state.compareAndSet(State.LOADING, State.STARTED)) {
logger.info("Alert manager has started");
} else {
logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING);
}
state.set(State.STARTED);
logger.info("Alert manager has started");
}
}
@ -275,13 +269,12 @@ public class AlertManager extends AbstractComponent {
*/
private ClusterState newClusterState(ClusterState previous) {
ClusterState current;
while (state.get() == State.LOADING) {
while (true) {
current = clusterService.state();
if (current.getVersion() > previous.getVersion()) {
return current;
}
}
return null;
}
private boolean isActionThrottled(Alert alert) {
@ -328,12 +321,4 @@ public class AlertManager extends AbstractComponent {
}
private enum State {
STOPPED,
LOADING,
STARTED
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
/**
* Encapsulates the state of the alerts plugin.
*/
public enum State {
/**
* The alerts plugin is not running and not functional.
*/
STOPPED(0),
/**
* The alerts plugin is performing the necessary operations to get into a started state.
*/
STARTING(1),
/**
* The alerts plugin is running and completely functional.
*/
STARTED(2),
/**
* The alerts plugin is shutting down and not functional.
*/
STOPPING(3);
private final byte id;
State(int id) {
this.id = (byte) id;
}
public byte getId() {
return id;
}
public static State fromId(byte id) {
switch (id) {
case 0:
return STOPPED;
case 1:
return STARTING;
case 2:
return STARTED;
case 3:
return STOPPING;
default:
throw new ElasticsearchIllegalArgumentException("Unknown id: " + id);
}
}
}

View File

@ -16,6 +16,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import java.util.Locale;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestStatus.OK;
@ -40,7 +42,7 @@ public class RestAlertsStatsAction extends BaseRestHandler {
@Override
public RestResponse buildResponse(AlertsStatsResponse alertsStatsResponse, XContentBuilder builder) throws Exception {
builder.startObject()
.field("alert_manager_started", alertsStatsResponse.isAlertManagerStarted())
.field("alert_manager_state", alertsStatsResponse.getAlertManagerStarted().toString().toLowerCase(Locale.ENGLISH))
.field("alert_action_manager_started", alertsStatsResponse.isAlertActionManagerStarted())
.field("alert_action_queue_size", alertsStatsResponse.getAlertActionManagerQueueSize())
.field("number_of_alerts", alertsStatsResponse.getNumberOfRegisteredAlerts())

View File

@ -56,13 +56,12 @@ public class AlertScheduler extends AbstractComponent {
properties.setProperty(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN, "true");
properties.setProperty(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT, "true");
SchedulerFactory schFactory = new StdSchedulerFactory(properties);
Scheduler scheduler = schFactory.getScheduler();
scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory());
scheduler.start();
this.scheduler = scheduler;
for (Map.Entry<String, Alert> entry : alerts.entrySet()) {
add(entry.getKey(), entry.getValue());
}
scheduler.start();
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler", se);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.transport.actions.stats;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.alerts.State;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -17,7 +18,7 @@ import java.io.IOException;
public class AlertsStatsResponse extends ActionResponse {
private long numberOfRegisteredAlerts;
private boolean alertManagerStarted;
private State alertManagerState;
private boolean alertActionManagerStarted;
private long alertActionManagerQueueSize;
@ -60,19 +61,14 @@ public class AlertsStatsResponse extends ActionResponse {
}
/**
* Returns true if the alert manager is successfully started
* @return
* Returns the state of the alert manager.
*/
public boolean isAlertManagerStarted() {
return alertManagerStarted;
public State getAlertManagerStarted() {
return alertManagerState;
}
/**
* Sets if the alert manager is started
* @param alertManagerStarted
*/
public void setAlertManagerStarted(boolean alertManagerStarted) {
this.alertManagerStarted = alertManagerStarted;
void setAlertManagerState(State alertManagerState) {
this.alertManagerState = alertManagerState;
}
/**
@ -114,7 +110,7 @@ public class AlertsStatsResponse extends ActionResponse {
numberOfRegisteredAlerts = in.readLong();
alertActionManagerQueueSize = in.readLong();
alertActionManagerLargestQueueSize = in.readLong();
alertManagerStarted = in.readBoolean();
alertManagerState = State.fromId(in.readByte());
alertActionManagerStarted = in.readBoolean();
}
@ -124,7 +120,7 @@ public class AlertsStatsResponse extends ActionResponse {
out.writeLong(numberOfRegisteredAlerts);
out.writeLong(alertActionManagerQueueSize);
out.writeLong(alertActionManagerLargestQueueSize);
out.writeBoolean(alertManagerStarted);
out.writeByte(alertManagerState.getId());
out.writeBoolean(alertActionManagerStarted);
}
}

View File

@ -54,7 +54,7 @@ public class TransportAlertStatsAction extends TransportMasterNodeOperationActio
@Override
protected void masterOperation(AlertsStatsRequest request, ClusterState state, ActionListener<AlertsStatsResponse> listener) throws ElasticsearchException {
AlertsStatsResponse statsResponse = new AlertsStatsResponse();
statsResponse.setAlertManagerStarted(alertManager.isStarted());
statsResponse.setAlertManagerState(alertManager.getState());
statsResponse.setAlertActionManagerStarted(alertActionManager.started());
statsResponse.setAlertActionManagerQueueSize(alertActionManager.getQueueSize());
statsResponse.setNumberOfRegisteredAlerts(alertManager.getNumberOfAlerts());

View File

@ -31,10 +31,7 @@ import org.junit.After;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@ -155,7 +152,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertClient().prepareAlertsStats().get().isAlertManagerStarted(), is(true));
assertThat(alertClient().prepareAlertsStats().get().getAlertManagerStarted(), is(State.STARTED));
}
});
}
@ -164,7 +161,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertClient().prepareAlertsStats().get().isAlertManagerStarted(), is(false));
assertThat(alertClient().prepareAlertsStats().get().getAlertManagerStarted(), is(State.STOPPED));
}
});
}
@ -183,7 +180,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
return (InternalTestCluster) ((AlertingWrappingCluster) cluster()).testCluster;
}
private static class AlertingWrappingCluster extends TestCluster {
private final class AlertingWrappingCluster extends TestCluster {
private final TestCluster testCluster;
@ -254,6 +251,24 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
@Override
public void close() throws IOException {
InternalTestCluster _testCluster = (InternalTestCluster) testCluster;
Set<String> nodes = new HashSet<>(Arrays.asList(_testCluster.getNodeNames()));
String masterNode = _testCluster.getMasterName();
nodes.remove(masterNode);
// First manually stop alerting on non elected master node, this will prevent that alerting becomes active
// on these nodes
for (String node : nodes) {
_testCluster.getInstance(AlertManager.class, node).stop();
}
// Then stop alerting on elected master node and wait until alerting has stopped on it.
AlertManager alertManager = _testCluster.getInstance(AlertManager.class, masterNode);
alertManager.stop();
while (alertManager.getState() != State.STOPPED) {}
// Now when can close nodes, without alerting trying to become active while nodes briefly become master
// during cluster shutdown.
testCluster.close();
}

View File

@ -48,7 +48,7 @@ public class BootStrapTest extends AbstractAlertingTests {
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
}
@ -59,7 +59,7 @@ public class BootStrapTest extends AbstractAlertingTests {
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
@ -86,7 +86,7 @@ public class BootStrapTest extends AbstractAlertingTests {
response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L));
}

View File

@ -137,7 +137,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
}), equalTo(true));
// Ensure that the alert manager doesn't run elsewhere
for (AlertManager alertManager : internalTestCluster().getInstances(AlertManager.class)) {
assertThat(alertManager.isStarted(), is(false));
assertThat(alertManager.getState(), is(State.STOPPED));
}
}

View File

@ -117,7 +117,7 @@ public class AlertActionsTest extends AbstractAlertingTests {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertManager.isStarted(), is(true));
assertThat(alertManager.getState(), is(State.STARTED));
}
});
final AtomicBoolean alertActionInvoked = new AtomicBoolean(false);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.AbstractAlertingTests;
import org.elasticsearch.alerts.State;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
@ -31,7 +32,7 @@ public class AlertStatsTests extends AbstractAlertingTests {
AlertsStatsResponse response = alertClient().alertsStats(alertsStatsRequest).actionGet();
assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
assertThat(response.getAlertActionManagerQueueSize(), equalTo(0L));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(0L));
@ -45,7 +46,7 @@ public class AlertStatsTests extends AbstractAlertingTests {
AlertsStatsResponse response = alertsClient.alertsStats(alertsStatsRequest).actionGet();
assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("* * * * * ? *", searchRequest, "hits.total == 1");
@ -60,7 +61,7 @@ public class AlertStatsTests extends AbstractAlertingTests {
Thread.sleep(waitTime.getMillis());
assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
//assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThan(0L));
}