Only primary shards are now required to be available during alerting initialization.

Original commit: elastic/x-pack-elasticsearch@4ef5fd87dc
This commit is contained in:
Martijn van Groningen 2015-01-15 10:43:07 +01:00
parent fc5cbd5d08
commit 30506ef41d
2 changed files with 12 additions and 7 deletions

View File

@ -156,7 +156,7 @@ public class AlertsStore extends AbstractComponent {
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
logger.debug("Previous alerting index with active primary shards");
try {
loadAlerts();
loadAlerts(alertIndexMetaData.numberOfShards());
} catch (Exception e) {
logger.warn("Failed to load previously stored alerts. Schedule to retry alert loading...", e);
alertMap.clear();
@ -195,15 +195,16 @@ public class AlertsStore extends AbstractComponent {
return indexRequest;
}
private void loadAlerts() {
private void loadAlerts(int numPrimaryShards) {
assert alertMap.isEmpty() : "No alerts should reside, but there are " + alertMap.size() + " alerts.";
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet();
if (refreshResponse.getTotalShards() != refreshResponse.getSuccessfulShards()) {
throw new ElasticsearchException("Not all shards have been refreshed");
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
throw new ElasticsearchException("Not all required shards have been refreshed");
}
SearchResponse response = client.prepareSearch(ALERT_INDEX)
.setTypes(ALERT_TYPE)
.setPreference("_primary")
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)

View File

@ -115,18 +115,21 @@ public class AlertActionManager extends AbstractComponent {
doStart();
return true;
}
int numPrimaryShards = 0;
for (String index : indices) {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index);
return false;
} else {
numPrimaryShards += indexMetaData.numberOfShards();
}
}
}
try {
loadQueue();
loadQueue(numPrimaryShards);
} catch (Exception e) {
logger.warn("Failed to load unfinished alert actions. Schedule to retry alert action loading...", e);
actionsToBeProcessed.clear();
@ -165,10 +168,10 @@ public class AlertActionManager extends AbstractComponent {
return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time);
}
private void loadQueue() {
private void loadQueue(int numPrimaryShards) {
assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements.";
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
if (refreshResponse.getTotalShards() != refreshResponse.getSuccessfulShards()) {
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
throw new ElasticsearchException("Not all shards have been refreshed");
}
@ -178,6 +181,7 @@ public class AlertActionManager extends AbstractComponent {
.setScroll(scrollTimeout)
.setSize(scrollSize)
.setTypes(ALERT_HISTORY_TYPE)
.setPreference("_primary")
.get();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {