diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 1bf2b5cbe84..beb02b5cadf 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + /** */ public class AlertsStore extends AbstractComponent { @@ -106,9 +108,9 @@ public class AlertsStore extends AbstractComponent { /** * Updates the specified alert by making sure that the made changes are persisted. */ - public IndexResponse updateAlert(Alert alert) { + public IndexResponse updateAlert(Alert alert) throws IOException { IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName()) - .setSource() + .setSource(jsonBuilder().value(alert)) // TODO: the content type should be based on the provided content type when the alert was initially added. .setVersion(alert.version()) .setOpType(IndexRequest.OpType.INDEX) .get(); @@ -158,8 +160,11 @@ public class AlertsStore extends AbstractComponent { public void start(ClusterState state, final LoadingListener listener) { IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); if (alertIndexMetaData != null) { + logger.info("Previous alerting index"); if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { + logger.info("Previous alerting index with active primary shards"); if (this.state.compareAndSet(State.STOPPED, State.LOADING)) { + logger.info("Started loading"); threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { @Override public void run() { @@ -185,6 +190,7 @@ public class AlertsStore extends AbstractComponent { } } } else { + logger.info("No previous .alert index"); if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) { listener.onSuccess(); } @@ -210,20 +216,24 @@ public class AlertsStore extends AbstractComponent { } private void loadAlerts() { - SearchResponse response = client.prepareSearch() + SearchResponse response = client.prepareSearch(ALERT_INDEX) + .setTypes(ALERT_TYPE) .setSearchType(SearchType.SCAN) .setScroll(scrollTimeout) .setSize(scrollSize) - .setTypes(ALERT_TYPE) - .setIndices(ALERT_INDEX).get(); + .setVersion(true) + .get(); try { - while (response.getHits().hits().length != 0) { - for (SearchHit sh : response.getHits()) { - String alertId = sh.getId(); - Alert alert = parseAlert(alertId, sh); - alertMap.put(alertId, alert); - } + if (response.getHits().getTotalHits() > 0) { response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + while (response.getHits().hits().length != 0) { + for (SearchHit sh : response.getHits()) { + String alertId = sh.getId(); + Alert alert = parseAlert(alertId, sh); + alertMap.put(alertId, alert); + } + response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + } } } finally { client.prepareClearScroll().addScrollId(response.getScrollId()).get(); @@ -318,6 +328,15 @@ public class AlertsStore extends AbstractComponent { if (alert.lastActionFire() == null) { alert.lastActionFire(new DateTime(0)); } + + if (alert.schedule() == null) { + throw new ElasticsearchIllegalArgumentException("Schedule is a required field"); + } + + if (alert.trigger() == null) { + throw new ElasticsearchIllegalArgumentException("Trigger is a required field"); + } + return alert; } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 69ec5777643..b77c128fdc6 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -145,7 +145,7 @@ public class AlertActionManager extends AbstractComponent { public void loadQueue() { SearchResponse response = client.prepareSearch() - .setQuery(QueryBuilders.termQuery(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED)) + .setQuery(QueryBuilders.termQuery(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED.toString())) .setSearchType(SearchType.SCAN) .setScroll(scrollTimeout) .setSize(scrollSize) diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/delete/TransportDeleteAlertAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/delete/TransportDeleteAlertAction.java index 27d2f155275..f4bd109e30e 100644 --- a/src/main/java/org/elasticsearch/alerts/transport/actions/delete/TransportDeleteAlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/delete/TransportDeleteAlertAction.java @@ -54,7 +54,8 @@ public class TransportDeleteAlertAction extends TransportMasterNodeOperationActi @Override protected void masterOperation(DeleteAlertRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { try { - listener.onResponse(new DeleteAlertResponse(alertManager.deleteAlert(request.getAlertName()))); + DeleteAlertResponse response = new DeleteAlertResponse(alertManager.deleteAlert(request.getAlertName())); + listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); } diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 0dd3f0c1ba6..5e438e85b5d 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -23,7 +23,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** */ -public class AbstractAlertingTests extends ElasticsearchIntegrationTest { +public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest { @Override protected Settings nodeSettings(int nodeOrdinal) { diff --git a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java new file mode 100644 index 00000000000..5706a7c7348 --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.alerts.actions.AlertActionManager; +import org.elasticsearch.alerts.actions.AlertActionState; +import org.elasticsearch.alerts.client.AlertsClientInterface; +import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.base.Predicate; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.Is.is; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 0) +public class NoMasterNodeTests extends AbstractAlertingTests { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings settings = super.nodeSettings(nodeOrdinal); + return ImmutableSettings.builder() + .put(settings) + .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2) + .build(); + } + + @Test + public void testSimpleFailure() throws Exception { + internalCluster().startNodesAsync(2).get(); + AlertsClientInterface alertsClient = alertClient(); + createIndex("my-index"); + // Have a sample document in the index, the alert is going to evaluate + client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); + SearchRequest searchRequest = new SearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); + BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1"); + alertsClient.prepareIndexAlert("my-first-alert") + .setAlertSource(alertSource) + .get(); + + assertBusy(new Runnable() { + @Override + public void run() { + IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get(); + assertThat(indicesExistsResponse.isExists(), is(true)); + + SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX) + .setQuery(termQuery("state", AlertActionState.ACTION_PERFORMED.toString())) + .addField("response.hits.total") + .setSize(1) + .get(); + assertThat(searchResponse.getHits().getHits().length, equalTo(1)); + assertThat((Integer) searchResponse.getHits().getAt(0).field("response.hits.total").getValue(), equalTo(1)); + } + }, 30, TimeUnit.SECONDS); + + // Stop the elected master, no new master will be elected b/c of m_m_n is set to 2 + internalCluster().stopCurrentMasterNode(); + assertThat(awaitBusy(new Predicate() { + public boolean apply(Object obj) { + ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID); + } + }), equalTo(true)); + + // Need to fetch a new client the old one maybe an internal client of the node we just killed. + alertsClient = alertClient(); + try { + // any alerting action should fail, because there is no elected master node + alertsClient.prepareDeleteAlert("my-first-alert").setMasterNodeTimeout(TimeValue.timeValueSeconds(1)).get(); + fail(); + } catch (Exception e) { + assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(MasterNotDiscoveredException.class)); + } + + // Bring back the 2nd node and wait for elected master node to come back and alerting to work as expected. + internalCluster().startNode(); + ensureGreen(); + + // Delete an existing alert + DeleteAlertResponse response = alertsClient.prepareDeleteAlert("my-first-alert").get(); + assertThat(response.deleteResponse().isFound(), is(true)); + // Add a new alert and wait for it get triggered + alertsClient.prepareIndexAlert("my-second-alert") + .setAlertSource(alertSource) + .get(); + assertBusy(new Runnable() { + @Override + public void run() { + SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX) + .setQuery(boolQuery().must(termQuery("state", AlertActionState.ACTION_PERFORMED.toString())).must(termQuery("alert_name", "my-second-alert"))) + .addField("response.hits.total") + .setSize(1) + .get(); + assertThat(searchResponse.getHits().getHits().length, equalTo(1)); + assertThat((Integer) searchResponse.getHits().getAt(0).field("response.hits.total").getValue(), equalTo(1)); + } + }, 30, TimeUnit.SECONDS); + } + +}