If loading or alerts or alert entries fail the alert manager should retry and if the search response are partial alert manager should retry as well.

Original commit: elastic/x-pack-elasticsearch@ea6a5a6372
This commit is contained in:
Martijn van Groningen 2014-11-27 13:57:09 +01:00
parent 6da23d412a
commit 385ea45b7c
2 changed files with 11 additions and 3 deletions

View File

@ -157,6 +157,7 @@ public class AlertsStore extends AbstractComponent {
loadAlerts();
} catch (Exception e) {
logger.warn("Failed to load alerts", e);
return false;
}
templateHelper.checkAndUploadIndexTemplate(state, "alerts");
started.set(true);
@ -192,8 +193,8 @@ public class AlertsStore extends AbstractComponent {
}
private void loadAlerts() {
assert alertMap.isEmpty() : "No alerts should reside, but there are " + alertMap.size() + " alerts.";
client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet();
SearchResponse response = client.prepareSearch(ALERT_INDEX)
.setTypes(ALERT_TYPE)
.setSearchType(SearchType.SCAN)
@ -202,6 +203,10 @@ public class AlertsStore extends AbstractComponent {
.setVersion(true)
.get();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
throw new ElasticsearchException("Partial response while loading alerts");
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
while (response.getHits().hits().length != 0) {

View File

@ -123,7 +123,7 @@ public class AlertActionManager extends AbstractComponent {
loadQueue();
} catch (Exception e) {
logger.error("Unable to load unfinished jobs into the job queue", e);
actionsToBeProcessed.clear();
return false;
}
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
doStart();
@ -158,7 +158,6 @@ public class AlertActionManager extends AbstractComponent {
public void loadQueue() {
assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements.";
client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.termQuery(STATE, AlertActionState.SEARCH_NEEDED.toString()))
.setSearchType(SearchType.SCAN)
@ -167,6 +166,10 @@ public class AlertActionManager extends AbstractComponent {
.setTypes(ALERT_HISTORY_TYPE)
.get();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
throw new ElasticsearchException("Partial response while loading alert actions");
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
while (response.getHits().hits().length != 0) {