diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index d0e2faa08fa..aedb9dba293 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; @@ -103,28 +102,21 @@ public class AlertActionManager extends AbstractComponent { if (started.get()) { return true; } - try { - String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); - if (indices.length == 0) { - logger.info("No previous .alerthistory index, skip loading of alert actions"); - templateHelper.checkAndUploadIndexTemplate(state, "alerthistory"); - doStart(); - return true; - } - - - for (String index : indices) { - IndexMetaData indexMetaData = state.getMetaData().index(index); - if (indexMetaData != null) { - if (!state.routingTable().index(index).allPrimaryShardsActive()) { - logger.info("Not all primary shards of the [{}] index are started", index); - return false; - } + String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); + if (indices.length == 0) { + logger.info("No previous .alerthistory index, skip loading of alert actions"); + templateHelper.checkAndUploadIndexTemplate(state, "alerthistory"); + doStart(); + return true; + } + for (String index : indices) { + IndexMetaData indexMetaData = state.getMetaData().index(index); + if (indexMetaData != null) { + if (!state.routingTable().index(index).allPrimaryShardsActive()) { + logger.info("Not all primary shards of the [{}] index are started", index); + return false; } } - } catch (Exception e){ - logger.error("Unable to check index availability", e); - return false; } try { @@ -263,30 +255,15 @@ public class AlertActionManager extends AbstractComponent { } public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException { - addAlertAction(alert, scheduledFireTime, fireTime, true); - } - - public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime, boolean retry) throws IOException { ensureStarted(); logger.debug("Adding alert action for alert [{}]", alert.alertName()); String alertHistoryIndex = getAlertHistoryIndexNameForTime(scheduledFireTime); AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED); - try { - IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, entry.getId()) - .setSource(XContentFactory.jsonBuilder().value(entry)) - .setOpType(IndexRequest.OpType.CREATE) - .get(); - entry.setVersion(response.getVersion()); - } catch (IndexMissingException ime) { - ///@TODO This really shouldn't be happening - if (retry) { - logger.error("Unable to dynamically implicitly create alert history index [" + alertHistoryIndex + "] creating explicitly"); - client.admin().indices().prepareCreate(alertHistoryIndex).get(); - addAlertAction(alert, scheduledFireTime, fireTime, false); - } else { - throw new ElasticsearchException("Unable to create alert history index [" + alertHistoryIndex + "]", ime); - } - } + IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, entry.getId()) + .setSource(XContentFactory.jsonBuilder().value(entry)) + .setOpType(IndexRequest.OpType.CREATE) + .get(); + entry.setVersion(response.getVersion()); long currentSize = actionsToBeProcessed.size() + 1; actionsToBeProcessed.add(entry); diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 3010ca4547b..6292f2b6f9b 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.alerts; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -17,6 +16,7 @@ import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.common.settings.ImmutableSettings; @@ -37,8 +37,7 @@ import java.util.*; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.*; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; @@ -132,8 +131,15 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest assertBusy(new Runnable() { @Override public void run() { - IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX+"*").get(); - assertThat(indicesExistsResponse.isExists(), is(true)); + ClusterState state = client().admin().cluster().prepareState().get().getState(); + String[] alertHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*"); + assertThat(alertHistoryIndices, not(emptyArray())); + for (String index : alertHistoryIndices) { + IndexRoutingTable routingTable = state.getRoutingTable().index(index); + assertThat(routingTable, notNullValue()); + assertThat(routingTable.allPrimaryShardsActive(), is(true)); + } + SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*") .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) @@ -158,8 +164,15 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest assertBusy(new Runnable() { @Override public void run() { - IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX+"*").get(); - assertThat(indicesExistsResponse.isExists(), is(true)); + // The alerthistory index gets created in the background when the first alert fires, so we to check first is this index is created and shards are started + ClusterState state = client().admin().cluster().prepareState().get().getState(); + String[] alertHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*"); + assertThat(alertHistoryIndices, not(emptyArray())); + for (String index : alertHistoryIndices) { + IndexRoutingTable routingTable = state.getRoutingTable().index(index); + assertThat(routingTable, notNullValue()); + assertThat(routingTable.allPrimaryShardsActive(), is(true)); + } SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*") .setIndicesOptions(IndicesOptions.lenientExpandOpen())