From 15d9101ea90824d3100a03741782a0f7dfe7baab Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 20 Nov 2014 12:39:09 +0100 Subject: [PATCH] First stop the alert action manager and enforce started property inside alert action manager. Original commit: elastic/x-pack-elasticsearch@75a98a93f55d7b8a916a5d10e529f1eebfeed77a --- .../elasticsearch/alerts/AlertManager.java | 2 +- .../alerts/actions/AlertActionManager.java | 31 +++++++++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index b2c4914feca..85e45858008 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -158,8 +158,8 @@ public class AlertManager extends AbstractComponent { public synchronized void stop() { if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) { logger.info("Stopping alert manager..."); - scheduler.stop(); actionManager.stop(); + scheduler.stop(); alertsStore.stop(); logger.info("Alert manager has stopped"); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 986ea88f9a8..b17e759d197 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -123,9 +124,11 @@ public class AlertActionManager extends AbstractComponent { } public void stop() { - actionsToBeProcessed.clear(); - actionsToBeProcessed.add(END_ENTRY); - logger.info("Stopped job queue"); + if (started.compareAndSet(true, false)) { + actionsToBeProcessed.clear(); + actionsToBeProcessed.add(END_ENTRY); + logger.info("Stopped job queue"); + } } public boolean started() { @@ -236,6 +239,7 @@ public class AlertActionManager extends AbstractComponent { } public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException { + ensureStarted(); AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED); IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId()) .setSource(XContentFactory.jsonBuilder().value(entry)) @@ -257,7 +261,16 @@ public class AlertActionManager extends AbstractComponent { } } + public long getQueueSize() { + return actionsToBeProcessed.size(); + } + + public long getLargestQueueSize() { + return largestQueueSize.get(); + } + private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) throws IOException { + ensureStarted(); entry.setState(actionPerformed); IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId()) .setSource(XContentFactory.jsonBuilder().value(entry)) @@ -265,12 +278,10 @@ public class AlertActionManager extends AbstractComponent { entry.setVersion(response.getVersion()); } - public long getQueueSize() { - return actionsToBeProcessed.size(); - } - - public long getLargestQueueSize() { - return largestQueueSize.get(); + private void ensureStarted() { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } } private class AlertHistoryRunnable implements Runnable { @@ -321,7 +332,7 @@ public class AlertActionManager extends AbstractComponent { while (started()) { AlertActionEntry entry = actionsToBeProcessed.take(); if (!started() || entry == END_ENTRY) { - logger.debug("Stopping thread to read from the job queue"); + logger.info("Stopping thread to read from the job queue"); return; } threadPool.executor(AlertsPlugin.ALERT_THREAD_POOL_NAME).execute(new AlertHistoryRunnable(entry));