diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 5d6fbd4b675..a0cccadab33 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -49,6 +49,9 @@ public class AlertManager extends AbstractComponent { private final ClusterService clusterService; private final KeyedLock alertLock = new KeyedLock<>(); private final AtomicReference state = new AtomicReference<>(State.STOPPED); + private final AlertsClusterStateListener alertsClusterStateListener = new AlertsClusterStateListener(); + + private volatile boolean manuallyStopped; @Inject public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, @@ -64,13 +67,14 @@ public class AlertManager extends AbstractComponent { this.actionManager.setAlertManager(this); this.actionRegistry = actionRegistry; this.clusterService = clusterService; - clusterService.add(new AlertsClusterStateListener()); + clusterService.add(alertsClusterStateListener); + manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); // Close if the indices service is being stopped, so we don't run into search failures (locally) that will // happen because we're shutting down and an alert is scheduled. indicesService.addLifecycleListener(new LifecycleListener() { @Override public void beforeStop() { - stop(); + internalStop(); } }); @@ -177,12 +181,25 @@ public class AlertManager extends AbstractComponent { return false; } + public void start() { + if (state.compareAndSet(State.STOPPED, State.LOADING)) { + manuallyStopped = false; + logger.info("Starting alert manager..."); + ClusterState state = clusterService.state(); + alertsClusterStateListener.initialize(state); + } + } + + public void stop() { + manuallyStopped = true; + internalStop(); + } // This is synchronized, because this may first be called from the cluster changed event and then from before close // when a node closes. The stop also stops the scheduler which has several background threads. If this method is // invoked in that order that node closes and the test framework complains then about the fact that there are still // threads alive. - public synchronized void stop() { + private synchronized void internalStop() { if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) { logger.info("Stopping alert manager..."); actionManager.stop(); @@ -251,7 +268,7 @@ public class AlertManager extends AbstractComponent { threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { @Override public void run() { - stop(); + internalStop(); } }); } else { @@ -267,6 +284,10 @@ public class AlertManager extends AbstractComponent { } private void initialize(final ClusterState state) { + if (manuallyStopped) { + return; + } + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { @Override public void run() { @@ -293,6 +314,7 @@ public class AlertManager extends AbstractComponent { if (alertsStore.started() && actionManager.started()) { if (state.compareAndSet(State.LOADING, State.STARTED)) { scheduler.start(alertsStore.getAlerts()); + logger.info("Alert manager has started"); } else { logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING); } diff --git a/src/main/java/org/elasticsearch/alerts/AlertingModule.java b/src/main/java/org/elasticsearch/alerts/AlertingModule.java index 597b2c21230..32bab823a51 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertingModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertingModule.java @@ -10,15 +10,13 @@ import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.client.NodeAlertsClient; import org.elasticsearch.alerts.client.AlertsClient; -import org.elasticsearch.alerts.rest.RestAlertsStatsAction; -import org.elasticsearch.alerts.rest.RestDeleteAlertAction; -import org.elasticsearch.alerts.rest.RestGetAlertAction; -import org.elasticsearch.alerts.rest.RestPutAlertAction; +import org.elasticsearch.alerts.rest.*; import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.transport.actions.ack.TransportAckAlertAction; import org.elasticsearch.alerts.transport.actions.delete.TransportDeleteAlertAction; import org.elasticsearch.alerts.transport.actions.get.TransportGetAlertAction; import org.elasticsearch.alerts.transport.actions.put.TransportPutAlertAction; +import org.elasticsearch.alerts.transport.actions.service.TransportAlertsServiceAction; import org.elasticsearch.alerts.transport.actions.stats.TransportAlertStatsAction; import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.common.inject.AbstractModule; @@ -43,6 +41,7 @@ public class AlertingModule extends AbstractModule { bind(TransportGetAlertAction.class).asEagerSingleton(); bind(TransportAlertStatsAction.class).asEagerSingleton(); bind(TransportAckAlertAction.class).asEagerSingleton(); + bind(TransportAlertsServiceAction.class).asEagerSingleton(); bind(AlertsClient.class).to(NodeAlertsClient.class).asEagerSingleton(); // Rest layer @@ -50,6 +49,7 @@ public class AlertingModule extends AbstractModule { bind(RestDeleteAlertAction.class).asEagerSingleton(); bind(RestAlertsStatsAction.class).asEagerSingleton(); bind(RestGetAlertAction.class).asEagerSingleton(); + bind(RestAlertServiceAction.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 5b1243db9a3..afe4eafa12d 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -230,7 +230,6 @@ public class AlertsStore extends AbstractComponent { protected Alert parseAlert(String alertName, BytesReference source) { Alert alert = new Alert(); alert.alertName(alertName); - logger.error("Source : [{}]", source.toUtf8()); try (XContentParser parser = XContentHelper.createParser(source)) { String currentFieldName = null; XContentParser.Token token = parser.nextToken(); diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index f5060bafd6c..e03352aa6f8 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -245,7 +245,7 @@ public class AlertActionManager extends AbstractComponent { .setSource(XContentFactory.jsonBuilder().value(entry)) .setOpType(IndexRequest.OpType.CREATE) .get(); - logger.info("Adding alert action for alert [{}]", alert.alertName()); + logger.debug("Adding alert action for alert [{}]", alert.alertName()); entry.setVersion(response.getVersion()); long currentSize = actionsToBeProcessed.size() + 1; actionsToBeProcessed.add(entry); @@ -277,12 +277,10 @@ public class AlertActionManager extends AbstractComponent { } public long getQueueSize() { - ensureStarted(); return actionsToBeProcessed.size(); } public long getLargestQueueSize() { - ensureStarted(); return largestQueueSize.get(); } @@ -313,7 +311,7 @@ public class AlertActionManager extends AbstractComponent { return; } updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY); - logger.info("Running an alert action entry for [{}]", entry.getAlertName()); + logger.debug("Running an alert action entry for [{}]", entry.getAlertName()); TriggerResult trigger = alertManager.executeAlert(entry); if (trigger.isTriggered()) { if (entry.getState() != AlertActionState.THROTTLED) { diff --git a/src/main/java/org/elasticsearch/alerts/client/AlertsClient.java b/src/main/java/org/elasticsearch/alerts/client/AlertsClient.java index b210d88c947..76ba9fd6748 100644 --- a/src/main/java/org/elasticsearch/alerts/client/AlertsClient.java +++ b/src/main/java/org/elasticsearch/alerts/client/AlertsClient.java @@ -19,6 +19,9 @@ import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; import org.elasticsearch.alerts.transport.actions.put.PutAlertRequest; import org.elasticsearch.alerts.transport.actions.put.PutAlertRequestBuilder; import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse; +import org.elasticsearch.alerts.transport.actions.service.AlertServiceRequestBuilder; +import org.elasticsearch.alerts.transport.actions.service.AlertsServiceRequest; +import org.elasticsearch.alerts.transport.actions.service.AlertsServiceResponse; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequestBuilder; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; @@ -176,4 +179,19 @@ public interface AlertsClient extends ElasticsearchClient { */ ActionFuture ackAlert(AckAlertRequest request); + /** + * Prepare make an alert service request. + */ + AlertServiceRequestBuilder prepareAlertService(); + + /** + * Perform an alert service request to either start, stop or restart the alerting plugin. + */ + void alertService(AlertsServiceRequest request, ActionListener listener); + + /** + * Perform an alert service request to either start, stop or restart the alerting plugin. + */ + ActionFuture alertService(AlertsServiceRequest request); + } diff --git a/src/main/java/org/elasticsearch/alerts/client/NodeAlertsClient.java b/src/main/java/org/elasticsearch/alerts/client/NodeAlertsClient.java index 75accd545a7..d5bc9c33084 100644 --- a/src/main/java/org/elasticsearch/alerts/client/NodeAlertsClient.java +++ b/src/main/java/org/elasticsearch/alerts/client/NodeAlertsClient.java @@ -11,6 +11,7 @@ import org.elasticsearch.alerts.transport.actions.ack.*; import org.elasticsearch.alerts.transport.actions.delete.*; import org.elasticsearch.alerts.transport.actions.get.*; import org.elasticsearch.alerts.transport.actions.put.*; +import org.elasticsearch.alerts.transport.actions.service.*; import org.elasticsearch.alerts.transport.actions.stats.*; import org.elasticsearch.client.support.Headers; import org.elasticsearch.common.collect.ImmutableMap; @@ -26,7 +27,8 @@ public class NodeAlertsClient implements AlertsClient { @Inject public NodeAlertsClient(ThreadPool threadPool, Headers headers, TransportPutAlertAction transportPutAlertAction, TransportGetAlertAction transportGetAlertAction, TransportDeleteAlertAction transportDeleteAlertAction, - TransportAlertStatsAction transportAlertStatsAction, TransportAckAlertAction transportAckAlertAction) { + TransportAlertStatsAction transportAlertStatsAction, TransportAckAlertAction transportAckAlertAction, + TransportAlertsServiceAction transportAlertsServiceAction) { this.headers = headers; this.threadPool = threadPool; internalActions = ImmutableMap.builder() @@ -35,6 +37,7 @@ public class NodeAlertsClient implements AlertsClient { .put(DeleteAlertAction.INSTANCE, transportDeleteAlertAction) .put(AlertsStatsAction.INSTANCE, transportAlertStatsAction) .put(AckAlertAction.INSTANCE, transportAckAlertAction) + .put(AlertServiceAction.INSTANCE, transportAlertsServiceAction) .build(); } @@ -132,6 +135,21 @@ public class NodeAlertsClient implements AlertsClient { return execute(AckAlertAction.INSTANCE, request); } + @Override + public AlertServiceRequestBuilder prepareAlertService() { + return new AlertServiceRequestBuilder(this); + } + + @Override + public void alertService(AlertsServiceRequest request, ActionListener listener) { + execute(AlertServiceAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture alertService(AlertsServiceRequest request) { + return execute(AlertServiceAction.INSTANCE, request); + } + @SuppressWarnings("unchecked") @Override public > ActionFuture execute(Action action, Request request) { diff --git a/src/main/java/org/elasticsearch/alerts/rest/RestAlertServiceAction.java b/src/main/java/org/elasticsearch/alerts/rest/RestAlertServiceAction.java new file mode 100644 index 00000000000..5162c7fe343 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/rest/RestAlertServiceAction.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.rest; + +import org.elasticsearch.alerts.AlertsStore; +import org.elasticsearch.alerts.client.AlertsClient; +import org.elasticsearch.alerts.transport.actions.service.AlertsServiceRequest; +import org.elasticsearch.alerts.transport.actions.service.AlertsServiceResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.AcknowledgedRestListener; + +/** + */ +public class RestAlertServiceAction extends BaseRestHandler { + + private final AlertsClient alertsClient; + + @Inject + protected RestAlertServiceAction(Settings settings, RestController controller, Client client, AlertsClient alertsClient) { + super(settings, controller, client); + this.alertsClient = alertsClient; + controller.registerHandler(RestRequest.Method.PUT, AlertsStore.ALERT_INDEX + "/_restart", this); + controller.registerHandler(RestRequest.Method.PUT, AlertsStore.ALERT_INDEX + "/_start", new StartRestHandler(settings, controller, client)); + controller.registerHandler(RestRequest.Method.PUT, AlertsStore.ALERT_INDEX + "/_stop", new StopRestHandler(settings, controller, client)); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + AlertsServiceRequest serviceRequest = new AlertsServiceRequest(); + serviceRequest.restart(); + alertsClient.alertService(serviceRequest, new AcknowledgedRestListener(channel)); + } + + final class StartRestHandler extends BaseRestHandler { + + public StartRestHandler(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + } + + @Override + public void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + AlertsServiceRequest serviceRequest = new AlertsServiceRequest(); + serviceRequest.start(); + alertsClient.alertService(serviceRequest, new AcknowledgedRestListener(channel)); + } + } + + final class StopRestHandler extends BaseRestHandler { + + public StopRestHandler(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + } + + @Override + public void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + AlertsServiceRequest serviceRequest = new AlertsServiceRequest(); + serviceRequest.stop(); + alertsClient.alertService(serviceRequest, new AcknowledgedRestListener(channel)); + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertServiceAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertServiceAction.java new file mode 100644 index 00000000000..cc373b07e36 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertServiceAction.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.transport.actions.service; + +import org.elasticsearch.alerts.client.AlertsClient; +import org.elasticsearch.alerts.client.AlertsClientAction; + +/** + */ +public class AlertServiceAction extends AlertsClientAction { + + public static final AlertServiceAction INSTANCE = new AlertServiceAction(); + public static final String NAME = "cluster:admin/alerts/service"; + + private AlertServiceAction() { + super(NAME); + } + + @Override + public AlertsServiceResponse newResponse() { + return new AlertsServiceResponse(); + } + + @Override + public AlertServiceRequestBuilder newRequestBuilder(AlertsClient client) { + return new AlertServiceRequestBuilder(client); + } +} diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertServiceRequestBuilder.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertServiceRequestBuilder.java new file mode 100644 index 00000000000..5f9ad0eeaa9 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertServiceRequestBuilder.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.transport.actions.service; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.alerts.client.AlertsClient; + +/** + */ +public class AlertServiceRequestBuilder extends MasterNodeOperationRequestBuilder { + + public AlertServiceRequestBuilder(AlertsClient client) { + super(client, new AlertsServiceRequest()); + } + + /** + * Starts alerting if not already started. + */ + public AlertServiceRequestBuilder start() { + request.start(); + return this; + } + + /** + * Stops alerting if not already stopped. + */ + public AlertServiceRequestBuilder stop() { + request.stop(); + return this; + } + + /** + * Starts and stops alerting. + */ + public AlertServiceRequestBuilder restart() { + request.restart(); + return this; + } + + @Override + protected void doExecute(ActionListener listener) { + client.alertService(request, listener); + } +} diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertsServiceRequest.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertsServiceRequest.java new file mode 100644 index 00000000000..26abe04f061 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertsServiceRequest.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.transport.actions.service; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + */ +public class AlertsServiceRequest extends MasterNodeOperationRequest { + + private String command; + + /** + * Starts alerting if not already started. + */ + public void start() { + command = "start"; + } + + /** + * Stops alerting if not already stopped. + */ + public void stop() { + command = "stop"; + } + + /** + * Starts and stops alerting. + */ + public void restart() { + command = "restart"; + } + + String getCommand() { + return command; + } + + @Override + public ActionRequestValidationException validate() { + if (command == null) { + return ValidateActions.addValidationError("no command specified", null); + } else { + return null; + } + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + command = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(command); + } +} diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertsServiceResponse.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertsServiceResponse.java new file mode 100644 index 00000000000..f926ed5f6cd --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/AlertsServiceResponse.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.transport.actions.service; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; + +/** + * Empty response, so if it returns, it means all is fine. + */ +public class AlertsServiceResponse extends AcknowledgedResponse { + + AlertsServiceResponse() { + } + + public AlertsServiceResponse(boolean acknowledged) { + super(acknowledged); + } +} diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java new file mode 100644 index 00000000000..0cb6ea0af65 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/service/TransportAlertsServiceAction.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.transport.actions.service; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.alerts.AlertManager; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + */ +public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction { + + private final AlertManager alertManager; + + @Inject + public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertManager alertManager) { + super(settings, actionName, transportService, clusterService, threadPool, actionFilters); + this.alertManager = alertManager; + } + + @Override + protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected AlertsServiceRequest newRequest() { + return new AlertsServiceRequest(); + } + + @Override + protected AlertsServiceResponse newResponse() { + return new AlertsServiceResponse(); + } + + @Override + protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { + switch (request.getCommand()) { + case "start": + alertManager.start(); + break; + case "stop": + alertManager.stop(); + break; + case "restart": + alertManager.start(); + alertManager.stop(); + break; + default: + listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined")); + return; + } + listener.onResponse(new AlertsServiceResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(AlertsServiceRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA); + } +} diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 1a8cce3fbe8..8e5a6e1506f 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -159,6 +159,21 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest }); } + protected void startAlerting() throws Exception { + alertClient().prepareAlertService().start().get(); + ensureAlertingStarted(); + } + + protected void stopAlerting() throws Exception { + alertClient().prepareAlertService().stop().get(); + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(alertClient().prepareAlertsStats().get().isAlertManagerStarted(), is(false)); + } + }); + } + protected static InternalTestCluster internalTestCluster() { return (InternalTestCluster) ((AlertingWrappingCluster) cluster()).testCluster; } diff --git a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java index a0d32f2bf35..f8cc7a7c79d 100644 --- a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java +++ b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.alerts; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; @@ -13,7 +12,6 @@ import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertActionEntry; import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionState; -import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.alerts.triggers.ScriptedTrigger; import org.elasticsearch.common.bytes.BytesReference; @@ -24,9 +22,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; @@ -49,28 +45,10 @@ public class BootStrapTest extends AbstractAlertingTests { .get(); client().admin().indices().prepareRefresh(AlertsStore.ALERT_INDEX).get(); + stopAlerting(); + startAlerting(); - String oldMaster = internalTestCluster().getMasterName(); - try { - internalTestCluster().stopCurrentMasterNode(); - } catch (IOException ioe) { - throw new ElasticsearchException("Failed to stop current master", ioe); - } - - //Wait for alerts to start - TimeValue maxTime = new TimeValue(30, TimeUnit.SECONDS); - Thread.sleep(maxTime.getMillis()); - - String newMaster = internalTestCluster().getMasterName(); - assertFalse(newMaster.equals(oldMaster)); - logger.info("Switched master from [{}] to [{}]",oldMaster,newMaster); - - AlertsStatsRequest alertsStatsRequest = alertClient().prepareAlertsStats().request(); - AlertsStatsResponse response = alertClient().alertsStats(alertsStatsRequest).actionGet(); - - alertsStatsRequest = alertClient().prepareAlertsStats().request(); - response = alertClient().alertsStats(alertsStatsRequest).actionGet(); - + AlertsStatsResponse response = alertClient().prepareAlertsStats().get(); assertTrue(response.isAlertActionManagerStarted()); assertTrue(response.isAlertManagerStarted()); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L)); @@ -81,14 +59,12 @@ public class BootStrapTest extends AbstractAlertingTests { ensureAlertingStarted(); internalTestCluster().ensureAtLeastNumDataNodes(2); - SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); - AlertsStatsRequest alertsStatsRequest = alertClient().prepareAlertsStats().request(); - AlertsStatsResponse response = alertClient().alertsStats(alertsStatsRequest).actionGet(); - + AlertsStatsResponse response = alertClient().prepareAlertsStats().get(); assertTrue(response.isAlertActionManagerStarted()); assertTrue(response.isAlertManagerStarted()); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L)); + SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); Alert alert = new Alert("my-first-alert", searchRequest, new ScriptedTrigger("hits.total == 1", ScriptService.ScriptType.INLINE, "groovy"), @@ -107,27 +83,12 @@ public class BootStrapTest extends AbstractAlertingTests { .get(); assertTrue(indexResponse.isCreated()); - String oldMaster = internalTestCluster().getMasterName(); - try { - internalTestCluster().stopCurrentMasterNode(); - } catch (IOException ioe) { - throw new ElasticsearchException("Failed to stop current master", ioe); - } - - //Wait for alerts to start - TimeValue maxTime = new TimeValue(30, TimeUnit.SECONDS); - Thread.sleep(maxTime.getMillis()); - - String newMaster = internalTestCluster().getMasterName(); - assertFalse(newMaster.equals(oldMaster)); - logger.info("Switched master from [{}] to [{}]",oldMaster,newMaster); - - alertsStatsRequest = alertClient().prepareAlertsStats().request(); - response = alertClient().alertsStats(alertsStatsRequest).actionGet(); + stopAlerting(); + startAlerting(); + response = alertClient().prepareAlertsStats().get(); assertTrue(response.isAlertActionManagerStarted()); assertTrue(response.isAlertManagerStarted()); - assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L)); assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L)); }