Fixed build by:

* Using cluster state listener to clear alerts when .alerts index is removed. (when running on multiple nodes the .alerts index is scattered so indices listener doesn't work).
* Remove the starting / loading thread and move the initial loading to cluster state listener.

Original commit: elastic/x-pack-elasticsearch@b8f41db2ea
This commit is contained in:
Martijn van Groningen 2014-10-25 23:36:15 +02:00
parent 266a53d913
commit a23487cd38
2 changed files with 57 additions and 49 deletions

View File

@ -27,6 +27,10 @@ import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -41,12 +45,13 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -78,38 +83,10 @@ public class AlertManager extends AbstractLifecycleComponent {
private final ConcurrentMap<String,Alert> alertMap; private final ConcurrentMap<String,Alert> alertMap;
private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false);
private final Thread starter;
private AlertActionManager actionManager; private AlertActionManager actionManager;
final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
class StarterThread implements Runnable {
@Override
public void run() {
logger.warn("Starting thread to get alerts");
int attempts = 0;
while (attempts < 2) {
try {
logger.warn("Sleeping [{}]", attempts);
Thread.sleep(5000);
logger.warn("Slept");
break;
} catch (InterruptedException ie) {
++attempts;
}
}
logger.warn("Loading alerts");
try {
refreshAlerts();
started.set(true);
} catch (Throwable t) {
logger.error("Failed to load alerts", t);
}
}
}
private void sendAlertsToScheduler() { private void sendAlertsToScheduler() {
for (Map.Entry<String, Alert> entry : alertMap.entrySet()) { for (Map.Entry<String, Alert> entry : alertMap.entrySet()) {
scheduler.addAlert(entry.getKey(), entry.getValue()); scheduler.addAlert(entry.getKey(), entry.getValue());
@ -123,20 +100,11 @@ public class AlertManager extends AbstractLifecycleComponent {
@Override @Override
protected void doStart() throws ElasticsearchException { protected void doStart() throws ElasticsearchException {
logger.warn("STARTING"); logger.warn("STARTING");
starter.start();
} }
@Override @Override
protected void doStop() throws ElasticsearchException { protected void doStop() throws ElasticsearchException {
logger.warn("STOPPING"); logger.warn("STOPPING");
starter.interrupt();
/*
try {
starter.join();
} catch (InterruptedException ie) {
logger.warn("Interrupted on joining start thread.", ie);
}
*/
} }
@Override @Override
@ -146,14 +114,12 @@ public class AlertManager extends AbstractLifecycleComponent {
@Inject @Inject
public AlertManager(Settings settings, Client client, IndicesLifecycle indicesLifecycle) { public AlertManager(Settings settings, Client client, ClusterService clusterService) {
super(settings); super(settings);
logger.warn("Initing AlertManager"); logger.warn("Initing AlertManager");
this.client = client; this.client = client;
alertMap = ConcurrentCollections.newConcurrentMap(); alertMap = ConcurrentCollections.newConcurrentMap();
starter = new Thread(new StarterThread()); clusterService.add(new AlertsClusterStateListener());
indicesLifecycle.addListener(new IndicesLifeCycleListener());
//scheduleAlerts();
} }
public void setAlertScheduler(AlertScheduler scheduler){ public void setAlertScheduler(AlertScheduler scheduler){
@ -178,7 +144,6 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
} }
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) { public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert indexedAlert; Alert indexedAlert;
try { try {
@ -238,6 +203,10 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
private Alert getAlertFromIndex(String alertName) { private Alert getAlertFromIndex(String alertName) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
GetRequest getRequest = Requests.getRequest(ALERT_INDEX); GetRequest getRequest = Requests.getRequest(ALERT_INDEX);
getRequest.type(ALERT_TYPE); getRequest.type(ALERT_TYPE);
getRequest.id(alertName); getRequest.id(alertName);
@ -356,6 +325,10 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException { public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
if (alertMap.remove(alertName) != null) { if (alertMap.remove(alertName) != null) {
scheduler.deleteAlertFromSchedule(alertName); scheduler.deleteAlertFromSchedule(alertName);
try { try {
@ -385,6 +358,10 @@ public class AlertManager extends AbstractLifecycleComponent {
} }
public boolean addAlert(String alertName, Alert alert, boolean persist) { public boolean addAlert(String alertName, Alert alert, boolean persist) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex(); createAlertsIndex();
} }
@ -559,14 +536,37 @@ public class AlertManager extends AbstractLifecycleComponent {
return alertMap.get(alertName); return alertMap.get(alertName);
} }
private final class IndicesLifeCycleListener extends IndicesLifecycle.Listener { public boolean isStarted() {
return started.get();
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override @Override
public void afterIndexClosed(Index index) { public void clusterChanged(ClusterChangedEvent event) {
if (index.getName().equals(ALERT_INDEX)) { if (event.indicesDeleted().contains(ALERT_INDEX)) {
alertMap.clear(); alertMap.clear();
} }
if (!started.get()) {
IndexMetaData alertIndexMetaData = event.state().getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
// TODO: Do on a different thread and have some kind of retry mechanism?
try {
loadAlerts();
sendAlertsToScheduler();
} catch (Exception e) {
logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually");
}
started.set(true);
}
} else {
started.set(true);
}
}
} }
} }
} }

View File

@ -68,7 +68,15 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
AlertScheduler alertScheduler = internalCluster().getInstance(AlertScheduler.class, internalCluster().getMasterName()); AlertScheduler alertScheduler = internalCluster().getInstance(AlertScheduler.class, internalCluster().getMasterName());
assertThat(alertScheduler.isRunning(), is(true)); assertThat(alertScheduler.isRunning(), is(true));
AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName()); final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertManager.isStarted(), is(true));
}
});
final AtomicBoolean alertActionInvoked = new AtomicBoolean(false); final AtomicBoolean alertActionInvoked = new AtomicBoolean(false);
final AlertAction alertAction = new AlertAction() { final AlertAction alertAction = new AlertAction() {
@Override @Override