Test: Added no master node test

Original commit: elastic/x-pack-elasticsearch@0edeaba3a7
This commit is contained in:
Martijn van Groningen 2014-11-07 13:21:15 +01:00
parent 387cc6fb76
commit f2453f53ac
5 changed files with 158 additions and 14 deletions

View File

@ -41,6 +41,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
*/
public class AlertsStore extends AbstractComponent {
@ -106,9 +108,9 @@ public class AlertsStore extends AbstractComponent {
/**
* Updates the specified alert by making sure that the made changes are persisted.
*/
public IndexResponse updateAlert(Alert alert) {
public IndexResponse updateAlert(Alert alert) throws IOException {
IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName())
.setSource()
.setSource(jsonBuilder().value(alert)) // TODO: the content type should be based on the provided content type when the alert was initially added.
.setVersion(alert.version())
.setOpType(IndexRequest.OpType.INDEX)
.get();
@ -158,8 +160,11 @@ public class AlertsStore extends AbstractComponent {
public void start(ClusterState state, final LoadingListener listener) {
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
logger.info("Previous alerting index");
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
logger.info("Previous alerting index with active primary shards");
if (this.state.compareAndSet(State.STOPPED, State.LOADING)) {
logger.info("Started loading");
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
@ -185,6 +190,7 @@ public class AlertsStore extends AbstractComponent {
}
}
} else {
logger.info("No previous .alert index");
if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) {
listener.onSuccess();
}
@ -210,20 +216,24 @@ public class AlertsStore extends AbstractComponent {
}
private void loadAlerts() {
SearchResponse response = client.prepareSearch()
SearchResponse response = client.prepareSearch(ALERT_INDEX)
.setTypes(ALERT_TYPE)
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)
.setTypes(ALERT_TYPE)
.setIndices(ALERT_INDEX).get();
.setVersion(true)
.get();
try {
while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) {
String alertId = sh.getId();
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) {
String alertId = sh.getId();
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
}
}
} finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get();
@ -318,6 +328,15 @@ public class AlertsStore extends AbstractComponent {
if (alert.lastActionFire() == null) {
alert.lastActionFire(new DateTime(0));
}
if (alert.schedule() == null) {
throw new ElasticsearchIllegalArgumentException("Schedule is a required field");
}
if (alert.trigger() == null) {
throw new ElasticsearchIllegalArgumentException("Trigger is a required field");
}
return alert;
}

View File

@ -145,7 +145,7 @@ public class AlertActionManager extends AbstractComponent {
public void loadQueue() {
SearchResponse response = client.prepareSearch()
.setQuery(QueryBuilders.termQuery(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED))
.setQuery(QueryBuilders.termQuery(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED.toString()))
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)

View File

@ -54,7 +54,8 @@ public class TransportDeleteAlertAction extends TransportMasterNodeOperationActi
@Override
protected void masterOperation(DeleteAlertRequest request, ClusterState state, ActionListener<DeleteAlertResponse> listener) throws ElasticsearchException {
try {
listener.onResponse(new DeleteAlertResponse(alertManager.deleteAlert(request.getAlertName())));
DeleteAlertResponse response = new DeleteAlertResponse(alertManager.deleteAlert(request.getAlertName()));
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -23,7 +23,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
*/
public class AbstractAlertingTests extends ElasticsearchIntegrationTest {
public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {

View File

@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.client.AlertsClientInterface;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.base.Predicate;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 0)
public class NoMasterNodeTests extends AbstractAlertingTests {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings settings = super.nodeSettings(nodeOrdinal);
return ImmutableSettings.builder()
.put(settings)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
.build();
}
@Test
public void testSimpleFailure() throws Exception {
internalCluster().startNodesAsync(2).get();
AlertsClientInterface alertsClient = alertClient();
createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = new SearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
alertsClient.prepareIndexAlert("my-first-alert")
.setAlertSource(alertSource)
.get();
assertBusy(new Runnable() {
@Override
public void run() {
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
assertThat(indicesExistsResponse.isExists(), is(true));
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
.setQuery(termQuery("state", AlertActionState.ACTION_PERFORMED.toString()))
.addField("response.hits.total")
.setSize(1)
.get();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
assertThat((Integer) searchResponse.getHits().getAt(0).field("response.hits.total").getValue(), equalTo(1));
}
}, 30, TimeUnit.SECONDS);
// Stop the elected master, no new master will be elected b/c of m_m_n is set to 2
internalCluster().stopCurrentMasterNode();
assertThat(awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID);
}
}), equalTo(true));
// Need to fetch a new client the old one maybe an internal client of the node we just killed.
alertsClient = alertClient();
try {
// any alerting action should fail, because there is no elected master node
alertsClient.prepareDeleteAlert("my-first-alert").setMasterNodeTimeout(TimeValue.timeValueSeconds(1)).get();
fail();
} catch (Exception e) {
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(MasterNotDiscoveredException.class));
}
// Bring back the 2nd node and wait for elected master node to come back and alerting to work as expected.
internalCluster().startNode();
ensureGreen();
// Delete an existing alert
DeleteAlertResponse response = alertsClient.prepareDeleteAlert("my-first-alert").get();
assertThat(response.deleteResponse().isFound(), is(true));
// Add a new alert and wait for it get triggered
alertsClient.prepareIndexAlert("my-second-alert")
.setAlertSource(alertSource)
.get();
assertBusy(new Runnable() {
@Override
public void run() {
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
.setQuery(boolQuery().must(termQuery("state", AlertActionState.ACTION_PERFORMED.toString())).must(termQuery("alert_name", "my-second-alert")))
.addField("response.hits.total")
.setSize(1)
.get();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
assertThat((Integer) searchResponse.getHits().getAt(0).field("response.hits.total").getValue(), equalTo(1));
}
}, 30, TimeUnit.SECONDS);
}
}