CORE : add keyed lock.

This commit adds a keyed lock to prevent concurrent modification of the alert store and alert index.

Original commit: elastic/x-pack-elasticsearch@416351c06d
This commit is contained in:
Brian Murphy 2014-11-13 16:38:49 +00:00
parent 1e593a4075
commit c070e932c3
2 changed files with 62 additions and 38 deletions

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.scheduler.AlertScheduler;
@ -25,6 +26,7 @@ import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -33,9 +35,6 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
// TODO: Add lock synchronization via KeyedLock so that we can lock concurrent operations for the same alert.
// For example 2 concurrent deletes for the same alert, so that at least one fails, but not that 2 deletes are half done.
// The KeyedLock make sure that we only lock on the same alert, but not on different alerts.
public class AlertManager extends AbstractComponent { public class AlertManager extends AbstractComponent {
private final AlertScheduler scheduler; private final AlertScheduler scheduler;
@ -45,6 +44,7 @@ public class AlertManager extends AbstractComponent {
private final AlertActionRegistry actionRegistry; private final AlertActionRegistry actionRegistry;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final KeyedLock<String> alertLock = new KeyedLock<>();
@Inject @Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
@ -68,22 +68,33 @@ public class AlertManager extends AbstractComponent {
stop(); stop();
} }
}); });
} }
public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException { public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted(); ensureStarted();
DeleteResponse deleteResponse = alertsStore.deleteAlert(name); alertLock.acquire(name);
if (deleteResponse.isFound()) { try {
scheduler.remove(name); DeleteResponse deleteResponse = alertsStore.deleteAlert(name);
if (deleteResponse.isFound()) {
scheduler.remove(name);
}
return deleteResponse;
} finally {
alertLock.release(name);
} }
return deleteResponse;
} }
public IndexResponse addAlert(String alertName, BytesReference alertSource) { public IndexResponse addAlert(String alertName, BytesReference alertSource) {
ensureStarted(); ensureStarted();
Tuple<Alert, IndexResponse> result = alertsStore.addAlert(alertName, alertSource); alertLock.acquire(alertName);
scheduler.add(alertName, result.v1()); try {
return result.v2(); Tuple<Alert, IndexResponse> result = alertsStore.addAlert(alertName, alertSource);
scheduler.add(alertName, result.v1());
return result.v2();
} finally {
alertLock.release(alertName);
}
} }
public boolean isStarted() { public boolean isStarted() {
@ -92,36 +103,49 @@ public class AlertManager extends AbstractComponent {
public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
ensureStarted(); ensureStarted();
Alert alert = alertsStore.getAlert(alertName); alertLock.acquire(alertName);
if (alert == null) {
logger.warn("Unable to find [{}] in the alert store, perhaps it has been deleted", alertName);
return;
}
if (!alert.enabled()) {
logger.debug("Alert [{}] is not enabled", alert.alertName());
return;
}
try { try {
actionManager.addAlertAction(alert, scheduledFireTime, fireTime); Alert alert = alertsStore.getAlert(alertName);
} catch (IOException ioe) { if (alert == null) {
logger.error("Failed to add alert action for [{}]", ioe, alert); logger.warn("Unable to find [{}] in the alert store, perhaps it has been deleted", alertName);
return;
}
if (!alert.enabled()) {
logger.debug("Alert [{}] is not enabled", alert.alertName());
return;
}
try {
actionManager.addAlertAction(alert, scheduledFireTime, fireTime);
} catch (IOException ioe) {
logger.error("Failed to add alert action for [{}]", ioe, alert);
}
} finally {
alertLock.release(alertName);
} }
} }
public TriggerResult executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime) throws IOException { public TriggerResult executeAlert(AlertActionEntry entry) throws IOException {
ensureStarted(); ensureStarted();
Alert alert = alertsStore.getAlert(alertName); alertLock.acquire(entry.getAlertName());
if (alert == null) { try {
throw new ElasticsearchException("Alert is not available"); Alert alert = alertsStore.getAlert(entry.getAlertName());
if (alert == null) {
throw new ElasticsearchException("Alert is not available");
}
TriggerResult triggerResult = triggerManager.isTriggered(alert, entry.getScheduledTime(), entry.getFireTime());
entry.setSearchRequest(triggerResult.getRequest());
entry.setSearchResponse(triggerResult.getResponse());
if (triggerResult.isTriggered()) {
entry.setTriggered(true);
actionRegistry.doAction(alert, triggerResult);
alert.lastActionFire(entry.getFireTime());
alertsStore.updateAlert(alert);
}
return triggerResult;
} finally {
alertLock.release(entry.getAlertName());
} }
TriggerResult triggerResult = triggerManager.isTriggered(alert, scheduledFireTime, fireTime);
if (triggerResult.isTriggered()) {
actionRegistry.doAction(alert, triggerResult);
alert.lastActionFire(fireTime);
alertsStore.updateAlert(alert);
}
return triggerResult;
} }

View File

@ -288,12 +288,12 @@ public class AlertActionManager extends AbstractComponent {
entry.setErrorMsg("Alert was not found in the alerts store"); entry.setErrorMsg("Alert was not found in the alerts store");
updateHistoryEntry(entry, AlertActionState.ERROR); updateHistoryEntry(entry, AlertActionState.ERROR);
return; return;
} else if (!alert.enabled()) {
updateHistoryEntry(entry, AlertActionState.NO_ACTION_NEEDED); ///@TODO DISABLED
return;
} }
updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY); updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY);
TriggerResult trigger = alertManager.executeAlert(alert.alertName(), entry.getScheduledTime(), entry.getFireTime()); TriggerResult trigger = alertManager.executeAlert(entry);
entry.setTriggered(trigger.isTriggered());
entry.setSearchRequest(trigger.getRequest());
entry.setSearchResponse(trigger.getResponse());
updateHistoryEntry(entry, trigger.isTriggered() ? AlertActionState.ACTION_PERFORMED : AlertActionState.NO_ACTION_NEEDED); updateHistoryEntry(entry, trigger.isTriggered() ? AlertActionState.ACTION_PERFORMED : AlertActionState.NO_ACTION_NEEDED);
} catch (Exception e) { } catch (Exception e) {
if (started()) { if (started()) {