From beb4fada5f0d027b98839ef8e571e50d9d3d5f46 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Fri, 21 Nov 2014 18:15:09 +0100
Subject: [PATCH] Core: Changed the loading logic in AlertManager to happen
 all in a single forked thread only.

Retry attempts also stay on the same thread and are only made if there is a
new cluster state version.

Test: Added a first version of a test that fails the elected master multiple
times.

Original commit: elastic/x-pack-elasticsearch@2f7b840f5a1dacaaba16bda211894e9bca5afc20
---
 .../elasticsearch/alerts/AlertManager.java | 233 +++++++++---------
 .../alerts/AbstractAlertingTests.java      |  20 +-
 .../alerts/NoMasterNodeTests.java          |  81 ++++--
 3 files changed, 185 insertions(+), 149 deletions(-)

diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java
index a0cccadab33..ed21fde2ce5 100644
--- a/src/main/java/org/elasticsearch/alerts/AlertManager.java
+++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java
@@ -49,7 +49,6 @@ public class AlertManager extends AbstractComponent {
     private final ClusterService clusterService;
     private final KeyedLock alertLock = new KeyedLock<>();
     private final AtomicReference state = new AtomicReference<>(State.STOPPED);
-    private final AlertsClusterStateListener alertsClusterStateListener = new AlertsClusterStateListener();
 
     private volatile boolean manuallyStopped;
 
@@ -67,7 +66,7 @@ public class AlertManager extends AbstractComponent {
         this.actionManager.setAlertManager(this);
         this.actionRegistry = actionRegistry;
         this.clusterService = clusterService;
-        clusterService.add(alertsClusterStateListener);
+        clusterService.add(new AlertsClusterStateListener());
         manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
         // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
         // happen because we're shutting down and an alert is scheduled.
@@ -126,8 +125,8 @@ public class AlertManager extends AbstractComponent {
 
             try {
                 actionManager.addAlertAction(alert, scheduledFireTime, fireTime);
-            } catch (IOException ioe) {
-                logger.error("Failed to add alert action for [{}]", ioe, alert);
+            } catch (Exception e) {
+                logger.error("Failed to add alert action for [{}]", e, alert);
             }
         } finally {
             alertLock.release(alertName);
@@ -168,69 +167,8 @@ public class AlertManager extends AbstractComponent {
         }
     }
 
-    private boolean isActionThrottled(Alert alert) {
-        if (alert.getThrottlePeriod() != null && alert.getTimeLastActionExecuted() != null) {
-            TimeValue timeSinceLastExeuted = new TimeValue((new DateTime()).getMillis() - alert.getTimeLastActionExecuted().getMillis());
-            if (timeSinceLastExeuted.getMillis() <= alert.getThrottlePeriod().getMillis()) {
-                return true;
-            }
-        }
-        if (alert.getAckState() == AlertAckState.ACKED) {
-            return true;
-        }
-        return false;
-    }
-
-    public void start() {
-        if (state.compareAndSet(State.STOPPED, State.LOADING)) {
-            manuallyStopped = false;
-            logger.info("Starting alert manager...");
-            ClusterState state = clusterService.state();
-            alertsClusterStateListener.initialize(state);
-        }
-    }
-
-    public void stop() {
-        manuallyStopped = true;
-        internalStop();
-    }
-
-    // This is synchronized, because this may first be called from the cluster changed event and then from before close
-    // when a node closes. The stop also stops the scheduler which has several background threads.
If this method is - // invoked in that order that node closes and the test framework complains then about the fact that there are still - // threads alive. - private synchronized void internalStop() { - if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) { - logger.info("Stopping alert manager..."); - actionManager.stop(); - scheduler.stop(); - alertsStore.stop(); - logger.info("Alert manager has stopped"); - } - } - - /** - * For testing only to clear the alerts between tests. - */ - public void clear() { - scheduler.clearAlerts(); - alertsStore.clear(); - } - - private void ensureStarted() { - if (state.get() != State.STARTED) { - throw new ElasticsearchIllegalStateException("not started"); - } - } - - public long getNumberOfAlerts() { - return alertsStore.getAlerts().size(); - } - /** * Acks the alert if needed - * @param alertName - * @return */ public AlertAckState ackAlert(String alertName) { ensureStarted(); @@ -256,10 +194,110 @@ public class AlertManager extends AbstractComponent { } } + /** + * Manually starts alerting if not already started + */ + public void start() { + manuallyStopped = false; + logger.info("Starting alert manager..."); + ClusterState state = clusterService.state(); + internalStart(state); + } + + /** + * Manually stops alerting if not already stopped. + */ + public void stop() { + manuallyStopped = true; + internalStop(); + } + + /** + * For testing only to clear the alerts between tests. + */ + public void clear() { + scheduler.clearAlerts(); + alertsStore.clear(); + } + + // This is synchronized, because this may first be called from the cluster changed event and then from before close + // when a node closes. The stop also stops the scheduler which has several background threads. If this method is + // invoked in that order that node closes and the test framework complains then about the fact that there are still + // threads alive. 
+ private synchronized void internalStop() { + if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) { + logger.info("Stopping alert manager..."); + actionManager.stop(); + scheduler.stop(); + alertsStore.stop(); + logger.info("Alert manager has stopped"); + } + } + + private synchronized void internalStart(ClusterState initialState) { + if (state.compareAndSet(State.STOPPED, State.LOADING)) { + ClusterState clusterState = initialState; + while (state.get() == State.LOADING && clusterState != null) { + if (actionManager.start(clusterState)) { + break; + } + clusterState = newClusterState(clusterState); + } + + while (state.get() == State.LOADING && clusterState != null) { + if (alertsStore.start(clusterState)) { + break; + } + clusterState = newClusterState(clusterState); + } + + if (state.compareAndSet(State.LOADING, State.STARTED)) { + scheduler.start(alertsStore.getAlerts()); + logger.info("Alert manager has started"); + } else { + logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING); + } + } + } + + private void ensureStarted() { + if (state.get() != State.STARTED) { + throw new ElasticsearchIllegalStateException("not started"); + } + } + + public long getNumberOfAlerts() { + return alertsStore.getAlerts().size(); + } + + /** + * Return once a cluster state version appears that is never than the version + */ + private ClusterState newClusterState(ClusterState previous) { + ClusterState current; + while (state.get() == State.LOADING) { + current = clusterService.state(); + if (current.getVersion() > previous.getVersion()) { + return current; + } + } + return null; + } + + private boolean isActionThrottled(Alert alert) { + if (alert.getThrottlePeriod() != null && alert.getTimeLastActionExecuted() != null) { + TimeValue timeSinceLastExeuted = new TimeValue((new DateTime()).getMillis() - alert.getTimeLastActionExecuted().getMillis()); + if (timeSinceLastExeuted.getMillis() <= alert.getThrottlePeriod().getMillis()) { + return true; + } + } + return alert.getAckState() == AlertAckState.ACKED; + } + private final class AlertsClusterStateListener implements ClusterStateListener { @Override - public void clusterChanged(ClusterChangedEvent event) { + public void clusterChanged(final ClusterChangedEvent event) { if (!event.localNodeMaster()) { // We're no longer the master so we need to stop alerting. 
// Stopping alerting may take a while since it will wait on the scheduler to complete shutdown, @@ -277,66 +315,17 @@ public class AlertManager extends AbstractComponent { // a .alertshistory index, but they may not have been restored from the cluster state on disk return; } - if (state.compareAndSet(State.STOPPED, State.LOADING)) { - initialize(event.state()); + if (state.get() == State.STOPPED && !manuallyStopped) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + internalStart(event.state()); + } + }); } } } - private void initialize(final ClusterState state) { - if (manuallyStopped) { - return; - } - - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - if (alertsStore.start(state)) { - startIfReady(); - } else { - retry(); - } - } - }); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - if (actionManager.start(state)) { - startIfReady(); - } else { - retry(); - } - } - }); - } - - private void startIfReady() { - if (alertsStore.started() && actionManager.started()) { - if (state.compareAndSet(State.LOADING, State.STARTED)) { - scheduler.start(alertsStore.getAlerts()); - logger.info("Alert manager has started"); - } else { - logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING); - } - } - } - - private void retry() { - // Only retry if our state is loading - if (state.get() == State.LOADING) { - final ClusterState newState = clusterService.state(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - // Retry with the same event: - initialize(newState); - } - }); - } else { - logger.info("Didn't retry to initialize the alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING); - } - } - } private enum State { diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 8e5a6e1506f..2b5573acd88 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -107,7 +107,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest return internalTestCluster().getInstance(AlertsClient.class); } - protected void assertAlertTriggered(final String alertName, final long expectedAlertActionsWithActionPerformed) throws Exception { + protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception { assertBusy(new Runnable() { @Override public void run() { @@ -123,7 +123,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) .get(); - assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(expectedAlertActionsWithActionPerformed)); + assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); assertThat((Integer) XContentMapValues.extractValue("response.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); } }); @@ -159,6 +159,15 @@ public abstract class 
AbstractAlertingTests extends ElasticsearchIntegrationTest }); } + protected void ensureAlertingStopped() throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(alertClient().prepareAlertsStats().get().isAlertManagerStarted(), is(false)); + } + }); + } + protected void startAlerting() throws Exception { alertClient().prepareAlertService().start().get(); ensureAlertingStarted(); @@ -166,12 +175,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest protected void stopAlerting() throws Exception { alertClient().prepareAlertService().stop().get(); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(alertClient().prepareAlertsStats().get().isAlertManagerStarted(), is(false)); - } - }); + ensureAlertingStopped(); } protected static InternalTestCluster internalTestCluster() { diff --git a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java index 77fbd3fe32b..6a5bf8b8687 100644 --- a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java @@ -7,8 +7,8 @@ package org.elasticsearch.alerts; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; +import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.base.Predicate; import org.elasticsearch.common.bytes.BytesReference; @@ -51,48 +51,91 @@ public class NoMasterNodeTests extends AbstractAlertingTests { public void testSimpleFailure() throws Exception { config = new ClusterDiscoveryConfiguration.UnicastZen(2); internalTestCluster().startNodesAsync(2).get(); - AlertsClient alertsClient = alertClient(); createIndex("my-index"); // Have a sample document in the index, the alert is going to evaluate client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1"); - alertsClient.preparePutAlert("my-first-alert") + alertClient().preparePutAlert("my-first-alert") .setAlertSource(alertSource) .get(); assertAlertTriggered("my-first-alert", 1); // Stop the elected master, no new master will be elected b/c of m_m_n is set to 2 - internalTestCluster().stopCurrentMasterNode(); - assertThat(awaitBusy(new Predicate() { - public boolean apply(Object obj) { - ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID); - } - }), equalTo(true)); - - // Need to fetch a new client the old one maybe an internal client of the node we just killed. 
- alertsClient = alertClient(); + stopElectedMasterNodeAndWait(); try { // any alerting action should fail, because there is no elected master node - alertsClient.prepareDeleteAlert("my-first-alert").setMasterNodeTimeout(TimeValue.timeValueSeconds(1)).get(); + alertClient().prepareDeleteAlert("my-first-alert").setMasterNodeTimeout(TimeValue.timeValueSeconds(1)).get(); fail(); } catch (Exception e) { assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(MasterNotDiscoveredException.class)); } - // Bring back the 2nd node and wait for elected master node to come back and alerting to work as expected. - internalTestCluster().startNode(); - ensureAlertingStarted(); + startElectedMasterNodeAndWait(); // Delete an existing alert - DeleteAlertResponse response = alertsClient.prepareDeleteAlert("my-first-alert").get(); + DeleteAlertResponse response = alertClient().prepareDeleteAlert("my-first-alert").get(); assertThat(response.deleteResponse().isFound(), is(true)); // Add a new alert and wait for it get triggered - alertsClient.preparePutAlert("my-second-alert") + alertClient().preparePutAlert("my-second-alert") .setAlertSource(alertSource) .get(); assertAlertTriggered("my-second-alert", 2); } + @Test + public void testMultipleFailures() throws Exception { + // TODO: increase number of times we kill the elected master. + // It is not good enough yet: after multi a couple of kills errors occur and assertions fail. + int numberOfFailures = 1;//scaledRandomIntBetween(2, 9); + int numberOfAlerts = scaledRandomIntBetween(numberOfFailures, 12); + config = new ClusterDiscoveryConfiguration.UnicastZen(2 + numberOfFailures); + internalTestCluster().startNodesAsync(2).get(); + createIndex("my-index"); + client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); + + for (int i = 1; i <= numberOfAlerts; i++) { + String alertName = "alert" + i; + SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); + BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1"); + alertClient().preparePutAlert(alertName) + .setAlertSource(alertSource) + .get(); + } + + for (int i = 1; i <= numberOfFailures; i++) { + logger.info("Failure round {}", i); + + for (int j = 1; j < numberOfAlerts; j++) { + String alertName = "alert" + i; + assertAlertTriggered(alertName, i); + } + stopElectedMasterNodeAndWait(); + startElectedMasterNodeAndWait(); + + AlertsStatsResponse statsResponse = alertClient().prepareAlertsStats().get(); + assertThat(statsResponse.getNumberOfRegisteredAlerts(), equalTo((long) numberOfAlerts)); + } + } + + private void stopElectedMasterNodeAndWait() throws Exception { + internalTestCluster().stopCurrentMasterNode(); + // Can't use ensureAlertingStopped, b/c that relies on the alerts stats api which requires an elected master node + assertThat(awaitBusy(new Predicate() { + public boolean apply(Object obj) { + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); + return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID); + } + }), equalTo(true)); + // Ensure that the alert manager doesn't run elsewhere + for (AlertManager alertManager : internalTestCluster().getInstances(AlertManager.class)) { + assertThat(alertManager.isStarted(), is(false)); + } + } + + private void startElectedMasterNodeAndWait() throws Exception { + internalTestCluster().startNode(); + ensureAlertingStarted(); + } + }
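
Note on the loading change above: the patch moves all loading onto a single thread from the GENERIC thread pool and retries a failed load step only after a strictly newer cluster state version has been observed (see internalStart() and newClusterState() in the diff). The following standalone Java sketch illustrates that retry-on-new-version pattern in isolation. It is an illustration only, not the actual AlertManager code; the names (SingleThreadLoaderSketch, clusterStateVersion, loadStep, retryOnNewVersion) and the "version >= 3" success condition are invented for the example.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate;

public class SingleThreadLoaderSketch {

    // Stands in for ClusterService.state().getVersion(): a monotonically increasing version.
    static final AtomicLong clusterStateVersion = new AtomicLong(0);

    // A load step that keeps failing until enough cluster state has arrived (hypothetical condition).
    static boolean loadStep(long version) {
        return version >= 3;
    }

    // Retry a single step on the calling thread, but only once a strictly newer version appears.
    static void retryOnNewVersion(LongPredicate step, long initialVersion) throws InterruptedException {
        long seen = initialVersion;
        while (true) {
            if (step.test(seen)) {
                return;
            }
            long current;
            do {
                // The real code would also give up when the manager leaves the LOADING state.
                TimeUnit.MILLISECONDS.sleep(10);
                current = clusterStateVersion.get();
            } while (current <= seen);
            seen = current;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService generic = Executors.newSingleThreadExecutor(); // the single "forked" thread
        generic.submit(() -> {
            try {
                retryOnNewVersion(SingleThreadLoaderSketch::loadStep, clusterStateVersion.get());
                System.out.println("loading finished, manager could move from LOADING to STARTED");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Simulate new cluster states being published over time.
        for (int i = 1; i <= 3; i++) {
            TimeUnit.MILLISECONDS.sleep(50);
            clusterStateVersion.incrementAndGet();
        }
        generic.shutdown();
        generic.awaitTermination(5, TimeUnit.SECONDS);
    }
}

In the actual patch the same idea is expressed by newClusterState(ClusterState previous), which spins until clusterService.state() reports a higher version (or the manager leaves the LOADING state), so a retry only happens when there is genuinely new cluster state to look at.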