First stop the alert action manager and enforce started property inside alert action manager.

Original commit: elastic/x-pack-elasticsearch@75a98a93f5
This commit is contained in:
Martijn van Groningen 2014-11-20 12:39:09 +01:00
parent cfdc061908
commit 15d9101ea9
2 changed files with 22 additions and 11 deletions

View File

@ -158,8 +158,8 @@ public class AlertManager extends AbstractComponent {
public synchronized void stop() {
if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) {
logger.info("Stopping alert manager...");
scheduler.stop();
actionManager.stop();
scheduler.stop();
alertsStore.stop();
logger.info("Alert manager has stopped");
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
@ -123,9 +124,11 @@ public class AlertActionManager extends AbstractComponent {
}
public void stop() {
actionsToBeProcessed.clear();
actionsToBeProcessed.add(END_ENTRY);
logger.info("Stopped job queue");
if (started.compareAndSet(true, false)) {
actionsToBeProcessed.clear();
actionsToBeProcessed.add(END_ENTRY);
logger.info("Stopped job queue");
}
}
public boolean started() {
@ -236,6 +239,7 @@ public class AlertActionManager extends AbstractComponent {
}
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
ensureStarted();
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.jsonBuilder().value(entry))
@ -257,7 +261,16 @@ public class AlertActionManager extends AbstractComponent {
}
}
public long getQueueSize() {
return actionsToBeProcessed.size();
}
public long getLargestQueueSize() {
return largestQueueSize.get();
}
private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) throws IOException {
ensureStarted();
entry.setState(actionPerformed);
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.jsonBuilder().value(entry))
@ -265,12 +278,10 @@ public class AlertActionManager extends AbstractComponent {
entry.setVersion(response.getVersion());
}
public long getQueueSize() {
return actionsToBeProcessed.size();
}
public long getLargestQueueSize() {
return largestQueueSize.get();
private void ensureStarted() {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
}
private class AlertHistoryRunnable implements Runnable {
@ -321,7 +332,7 @@ public class AlertActionManager extends AbstractComponent {
while (started()) {
AlertActionEntry entry = actionsToBeProcessed.take();
if (!started() || entry == END_ENTRY) {
logger.debug("Stopping thread to read from the job queue");
logger.info("Stopping thread to read from the job queue");
return;
}
threadPool.executor(AlertsPlugin.ALERT_THREAD_POOL_NAME).execute(new AlertHistoryRunnable(entry));