Alerts: Transport actions

Add the UpdateAction (oops) and fix the test.

Original commit: elastic/x-pack-elasticsearch@ead7d446d7
This commit is contained in:
Brian Murphy 2014-11-04 16:49:09 +00:00
parent 75ce20ecff
commit a05fc88f0d
5 changed files with 153 additions and 18 deletions

View File

@ -154,6 +154,14 @@ public class AlertManager extends AbstractComponent {
return alertsStore.getAlert(alertName);
}
public boolean updateAlert(Alert alert) {
if (!alertsStore.hasAlert(alert.alertName())) {
return false;
}
return alertsStore.updateAlert(alert);
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override

View File

@ -121,10 +121,14 @@ public class AlertsStore extends AbstractComponent {
}
public boolean updateAlert(Alert alert) {
return updateAlert(alert, false);
}
/**
* Updates the specified alert by making sure that the made changes are persisted.
*/
public void updateAlert(Alert alert) {
public boolean updateAlert(Alert alert, boolean updateMap) {
IndexRequest updateRequest = new IndexRequest();
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
@ -142,8 +146,13 @@ public class AlertsStore extends AbstractComponent {
IndexResponse response = client.index(updateRequest).actionGet();
alert.version(response.getVersion());
// Don't need to update the alertMap, since we are working on an instance from it.
assert alertMap.get(alert.alertName()) == alert;
if (updateMap) {
alertMap.put(alert.alertName(), alert);
} else {
// Don'<></> need to update the alertMap, since we are working on an instance from it.
assert alertMap.get(alert.alertName()) == alert;
}
return true;
}
/**

View File

@ -50,7 +50,12 @@ public class TransportUpdateAlertAction extends TransportMasterNodeOperationActi
@Override
protected void masterOperation(UpdateAlertRequest request, ClusterState state, ActionListener<UpdateAlertResponse> listener) throws ElasticsearchException {
try {
boolean success = alertManager.updateAlert(request.alert());
listener.onResponse(new UpdateAlertResponse(success));
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override

View File

@ -6,8 +6,43 @@
package org.elasticsearch.alerts.transport.actions.update;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class UpdateAlertResponse extends ActionResponse {
boolean success = false;
public UpdateAlertResponse() {
}
public UpdateAlertResponse(boolean success) {
this.success = success;
}
public boolean success() {
return success;
}
public void success(boolean success) {
this.success = success;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
success = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(success);
}
}

View File

@ -9,6 +9,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.BasicAlertingTest;
@ -17,13 +18,23 @@ import org.elasticsearch.alerts.client.AlertsClientInterface;
import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.transport.actions.create.CreateAlertRequest;
import org.elasticsearch.alerts.transport.actions.create.CreateAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.update.UpdateAlertRequest;
import org.elasticsearch.alerts.transport.actions.update.UpdateAlertResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
@ -33,11 +44,11 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.core.Is.is;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -130,23 +141,90 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
.setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject())
.get();
CreateAlertRequest alertRequest = new CreateAlertRequest("myAlert");
alertRequest.queryName("test-query");
alertRequest.enabled(true);
alertRequest.schedule("0/5 * * * * ? *");
alertRequest.trigger(new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));
alertRequest.timePeriod(new TimeValue(300, TimeUnit.SECONDS));
alertRequest.actions(new ArrayList<String>());
alertRequest.lastRan(new DateTime());
alertRequest.lastActionFire(new DateTime());
alertRequest.running(new DateTime());
final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertManager.isStarted(), is(true));
}
});
final AtomicBoolean alertActionInvoked = new AtomicBoolean(false);
final AlertAction alertAction = new AlertAction() {
@Override
public String getActionName() {
return "test";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public void readFrom(StreamInput in) throws IOException {
}
@Override
public boolean doAction(Alert alert, AlertActionEntry actionEntry) {
logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry);
alertActionInvoked.set(true);
return true;
}
};
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
alertActionRegistry.registerAction("test", new AlertActionFactory() {
@Override
public AlertAction createAction(XContentParser parser) throws IOException {
parser.nextToken();
return alertAction;
}
@Override
public AlertAction readFrom(StreamInput in) throws IOException {
return alertAction;
}
});
AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"));
Alert alert = new Alert(
"my-first-alert",
client().prepareSearch("my-index").setQuery(QueryBuilders.matchAllQuery()).request(),
alertTrigger,
Arrays.asList(alertAction),
"0/5 * * * * ? *",
null,
1,
true
);
CreateAlertRequest alertRequest = new CreateAlertRequest(alert);
AlertsClientInterface alertsClient = internalCluster().getInstance(AlertsClient.class, internalCluster().getMasterName());
CreateAlertResponse alertsResponse = alertsClient.createAlert(alertRequest).actionGet();
assertTrue(alertsResponse.success());
GetAlertRequest getAlertRequest = new GetAlertRequest(alert.alertName());
GetAlertResponse getAlertResponse = alertsClient.getAlert(getAlertRequest).actionGet();
assertTrue(getAlertResponse.found());
assertEquals(alert.schedule(), getAlertResponse.alert().schedule());
String schedule = "0/10 * * * * ? *";
alert.schedule(schedule);
UpdateAlertRequest updateAlertRequest = new UpdateAlertRequest(alert);
UpdateAlertResponse updateAlertResponse = alertsClient.updateAlert(updateAlertRequest).actionGet();
assertTrue(updateAlertResponse.success());
}
}