Changes to AlertManager:

* Made use of IndicesLifecycle, to catch when the .alerts index gets deleted, so we can clear the alerts
* Moved to concurrent hashmap in favour over normal hashmap with synchronized blocks

Original commit: elastic/x-pack-elasticsearch@5599d01c78
This commit is contained in:
Martijn van Groningen 2014-10-24 16:51:12 +02:00
parent 16a7991d6d
commit 1a32243781
3 changed files with 132 additions and 131 deletions

View File

@ -30,19 +30,24 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -70,7 +75,7 @@ public class AlertManager extends AbstractLifecycleComponent {
private final Client client; private final Client client;
private AlertScheduler scheduler; private AlertScheduler scheduler;
private final Map<String,Alert> alertMap; private final ConcurrentMap<String,Alert> alertMap;
private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false);
private final Thread starter; private final Thread starter;
@ -106,12 +111,10 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
private void sendAlertsToScheduler() { private void sendAlertsToScheduler() {
synchronized (alertMap) {
for (Map.Entry<String, Alert> entry : alertMap.entrySet()) { for (Map.Entry<String, Alert> entry : alertMap.entrySet()) {
scheduler.addAlert(entry.getKey(), entry.getValue()); scheduler.addAlert(entry.getKey(), entry.getValue());
} }
} }
}
public void setActionManager(AlertActionManager actionManager){ public void setActionManager(AlertActionManager actionManager){
this.actionManager = actionManager; this.actionManager = actionManager;
@ -143,12 +146,13 @@ public class AlertManager extends AbstractLifecycleComponent {
@Inject @Inject
public AlertManager(Settings settings, Client client) { public AlertManager(Settings settings, Client client, IndicesLifecycle indicesLifecycle) {
super(settings); super(settings);
logger.warn("Initing AlertManager"); logger.warn("Initing AlertManager");
this.client = client; this.client = client;
alertMap = new HashMap(); alertMap = ConcurrentCollections.newConcurrentMap();
starter = new Thread(new StarterThread()); starter = new Thread(new StarterThread());
indicesLifecycle.addListener(new IndicesLifeCycleListener());
//scheduleAlerts(); //scheduleAlerts();
} }
@ -179,7 +183,6 @@ public class AlertManager extends AbstractLifecycleComponent {
Alert indexedAlert; Alert indexedAlert;
try { try {
indexedAlert = getAlertFromIndex(alertName); indexedAlert = getAlertFromIndex(alertName);
synchronized (alertMap) {
Alert inMemoryAlert = alertMap.get(alertName); Alert inMemoryAlert = alertMap.get(alertName);
if (indexedAlert == null) { if (indexedAlert == null) {
//Alert has been deleted out from underneath us //Alert has been deleted out from underneath us
@ -196,7 +199,6 @@ public class AlertManager extends AbstractLifecycleComponent {
if (!indexedAlert.enabled()) { if (!indexedAlert.enabled()) {
return false; return false;
} }
}
if (indexedAlert.running().equals(scheduleRunTime) || indexedAlert.running().isAfter(scheduleRunTime)) { if (indexedAlert.running().equals(scheduleRunTime) || indexedAlert.running().isAfter(scheduleRunTime)) {
//Someone else is already running this alert or this alert time has passed //Someone else is already running this alert or this alert time has passed
return false; return false;
@ -228,10 +230,9 @@ public class AlertManager extends AbstractLifecycleComponent {
return false; return false;
} }
synchronized (alertMap) { //Update the alert map Alert alert = alertMap.get(alertName);
if (alertMap.containsKey(alertName)) { //Check here since it may have been deleted if (alert != null) {
alertMap.get(alertName).running(scheduleRunTime); alert.running(scheduleRunTime);
}
} }
return true; return true;
} }
@ -262,11 +263,9 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
private void loadAlerts() { private void loadAlerts() {
synchronized (alertMap) {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex(); createAlertsIndex();
} }
SearchResponse searchResponse = client.prepareSearch().setSource( SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " + "{ \"query\" : " +
"{ \"match_all\" : {}}," + "{ \"match_all\" : {}}," +
@ -284,7 +283,6 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());
} }
}
public long getLastEventCount(String alertName){ public long getLastEventCount(String alertName){
return 0; return 0;
@ -292,7 +290,6 @@ public class AlertManager extends AbstractLifecycleComponent {
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception { public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception {
try { try {
synchronized (alertMap) {
Alert alert = getAlertForName(alertName); Alert alert = getAlertForName(alertName);
alert.lastRan(fireTime); alert.lastRan(fireTime);
XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint(); XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint();
@ -311,7 +308,6 @@ public class AlertManager extends AbstractLifecycleComponent {
updateRequest.refresh(true); updateRequest.refresh(true);
client.update(updateRequest).actionGet(); client.update(updateRequest).actionGet();
return true; return true;
}
} catch (Throwable t) { } catch (Throwable t) {
logger.error("Failed to update alert [{}] with lastRan of [{}]",t, alertName, fireTime); logger.error("Failed to update alert [{}] with lastRan of [{}]",t, alertName, fireTime);
return false; return false;
@ -360,10 +356,8 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException { public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException {
synchronized (alertMap) { if (alertMap.remove(alertName) != null) {
if (alertMap.containsKey(alertName)) {
scheduler.deleteAlertFromSchedule(alertName); scheduler.deleteAlertFromSchedule(alertName);
alertMap.remove(alertName);
try { try {
DeleteRequest deleteRequest = new DeleteRequest(); DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.id(alertName); deleteRequest.id(alertName);
@ -387,33 +381,27 @@ public class AlertManager extends AbstractLifecycleComponent {
throw e; throw e;
} }
} }
}
return false; return false;
} }
public boolean addAlert(String alertName, Alert alert, boolean persist) { public boolean addAlert(String alertName, Alert alert, boolean persist) {
synchronized (alertMap) {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex(); createAlertsIndex();
} }
if (alertMap.containsKey(alertName)) { if (alertMap.putIfAbsent(alertName, alert) == null) {
throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]");
} else {
alertMap.put(alertName, alert);
scheduler.addAlert(alertName, alert); scheduler.addAlert(alertName, alert);
if (persist) { if (persist) {
return persistAlert(alertName, alert, IndexRequest.OpType.CREATE); return persistAlert(alertName, alert, IndexRequest.OpType.CREATE);
} else { } else {
return true; return true;
} }
} } else {
throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]");
} }
} }
public boolean disableAlert(String alertName) { public boolean disableAlert(String alertName) {
synchronized (alertMap) {
Alert alert = alertMap.get(alertName); Alert alert = alertMap.get(alertName);
if (alert == null) { if (alert == null) {
throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]"); throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]");
@ -421,10 +409,8 @@ public class AlertManager extends AbstractLifecycleComponent {
alert.enabled(false); alert.enabled(false);
return persistAlert(alertName, alert, IndexRequest.OpType.INDEX); return persistAlert(alertName, alert, IndexRequest.OpType.INDEX);
} }
}
public boolean enableAlert(String alertName) { public boolean enableAlert(String alertName) {
synchronized (alertMap) {
Alert alert = alertMap.get(alertName); Alert alert = alertMap.get(alertName);
if (alert == null) { if (alert == null) {
throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]"); throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]");
@ -432,7 +418,6 @@ public class AlertManager extends AbstractLifecycleComponent {
alert.enabled(true); alert.enabled(true);
return persistAlert(alertName, alert, IndexRequest.OpType.INDEX); return persistAlert(alertName, alert, IndexRequest.OpType.INDEX);
} }
}
private boolean persistAlert(String alertName, Alert alert, IndexRequest.OpType opType) { private boolean persistAlert(String alertName, Alert alert, IndexRequest.OpType opType) {
XContentBuilder builder; XContentBuilder builder;
@ -567,15 +552,21 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
public Map<String,Alert> getSafeAlertMap() { public Map<String,Alert> getSafeAlertMap() {
synchronized (alertMap) { return ImmutableMap.copyOf(alertMap);
return new HashMap<>(alertMap);
}
} }
public Alert getAlertForName(String alertName) { public Alert getAlertForName(String alertName) {
synchronized (alertMap) {
return alertMap.get(alertName); return alertMap.get(alertName);
} }
private final class IndicesLifeCycleListener extends IndicesLifecycle.Listener {
@Override
public void afterIndexClosed(Index index) {
if (index.getName().equals(ALERT_INDEX)) {
alertMap.clear();
}
}
} }
} }

View File

@ -9,9 +9,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertResult; import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
@ -89,7 +89,8 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
if (run.compareAndSet(true, false)) { if (run.compareAndSet(true, false)) {
try { try {
logger.info("Stopping scheduler"); logger.info("Stopping scheduler");
scheduler.shutdown(true); scheduler.clear();
scheduler.shutdown(false);
} catch (SchedulerException se){ } catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler",se); logger.error("Failed to stop quartz scheduler",se);
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.alerts; package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionFactory; import org.elasticsearch.alerts.actions.AlertActionFactory;
import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionManager;
@ -22,7 +23,8 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
@ -67,7 +69,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
assertThat(alertScheduler.isRunning(), is(true)); assertThat(alertScheduler.isRunning(), is(true));
AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName()); AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean alertActionInvoked = new AtomicBoolean(false);
final AlertAction alertAction = new AlertAction() { final AlertAction alertAction = new AlertAction() {
@Override @Override
public String getActionName() { public String getActionName() {
@ -84,7 +86,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@Override @Override
public boolean doAction(String alertName, AlertResult alert) { public boolean doAction(String alertName, AlertResult alert) {
logger.info("Alert {} invoked: {}", alertName, alert); logger.info("Alert {} invoked: {}", alertName, alert);
latch.countDown(); alertActionInvoked.set(true);
return true; return true;
} }
}; };
@ -102,7 +104,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
alertTrigger, alertTrigger,
TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1),
Arrays.asList(alertAction), Arrays.asList(alertAction),
"* * * * * ? *", "0/5 * * * * ? *",
null, null,
Arrays.asList("my-index"), Arrays.asList("my-index"),
null, null,
@ -111,7 +113,14 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
true true
); );
alertManager.addAlert("my-first-alert", alert, true); alertManager.addAlert("my-first-alert", alert, true);
latch.await(); assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertActionInvoked.get(), is(true));
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertManager.ALERT_HISTORY_INDEX).get();
assertThat(indicesExistsResponse.isExists(), is(true));
}
}, 30, TimeUnit.SECONDS);
} }
} }