Original commit: elastic/x-pack-elasticsearch@7aee9aef96
This commit is contained in:
Brian Murphy 2014-11-21 15:52:08 +00:00
commit c471abdab5
14 changed files with 400 additions and 61 deletions

View File

@ -49,6 +49,9 @@ public class AlertManager extends AbstractComponent {
private final ClusterService clusterService; private final ClusterService clusterService;
private final KeyedLock<String> alertLock = new KeyedLock<>(); private final KeyedLock<String> alertLock = new KeyedLock<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED); private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final AlertsClusterStateListener alertsClusterStateListener = new AlertsClusterStateListener();
private volatile boolean manuallyStopped;
@Inject @Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
@ -64,13 +67,14 @@ public class AlertManager extends AbstractComponent {
this.actionManager.setAlertManager(this); this.actionManager.setAlertManager(this);
this.actionRegistry = actionRegistry; this.actionRegistry = actionRegistry;
this.clusterService = clusterService; this.clusterService = clusterService;
clusterService.add(new AlertsClusterStateListener()); clusterService.add(alertsClusterStateListener);
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled. // happen because we're shutting down and an alert is scheduled.
indicesService.addLifecycleListener(new LifecycleListener() { indicesService.addLifecycleListener(new LifecycleListener() {
@Override @Override
public void beforeStop() { public void beforeStop() {
stop(); internalStop();
} }
}); });
@ -177,12 +181,25 @@ public class AlertManager extends AbstractComponent {
return false; return false;
} }
public void start() {
if (state.compareAndSet(State.STOPPED, State.LOADING)) {
manuallyStopped = false;
logger.info("Starting alert manager...");
ClusterState state = clusterService.state();
alertsClusterStateListener.initialize(state);
}
}
public void stop() {
manuallyStopped = true;
internalStop();
}
// This is synchronized, because this may first be called from the cluster changed event and then from before close // This is synchronized, because this may first be called from the cluster changed event and then from before close
// when a node closes. The stop also stops the scheduler which has several background threads. If this method is // when a node closes. The stop also stops the scheduler which has several background threads. If this method is
// invoked in that order that node closes and the test framework complains then about the fact that there are still // invoked in that order that node closes and the test framework complains then about the fact that there are still
// threads alive. // threads alive.
public synchronized void stop() { private synchronized void internalStop() {
if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) { if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) {
logger.info("Stopping alert manager..."); logger.info("Stopping alert manager...");
actionManager.stop(); actionManager.stop();
@ -251,7 +268,7 @@ public class AlertManager extends AbstractComponent {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override @Override
public void run() { public void run() {
stop(); internalStop();
} }
}); });
} else { } else {
@ -267,6 +284,10 @@ public class AlertManager extends AbstractComponent {
} }
private void initialize(final ClusterState state) { private void initialize(final ClusterState state) {
if (manuallyStopped) {
return;
}
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -293,6 +314,7 @@ public class AlertManager extends AbstractComponent {
if (alertsStore.started() && actionManager.started()) { if (alertsStore.started() && actionManager.started()) {
if (state.compareAndSet(State.LOADING, State.STARTED)) { if (state.compareAndSet(State.LOADING, State.STARTED)) {
scheduler.start(alertsStore.getAlerts()); scheduler.start(alertsStore.getAlerts());
logger.info("Alert manager has started");
} else { } else {
logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING); logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING);
} }

View File

@ -10,15 +10,13 @@ import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.client.NodeAlertsClient; import org.elasticsearch.alerts.client.NodeAlertsClient;
import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.rest.RestAlertsStatsAction; import org.elasticsearch.alerts.rest.*;
import org.elasticsearch.alerts.rest.RestDeleteAlertAction;
import org.elasticsearch.alerts.rest.RestGetAlertAction;
import org.elasticsearch.alerts.rest.RestPutAlertAction;
import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.transport.actions.ack.TransportAckAlertAction; import org.elasticsearch.alerts.transport.actions.ack.TransportAckAlertAction;
import org.elasticsearch.alerts.transport.actions.delete.TransportDeleteAlertAction; import org.elasticsearch.alerts.transport.actions.delete.TransportDeleteAlertAction;
import org.elasticsearch.alerts.transport.actions.get.TransportGetAlertAction; import org.elasticsearch.alerts.transport.actions.get.TransportGetAlertAction;
import org.elasticsearch.alerts.transport.actions.put.TransportPutAlertAction; import org.elasticsearch.alerts.transport.actions.put.TransportPutAlertAction;
import org.elasticsearch.alerts.transport.actions.service.TransportAlertsServiceAction;
import org.elasticsearch.alerts.transport.actions.stats.TransportAlertStatsAction; import org.elasticsearch.alerts.transport.actions.stats.TransportAlertStatsAction;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
@ -43,6 +41,7 @@ public class AlertingModule extends AbstractModule {
bind(TransportGetAlertAction.class).asEagerSingleton(); bind(TransportGetAlertAction.class).asEagerSingleton();
bind(TransportAlertStatsAction.class).asEagerSingleton(); bind(TransportAlertStatsAction.class).asEagerSingleton();
bind(TransportAckAlertAction.class).asEagerSingleton(); bind(TransportAckAlertAction.class).asEagerSingleton();
bind(TransportAlertsServiceAction.class).asEagerSingleton();
bind(AlertsClient.class).to(NodeAlertsClient.class).asEagerSingleton(); bind(AlertsClient.class).to(NodeAlertsClient.class).asEagerSingleton();
// Rest layer // Rest layer
@ -50,6 +49,7 @@ public class AlertingModule extends AbstractModule {
bind(RestDeleteAlertAction.class).asEagerSingleton(); bind(RestDeleteAlertAction.class).asEagerSingleton();
bind(RestAlertsStatsAction.class).asEagerSingleton(); bind(RestAlertsStatsAction.class).asEagerSingleton();
bind(RestGetAlertAction.class).asEagerSingleton(); bind(RestGetAlertAction.class).asEagerSingleton();
bind(RestAlertServiceAction.class).asEagerSingleton();
} }
} }

View File

@ -230,7 +230,6 @@ public class AlertsStore extends AbstractComponent {
protected Alert parseAlert(String alertName, BytesReference source) { protected Alert parseAlert(String alertName, BytesReference source) {
Alert alert = new Alert(); Alert alert = new Alert();
alert.alertName(alertName); alert.alertName(alertName);
logger.error("Source : [{}]", source.toUtf8());
try (XContentParser parser = XContentHelper.createParser(source)) { try (XContentParser parser = XContentHelper.createParser(source)) {
String currentFieldName = null; String currentFieldName = null;
XContentParser.Token token = parser.nextToken(); XContentParser.Token token = parser.nextToken();

View File

@ -245,7 +245,7 @@ public class AlertActionManager extends AbstractComponent {
.setSource(XContentFactory.jsonBuilder().value(entry)) .setSource(XContentFactory.jsonBuilder().value(entry))
.setOpType(IndexRequest.OpType.CREATE) .setOpType(IndexRequest.OpType.CREATE)
.get(); .get();
logger.info("Adding alert action for alert [{}]", alert.alertName()); logger.debug("Adding alert action for alert [{}]", alert.alertName());
entry.setVersion(response.getVersion()); entry.setVersion(response.getVersion());
long currentSize = actionsToBeProcessed.size() + 1; long currentSize = actionsToBeProcessed.size() + 1;
actionsToBeProcessed.add(entry); actionsToBeProcessed.add(entry);
@ -277,12 +277,10 @@ public class AlertActionManager extends AbstractComponent {
} }
public long getQueueSize() { public long getQueueSize() {
ensureStarted();
return actionsToBeProcessed.size(); return actionsToBeProcessed.size();
} }
public long getLargestQueueSize() { public long getLargestQueueSize() {
ensureStarted();
return largestQueueSize.get(); return largestQueueSize.get();
} }
@ -313,7 +311,7 @@ public class AlertActionManager extends AbstractComponent {
return; return;
} }
updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY); updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY);
logger.info("Running an alert action entry for [{}]", entry.getAlertName()); logger.debug("Running an alert action entry for [{}]", entry.getAlertName());
TriggerResult trigger = alertManager.executeAlert(entry); TriggerResult trigger = alertManager.executeAlert(entry);
if (trigger.isTriggered()) { if (trigger.isTriggered()) {
if (entry.getState() != AlertActionState.THROTTLED) { if (entry.getState() != AlertActionState.THROTTLED) {

View File

@ -19,6 +19,9 @@ import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.put.PutAlertRequest; import org.elasticsearch.alerts.transport.actions.put.PutAlertRequest;
import org.elasticsearch.alerts.transport.actions.put.PutAlertRequestBuilder; import org.elasticsearch.alerts.transport.actions.put.PutAlertRequestBuilder;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse; import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.transport.actions.service.AlertServiceRequestBuilder;
import org.elasticsearch.alerts.transport.actions.service.AlertsServiceRequest;
import org.elasticsearch.alerts.transport.actions.service.AlertsServiceResponse;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequestBuilder; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequestBuilder;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
@ -176,4 +179,19 @@ public interface AlertsClient extends ElasticsearchClient<AlertsClient> {
*/ */
ActionFuture<AckAlertResponse> ackAlert(AckAlertRequest request); ActionFuture<AckAlertResponse> ackAlert(AckAlertRequest request);
/**
* Prepare make an alert service request.
*/
AlertServiceRequestBuilder prepareAlertService();
/**
* Perform an alert service request to either start, stop or restart the alerting plugin.
*/
void alertService(AlertsServiceRequest request, ActionListener<AlertsServiceResponse> listener);
/**
* Perform an alert service request to either start, stop or restart the alerting plugin.
*/
ActionFuture<AlertsServiceResponse> alertService(AlertsServiceRequest request);
} }

View File

@ -11,6 +11,7 @@ import org.elasticsearch.alerts.transport.actions.ack.*;
import org.elasticsearch.alerts.transport.actions.delete.*; import org.elasticsearch.alerts.transport.actions.delete.*;
import org.elasticsearch.alerts.transport.actions.get.*; import org.elasticsearch.alerts.transport.actions.get.*;
import org.elasticsearch.alerts.transport.actions.put.*; import org.elasticsearch.alerts.transport.actions.put.*;
import org.elasticsearch.alerts.transport.actions.service.*;
import org.elasticsearch.alerts.transport.actions.stats.*; import org.elasticsearch.alerts.transport.actions.stats.*;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
@ -26,7 +27,8 @@ public class NodeAlertsClient implements AlertsClient {
@Inject @Inject
public NodeAlertsClient(ThreadPool threadPool, Headers headers, TransportPutAlertAction transportPutAlertAction, public NodeAlertsClient(ThreadPool threadPool, Headers headers, TransportPutAlertAction transportPutAlertAction,
TransportGetAlertAction transportGetAlertAction, TransportDeleteAlertAction transportDeleteAlertAction, TransportGetAlertAction transportGetAlertAction, TransportDeleteAlertAction transportDeleteAlertAction,
TransportAlertStatsAction transportAlertStatsAction, TransportAckAlertAction transportAckAlertAction) { TransportAlertStatsAction transportAlertStatsAction, TransportAckAlertAction transportAckAlertAction,
TransportAlertsServiceAction transportAlertsServiceAction) {
this.headers = headers; this.headers = headers;
this.threadPool = threadPool; this.threadPool = threadPool;
internalActions = ImmutableMap.<GenericAction, TransportAction>builder() internalActions = ImmutableMap.<GenericAction, TransportAction>builder()
@ -35,6 +37,7 @@ public class NodeAlertsClient implements AlertsClient {
.put(DeleteAlertAction.INSTANCE, transportDeleteAlertAction) .put(DeleteAlertAction.INSTANCE, transportDeleteAlertAction)
.put(AlertsStatsAction.INSTANCE, transportAlertStatsAction) .put(AlertsStatsAction.INSTANCE, transportAlertStatsAction)
.put(AckAlertAction.INSTANCE, transportAckAlertAction) .put(AckAlertAction.INSTANCE, transportAckAlertAction)
.put(AlertServiceAction.INSTANCE, transportAlertsServiceAction)
.build(); .build();
} }
@ -132,6 +135,21 @@ public class NodeAlertsClient implements AlertsClient {
return execute(AckAlertAction.INSTANCE, request); return execute(AckAlertAction.INSTANCE, request);
} }
@Override
public AlertServiceRequestBuilder prepareAlertService() {
return new AlertServiceRequestBuilder(this);
}
@Override
public void alertService(AlertsServiceRequest request, ActionListener<AlertsServiceResponse> listener) {
execute(AlertServiceAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AlertsServiceResponse> alertService(AlertsServiceRequest request) {
return execute(AlertServiceAction.INSTANCE, request);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, AlertsClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, AlertsClient> action, Request request) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, AlertsClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, AlertsClient> action, Request request) {

View File

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.rest;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.transport.actions.service.AlertsServiceRequest;
import org.elasticsearch.alerts.transport.actions.service.AlertsServiceResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
/**
*/
public class RestAlertServiceAction extends BaseRestHandler {
private final AlertsClient alertsClient;
@Inject
protected RestAlertServiceAction(Settings settings, RestController controller, Client client, AlertsClient alertsClient) {
super(settings, controller, client);
this.alertsClient = alertsClient;
controller.registerHandler(RestRequest.Method.PUT, AlertsStore.ALERT_INDEX + "/_restart", this);
controller.registerHandler(RestRequest.Method.PUT, AlertsStore.ALERT_INDEX + "/_start", new StartRestHandler(settings, controller, client));
controller.registerHandler(RestRequest.Method.PUT, AlertsStore.ALERT_INDEX + "/_stop", new StopRestHandler(settings, controller, client));
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
AlertsServiceRequest serviceRequest = new AlertsServiceRequest();
serviceRequest.restart();
alertsClient.alertService(serviceRequest, new AcknowledgedRestListener<AlertsServiceResponse>(channel));
}
final class StartRestHandler extends BaseRestHandler {
public StartRestHandler(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
}
@Override
public void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
AlertsServiceRequest serviceRequest = new AlertsServiceRequest();
serviceRequest.start();
alertsClient.alertService(serviceRequest, new AcknowledgedRestListener<AlertsServiceResponse>(channel));
}
}
final class StopRestHandler extends BaseRestHandler {
public StopRestHandler(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
}
@Override
public void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
AlertsServiceRequest serviceRequest = new AlertsServiceRequest();
serviceRequest.stop();
alertsClient.alertService(serviceRequest, new AcknowledgedRestListener<AlertsServiceResponse>(channel));
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.service;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.client.AlertsClientAction;
/**
*/
public class AlertServiceAction extends AlertsClientAction<AlertsServiceRequest, AlertsServiceResponse, AlertServiceRequestBuilder> {
public static final AlertServiceAction INSTANCE = new AlertServiceAction();
public static final String NAME = "cluster:admin/alerts/service";
private AlertServiceAction() {
super(NAME);
}
@Override
public AlertsServiceResponse newResponse() {
return new AlertsServiceResponse();
}
@Override
public AlertServiceRequestBuilder newRequestBuilder(AlertsClient client) {
return new AlertServiceRequestBuilder(client);
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.service;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.alerts.client.AlertsClient;
/**
*/
public class AlertServiceRequestBuilder extends MasterNodeOperationRequestBuilder<AlertsServiceRequest, AlertsServiceResponse, AlertServiceRequestBuilder, AlertsClient> {
public AlertServiceRequestBuilder(AlertsClient client) {
super(client, new AlertsServiceRequest());
}
/**
* Starts alerting if not already started.
*/
public AlertServiceRequestBuilder start() {
request.start();
return this;
}
/**
* Stops alerting if not already stopped.
*/
public AlertServiceRequestBuilder stop() {
request.stop();
return this;
}
/**
* Starts and stops alerting.
*/
public AlertServiceRequestBuilder restart() {
request.restart();
return this;
}
@Override
protected void doExecute(ActionListener<AlertsServiceResponse> listener) {
client.alertService(request, listener);
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.service;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class AlertsServiceRequest extends MasterNodeOperationRequest<AlertsServiceRequest> {
private String command;
/**
* Starts alerting if not already started.
*/
public void start() {
command = "start";
}
/**
* Stops alerting if not already stopped.
*/
public void stop() {
command = "stop";
}
/**
* Starts and stops alerting.
*/
public void restart() {
command = "restart";
}
String getCommand() {
return command;
}
@Override
public ActionRequestValidationException validate() {
if (command == null) {
return ValidateActions.addValidationError("no command specified", null);
} else {
return null;
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
command = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(command);
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.service;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
/**
* Empty response, so if it returns, it means all is fine.
*/
public class AlertsServiceResponse extends AcknowledgedResponse {
AlertsServiceResponse() {
}
public AlertsServiceResponse(boolean acknowledged) {
super(acknowledged);
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.service;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
*/
public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction<AlertsServiceRequest, AlertsServiceResponse> {
private final AlertManager alertManager;
@Inject
public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertManager alertManager) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
this.alertManager = alertManager;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected AlertsServiceRequest newRequest() {
return new AlertsServiceRequest();
}
@Override
protected AlertsServiceResponse newResponse() {
return new AlertsServiceResponse();
}
@Override
protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener<AlertsServiceResponse> listener) throws ElasticsearchException {
switch (request.getCommand()) {
case "start":
alertManager.start();
break;
case "stop":
alertManager.stop();
break;
case "restart":
alertManager.start();
alertManager.stop();
break;
default:
listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined"));
return;
}
listener.onResponse(new AlertsServiceResponse(true));
}
@Override
protected ClusterBlockException checkBlock(AlertsServiceRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
}

View File

@ -159,6 +159,21 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
}); });
} }
protected void startAlerting() throws Exception {
alertClient().prepareAlertService().start().get();
ensureAlertingStarted();
}
protected void stopAlerting() throws Exception {
alertClient().prepareAlertService().stop().get();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertClient().prepareAlertsStats().get().isAlertManagerStarted(), is(false));
}
});
}
protected static InternalTestCluster internalTestCluster() { protected static InternalTestCluster internalTestCluster() {
return (InternalTestCluster) ((AlertingWrappingCluster) cluster()).testCluster; return (InternalTestCluster) ((AlertingWrappingCluster) cluster()).testCluster;
} }

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.alerts; package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
@ -13,7 +12,6 @@ import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionEntry; import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionState; import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
import org.elasticsearch.alerts.triggers.ScriptedTrigger; import org.elasticsearch.alerts.triggers.ScriptedTrigger;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -24,9 +22,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -49,28 +45,10 @@ public class BootStrapTest extends AbstractAlertingTests {
.get(); .get();
client().admin().indices().prepareRefresh(AlertsStore.ALERT_INDEX).get(); client().admin().indices().prepareRefresh(AlertsStore.ALERT_INDEX).get();
stopAlerting();
startAlerting();
String oldMaster = internalTestCluster().getMasterName(); AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
try {
internalTestCluster().stopCurrentMasterNode();
} catch (IOException ioe) {
throw new ElasticsearchException("Failed to stop current master", ioe);
}
//Wait for alerts to start
TimeValue maxTime = new TimeValue(30, TimeUnit.SECONDS);
Thread.sleep(maxTime.getMillis());
String newMaster = internalTestCluster().getMasterName();
assertFalse(newMaster.equals(oldMaster));
logger.info("Switched master from [{}] to [{}]",oldMaster,newMaster);
AlertsStatsRequest alertsStatsRequest = alertClient().prepareAlertsStats().request();
AlertsStatsResponse response = alertClient().alertsStats(alertsStatsRequest).actionGet();
alertsStatsRequest = alertClient().prepareAlertsStats().request();
response = alertClient().alertsStats(alertsStatsRequest).actionGet();
assertTrue(response.isAlertActionManagerStarted()); assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted()); assertTrue(response.isAlertManagerStarted());
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
@ -81,14 +59,12 @@ public class BootStrapTest extends AbstractAlertingTests {
ensureAlertingStarted(); ensureAlertingStarted();
internalTestCluster().ensureAtLeastNumDataNodes(2); internalTestCluster().ensureAtLeastNumDataNodes(2);
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
AlertsStatsRequest alertsStatsRequest = alertClient().prepareAlertsStats().request();
AlertsStatsResponse response = alertClient().alertsStats(alertsStatsRequest).actionGet();
assertTrue(response.isAlertActionManagerStarted()); assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted()); assertTrue(response.isAlertManagerStarted());
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
Alert alert = new Alert("my-first-alert", Alert alert = new Alert("my-first-alert",
searchRequest, searchRequest,
new ScriptedTrigger("hits.total == 1", ScriptService.ScriptType.INLINE, "groovy"), new ScriptedTrigger("hits.total == 1", ScriptService.ScriptType.INLINE, "groovy"),
@ -107,27 +83,12 @@ public class BootStrapTest extends AbstractAlertingTests {
.get(); .get();
assertTrue(indexResponse.isCreated()); assertTrue(indexResponse.isCreated());
String oldMaster = internalTestCluster().getMasterName(); stopAlerting();
try { startAlerting();
internalTestCluster().stopCurrentMasterNode();
} catch (IOException ioe) {
throw new ElasticsearchException("Failed to stop current master", ioe);
}
//Wait for alerts to start
TimeValue maxTime = new TimeValue(30, TimeUnit.SECONDS);
Thread.sleep(maxTime.getMillis());
String newMaster = internalTestCluster().getMasterName();
assertFalse(newMaster.equals(oldMaster));
logger.info("Switched master from [{}] to [{}]",oldMaster,newMaster);
alertsStatsRequest = alertClient().prepareAlertsStats().request();
response = alertClient().alertsStats(alertsStatsRequest).actionGet();
response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted()); assertTrue(response.isAlertActionManagerStarted());
assertTrue(response.isAlertManagerStarted()); assertTrue(response.isAlertManagerStarted());
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L)); assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L));
} }