diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 44617ddd3ea..faa617356bf 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -30,19 +30,24 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.search.SearchHit; import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -70,7 +75,7 @@ public class AlertManager extends AbstractLifecycleComponent { private final Client client; private AlertScheduler scheduler; - private final Map alertMap; + private final ConcurrentMap alertMap; private AtomicBoolean started = new AtomicBoolean(false); private final Thread starter; @@ -106,10 +111,8 @@ public class AlertManager extends AbstractLifecycleComponent { } private void sendAlertsToScheduler() { - synchronized (alertMap) { - for (Map.Entry entry : alertMap.entrySet()) { - scheduler.addAlert(entry.getKey(), entry.getValue()); - } + for (Map.Entry entry : alertMap.entrySet()) { + scheduler.addAlert(entry.getKey(), entry.getValue()); } } @@ -143,12 +146,13 @@ public class AlertManager extends AbstractLifecycleComponent { @Inject - public AlertManager(Settings settings, Client client) { + public AlertManager(Settings settings, Client client, IndicesLifecycle indicesLifecycle) { super(settings); logger.warn("Initing AlertManager"); this.client = client; - alertMap = new HashMap(); + alertMap = ConcurrentCollections.newConcurrentMap(); starter = new Thread(new StarterThread()); + indicesLifecycle.addListener(new IndicesLifeCycleListener()); //scheduleAlerts(); } @@ -179,24 +183,22 @@ public class AlertManager extends AbstractLifecycleComponent { Alert indexedAlert; try { indexedAlert = getAlertFromIndex(alertName); - synchronized (alertMap) { - Alert inMemoryAlert = alertMap.get(alertName); - if (indexedAlert == null) { - //Alert has been deleted out from underneath us - alertMap.remove(alertName); - return false; - } else if (inMemoryAlert == null) { - logger.warn("Got claim attempt for alert [{}] that alert manager does not have but is in the index.", alertName); - alertMap.put(alertName, indexedAlert); //This is an odd state to get into - } else { - if (!inMemoryAlert.isSameAlert(indexedAlert)) { - alertMap.put(alertName, indexedAlert); //Probably has been changed by another process and we missed the notification - } - } - if (!indexedAlert.enabled()) { - return false; + Alert inMemoryAlert = alertMap.get(alertName); + if (indexedAlert == null) { + //Alert has been deleted out from underneath us + alertMap.remove(alertName); + return false; + } else if (inMemoryAlert == null) { + logger.warn("Got claim attempt for alert [{}] that alert manager does not have but is in the index.", alertName); + alertMap.put(alertName, indexedAlert); //This is an odd state to get into + } else { + if (!inMemoryAlert.isSameAlert(indexedAlert)) { + alertMap.put(alertName, indexedAlert); //Probably has been changed by another process and we missed the notification } } + if (!indexedAlert.enabled()) { + return false; + } if (indexedAlert.running().equals(scheduleRunTime) || indexedAlert.running().isAfter(scheduleRunTime)) { //Someone else is already running this alert or this alert time has passed return false; @@ -228,10 +230,9 @@ public class AlertManager extends AbstractLifecycleComponent { return false; } - synchronized (alertMap) { //Update the alert map - if (alertMap.containsKey(alertName)) { //Check here since it may have been deleted - alertMap.get(alertName).running(scheduleRunTime); - } + Alert alert = alertMap.get(alertName); + if (alert != null) { + alert.running(scheduleRunTime); } return true; } @@ -262,28 +263,25 @@ public class AlertManager extends AbstractLifecycleComponent { } private void loadAlerts() { - synchronized (alertMap) { - if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { - createAlertsIndex(); - } - - SearchResponse searchResponse = client.prepareSearch().setSource( - "{ \"query\" : " + - "{ \"match_all\" : {}}," + - "\"size\" : \"100\"" + - "}" - ).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().actionGet(); - for (SearchHit sh : searchResponse.getHits()) { - String alertId = sh.getId(); - try { - Alert alert = parseAlert(alertId, sh); - alertMap.put(alertId, alert); - } catch (ElasticsearchException e) { - logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh); - } - } - logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); + if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { + createAlertsIndex(); } + SearchResponse searchResponse = client.prepareSearch().setSource( + "{ \"query\" : " + + "{ \"match_all\" : {}}," + + "\"size\" : \"100\"" + + "}" + ).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().actionGet(); + for (SearchHit sh : searchResponse.getHits()) { + String alertId = sh.getId(); + try { + Alert alert = parseAlert(alertId, sh); + alertMap.put(alertId, alert); + } catch (ElasticsearchException e) { + logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh); + } + } + logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); } public long getLastEventCount(String alertName){ @@ -292,26 +290,24 @@ public class AlertManager extends AbstractLifecycleComponent { public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception { try { - synchronized (alertMap) { - Alert alert = getAlertForName(alertName); - alert.lastRan(fireTime); - XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint(); - if (firedAction) { - logger.error("Fired action [{}]",firedAction); - alert.lastActionFire(scheduledTime); - } - alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS); - logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true)); - UpdateRequest updateRequest = new UpdateRequest(); - updateRequest.id(alertName); - updateRequest.index(ALERT_INDEX); - updateRequest.type(ALERT_TYPE); - - updateRequest.doc(alertBuilder); - updateRequest.refresh(true); - client.update(updateRequest).actionGet(); - return true; + Alert alert = getAlertForName(alertName); + alert.lastRan(fireTime); + XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint(); + if (firedAction) { + logger.error("Fired action [{}]",firedAction); + alert.lastActionFire(scheduledTime); } + alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS); + logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true)); + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.id(alertName); + updateRequest.index(ALERT_INDEX); + updateRequest.type(ALERT_TYPE); + + updateRequest.doc(alertBuilder); + updateRequest.refresh(true); + client.update(updateRequest).actionGet(); + return true; } catch (Throwable t) { logger.error("Failed to update alert [{}] with lastRan of [{}]",t, alertName, fireTime); return false; @@ -359,79 +355,68 @@ public class AlertManager extends AbstractLifecycleComponent { return true; } - public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{ - synchronized (alertMap) { - if (alertMap.containsKey(alertName)) { - scheduler.deleteAlertFromSchedule(alertName); - alertMap.remove(alertName); - try { - DeleteRequest deleteRequest = new DeleteRequest(); - deleteRequest.id(alertName); - deleteRequest.index(ALERT_INDEX); - deleteRequest.type(ALERT_TYPE); - deleteRequest.operationThreaded(false); - deleteRequest.refresh(true); - if (client.delete(deleteRequest).actionGet().isFound()) { - return true; - } else { - logger.warn("Couldn't find [{}] in the index triggering a full refresh", alertName); - //Something went wrong refresh - refreshAlerts(); - return false; - } - } - catch (Exception e){ - logger.warn("Something went wrong when deleting [{}] from the index triggering a full refresh", e, alertName); + public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException { + if (alertMap.remove(alertName) != null) { + scheduler.deleteAlertFromSchedule(alertName); + try { + DeleteRequest deleteRequest = new DeleteRequest(); + deleteRequest.id(alertName); + deleteRequest.index(ALERT_INDEX); + deleteRequest.type(ALERT_TYPE); + deleteRequest.operationThreaded(false); + deleteRequest.refresh(true); + if (client.delete(deleteRequest).actionGet().isFound()) { + return true; + } else { + logger.warn("Couldn't find [{}] in the index triggering a full refresh", alertName); //Something went wrong refresh refreshAlerts(); - throw e; + return false; } } + catch (Exception e){ + logger.warn("Something went wrong when deleting [{}] from the index triggering a full refresh", e, alertName); + //Something went wrong refresh + refreshAlerts(); + throw e; + } } return false; } public boolean addAlert(String alertName, Alert alert, boolean persist) { - synchronized (alertMap) { - if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { - createAlertsIndex(); - } + if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { + createAlertsIndex(); + } - if (alertMap.containsKey(alertName)) { - throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]"); + if (alertMap.putIfAbsent(alertName, alert) == null) { + scheduler.addAlert(alertName, alert); + if (persist) { + return persistAlert(alertName, alert, IndexRequest.OpType.CREATE); } else { - alertMap.put(alertName, alert); - scheduler.addAlert(alertName,alert); - - if (persist) { - return persistAlert(alertName, alert, IndexRequest.OpType.CREATE); - } else { - return true; - } + return true; } + } else { + throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]"); } } public boolean disableAlert(String alertName) { - synchronized (alertMap) { - Alert alert = alertMap.get(alertName); - if (alert == null) { - throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]"); - } - alert.enabled(false); - return persistAlert(alertName, alert, IndexRequest.OpType.INDEX); + Alert alert = alertMap.get(alertName); + if (alert == null) { + throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]"); } + alert.enabled(false); + return persistAlert(alertName, alert, IndexRequest.OpType.INDEX); } public boolean enableAlert(String alertName) { - synchronized (alertMap) { - Alert alert = alertMap.get(alertName); - if (alert == null) { - throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]"); - } - alert.enabled(true); - return persistAlert(alertName, alert, IndexRequest.OpType.INDEX); + Alert alert = alertMap.get(alertName); + if (alert == null) { + throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]"); } + alert.enabled(true); + return persistAlert(alertName, alert, IndexRequest.OpType.INDEX); } private boolean persistAlert(String alertName, Alert alert, IndexRequest.OpType opType) { @@ -567,14 +552,20 @@ public class AlertManager extends AbstractLifecycleComponent { } public Map getSafeAlertMap() { - synchronized (alertMap) { - return new HashMap<>(alertMap); - } + return ImmutableMap.copyOf(alertMap); } public Alert getAlertForName(String alertName) { - synchronized (alertMap) { - return alertMap.get(alertName); + return alertMap.get(alertName); + } + + private final class IndicesLifeCycleListener extends IndicesLifecycle.Listener { + + @Override + public void afterIndexClosed(Index index) { + if (index.getName().equals(ALERT_INDEX)) { + alertMap.clear(); + } } } diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java index 7beb9b78dd8..4c8ce0898b5 100644 --- a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java @@ -9,9 +9,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.AlertResult; +import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -89,7 +89,8 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste if (run.compareAndSet(true, false)) { try { logger.info("Stopping scheduler"); - scheduler.shutdown(true); + scheduler.clear(); + scheduler.shutdown(false); } catch (SchedulerException se){ logger.error("Failed to stop quartz scheduler",se); } diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index 3e494b7c441..46c979ae5ee 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.alerts; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertActionFactory; import org.elasticsearch.alerts.actions.AlertActionManager; @@ -22,7 +23,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.core.Is.is; @@ -67,7 +69,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { assertThat(alertScheduler.isRunning(), is(true)); AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName()); - final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean alertActionInvoked = new AtomicBoolean(false); final AlertAction alertAction = new AlertAction() { @Override public String getActionName() { @@ -84,7 +86,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { @Override public boolean doAction(String alertName, AlertResult alert) { logger.info("Alert {} invoked: {}", alertName, alert); - latch.countDown(); + alertActionInvoked.set(true); return true; } }; @@ -102,7 +104,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { alertTrigger, TimeValue.timeValueSeconds(1), Arrays.asList(alertAction), - "* * * * * ? *", + "0/5 * * * * ? *", null, Arrays.asList("my-index"), null, @@ -111,7 +113,14 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { true ); alertManager.addAlert("my-first-alert", alert, true); - latch.await(); + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(alertActionInvoked.get(), is(true)); + IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertManager.ALERT_HISTORY_INDEX).get(); + assertThat(indicesExistsResponse.isExists(), is(true)); + } + }, 30, TimeUnit.SECONDS); } }