From c070e932c33020068e245d56c94396bd922392c3 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Thu, 13 Nov 2014 16:38:49 +0000 Subject: [PATCH] CORE : add keyed lock. This commit adds a keyed lock to prevent concurrent modification of the alert store and alert index. Original commit: elastic/x-pack-elasticsearch@416351c06da050b654bb906cbec86d43b91d9dae --- .../elasticsearch/alerts/AlertManager.java | 92 ++++++++++++------- .../alerts/actions/AlertActionManager.java | 8 +- 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 510caa7211d..2b03dbb90a5 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.alerts.actions.AlertActionEntry; import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.scheduler.AlertScheduler; @@ -25,6 +26,7 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -33,9 +35,6 @@ import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -// TODO: Add lock synchronization via KeyedLock so that we can lock concurrent operations for the same alert. -// For example 2 concurrent deletes for the same alert, so that at least one fails, but not that 2 deletes are half done. -// The KeyedLock make sure that we only lock on the same alert, but not on different alerts. public class AlertManager extends AbstractComponent { private final AlertScheduler scheduler; @@ -45,6 +44,7 @@ public class AlertManager extends AbstractComponent { private final AlertActionRegistry actionRegistry; private final AtomicBoolean started = new AtomicBoolean(false); private final ThreadPool threadPool; + private final KeyedLock alertLock = new KeyedLock<>(); @Inject public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, @@ -68,22 +68,33 @@ public class AlertManager extends AbstractComponent { stop(); } }); + } public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException { ensureStarted(); - DeleteResponse deleteResponse = alertsStore.deleteAlert(name); - if (deleteResponse.isFound()) { - scheduler.remove(name); + alertLock.acquire(name); + try { + DeleteResponse deleteResponse = alertsStore.deleteAlert(name); + if (deleteResponse.isFound()) { + scheduler.remove(name); + } + return deleteResponse; + } finally { + alertLock.release(name); } - return deleteResponse; } public IndexResponse addAlert(String alertName, BytesReference alertSource) { ensureStarted(); - Tuple result = alertsStore.addAlert(alertName, alertSource); - scheduler.add(alertName, result.v1()); - return result.v2(); + alertLock.acquire(alertName); + try { + Tuple result = alertsStore.addAlert(alertName, alertSource); + scheduler.add(alertName, result.v1()); + return result.v2(); + } finally { + alertLock.release(alertName); + } } public boolean isStarted() { @@ -92,36 +103,49 @@ public class AlertManager extends AbstractComponent { public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ ensureStarted(); - Alert alert = alertsStore.getAlert(alertName); - if (alert == null) { - logger.warn("Unable to find [{}] in the alert store, perhaps it has been deleted", alertName); - return; - } - if (!alert.enabled()) { - logger.debug("Alert [{}] is not enabled", alert.alertName()); - return; - } - + alertLock.acquire(alertName); try { - actionManager.addAlertAction(alert, scheduledFireTime, fireTime); - } catch (IOException ioe) { - logger.error("Failed to add alert action for [{}]", ioe, alert); + Alert alert = alertsStore.getAlert(alertName); + if (alert == null) { + logger.warn("Unable to find [{}] in the alert store, perhaps it has been deleted", alertName); + return; + } + if (!alert.enabled()) { + logger.debug("Alert [{}] is not enabled", alert.alertName()); + return; + } + + try { + actionManager.addAlertAction(alert, scheduledFireTime, fireTime); + } catch (IOException ioe) { + logger.error("Failed to add alert action for [{}]", ioe, alert); + } + } finally { + alertLock.release(alertName); } } - public TriggerResult executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime) throws IOException { + public TriggerResult executeAlert(AlertActionEntry entry) throws IOException { ensureStarted(); - Alert alert = alertsStore.getAlert(alertName); - if (alert == null) { - throw new ElasticsearchException("Alert is not available"); + alertLock.acquire(entry.getAlertName()); + try { + Alert alert = alertsStore.getAlert(entry.getAlertName()); + if (alert == null) { + throw new ElasticsearchException("Alert is not available"); + } + TriggerResult triggerResult = triggerManager.isTriggered(alert, entry.getScheduledTime(), entry.getFireTime()); + entry.setSearchRequest(triggerResult.getRequest()); + entry.setSearchResponse(triggerResult.getResponse()); + if (triggerResult.isTriggered()) { + entry.setTriggered(true); + actionRegistry.doAction(alert, triggerResult); + alert.lastActionFire(entry.getFireTime()); + alertsStore.updateAlert(alert); + } + return triggerResult; + } finally { + alertLock.release(entry.getAlertName()); } - TriggerResult triggerResult = triggerManager.isTriggered(alert, scheduledFireTime, fireTime); - if (triggerResult.isTriggered()) { - actionRegistry.doAction(alert, triggerResult); - alert.lastActionFire(fireTime); - alertsStore.updateAlert(alert); - } - return triggerResult; } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 018268b50b6..4445ea66e37 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -288,12 +288,12 @@ public class AlertActionManager extends AbstractComponent { entry.setErrorMsg("Alert was not found in the alerts store"); updateHistoryEntry(entry, AlertActionState.ERROR); return; + } else if (!alert.enabled()) { + updateHistoryEntry(entry, AlertActionState.NO_ACTION_NEEDED); ///@TODO DISABLED + return; } updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY); - TriggerResult trigger = alertManager.executeAlert(alert.alertName(), entry.getScheduledTime(), entry.getFireTime()); - entry.setTriggered(trigger.isTriggered()); - entry.setSearchRequest(trigger.getRequest()); - entry.setSearchResponse(trigger.getResponse()); + TriggerResult trigger = alertManager.executeAlert(entry); updateHistoryEntry(entry, trigger.isTriggered() ? AlertActionState.ACTION_PERFORMED : AlertActionState.NO_ACTION_NEEDED); } catch (Exception e) { if (started()) {