From 30506ef41db35510a1d8157396b49b2f93a7879e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 15 Jan 2015 10:43:07 +0100 Subject: [PATCH] Only primary shards are now required to be available during alerting initialization. Original commit: elastic/x-pack-elasticsearch@4ef5fd87dc7e062028a56dbf61bc392891fd4108 --- .../java/org/elasticsearch/alerts/AlertsStore.java | 9 +++++---- .../alerts/actions/AlertActionManager.java | 10 +++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 9201c0f11dc..89a925c402b 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -156,7 +156,7 @@ public class AlertsStore extends AbstractComponent { if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { logger.debug("Previous alerting index with active primary shards"); try { - loadAlerts(); + loadAlerts(alertIndexMetaData.numberOfShards()); } catch (Exception e) { logger.warn("Failed to load previously stored alerts. Schedule to retry alert loading...", e); alertMap.clear(); @@ -195,15 +195,16 @@ public class AlertsStore extends AbstractComponent { return indexRequest; } - private void loadAlerts() { + private void loadAlerts(int numPrimaryShards) { assert alertMap.isEmpty() : "No alerts should reside, but there are " + alertMap.size() + " alerts."; RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet(); - if (refreshResponse.getTotalShards() != refreshResponse.getSuccessfulShards()) { - throw new ElasticsearchException("Not all shards have been refreshed"); + if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { + throw new ElasticsearchException("Not all required shards have been refreshed"); } SearchResponse response = client.prepareSearch(ALERT_INDEX) .setTypes(ALERT_TYPE) + .setPreference("_primary") .setSearchType(SearchType.SCAN) .setScroll(scrollTimeout) .setSize(scrollSize) diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index b27f22e1fa4..6aca4ba69b9 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -115,18 +115,21 @@ public class AlertActionManager extends AbstractComponent { doStart(); return true; } + int numPrimaryShards = 0; for (String index : indices) { IndexMetaData indexMetaData = state.getMetaData().index(index); if (indexMetaData != null) { if (!state.routingTable().index(index).allPrimaryShardsActive()) { logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); return false; + } else { + numPrimaryShards += indexMetaData.numberOfShards(); } } } try { - loadQueue(); + loadQueue(numPrimaryShards); } catch (Exception e) { logger.warn("Failed to load unfinished alert actions. Schedule to retry alert action loading...", e); actionsToBeProcessed.clear(); @@ -165,10 +168,10 @@ public class AlertActionManager extends AbstractComponent { return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time); } - private void loadQueue() { + private void loadQueue(int numPrimaryShards) { assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements."; RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet(); - if (refreshResponse.getTotalShards() != refreshResponse.getSuccessfulShards()) { + if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { throw new ElasticsearchException("Not all shards have been refreshed"); } @@ -178,6 +181,7 @@ public class AlertActionManager extends AbstractComponent { .setScroll(scrollTimeout) .setSize(scrollSize) .setTypes(ALERT_HISTORY_TYPE) + .setPreference("_primary") .get(); try { if (response.getTotalShards() != response.getSuccessfulShards()) {