From 75ef2dc3b398c895ae7ef4ffdf5d989538450a44 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 24 Oct 2014 12:49:33 +0200 Subject: [PATCH] Initial step to running alerts on master and added a very simple test. Original commit: elastic/x-pack-elasticsearch@480f6bd44b26347132498f90ea8ae229c56922e6 --- pom.xml | 127 ++++++++++++------ .../alerting/AlertActionManager.java | 1 + .../elasticsearch/alerting/AlertManager.java | 8 +- .../alerting/AlertScheduler.java | 91 ++++++++----- .../alerting/IndexAlertAction.java | 3 - src/main/resources/es-plugin.properties | 3 +- .../alerting/BasicAlertingTest.java | 110 +++++++++++++++ src/test/resources/log4j.xml | 42 ++++++ 8 files changed, 309 insertions(+), 76 deletions(-) create mode 100644 src/test/java/org/elasticsearch/alerting/BasicAlertingTest.java create mode 100644 src/test/resources/log4j.xml diff --git a/pom.xml b/pom.xml index 797e00a471c..cd6d1c915e4 100644 --- a/pom.xml +++ b/pom.xml @@ -13,55 +13,106 @@ jar - 1.3.1 + 1.4.0.Beta1 + 4.10.1 + + + org.apache.lucene + lucene-test-framework + ${lucene.version} + test + + + + org.apache.lucene + lucene-test-framework + ${lucene.version} + test + + + + org.hamcrest + hamcrest-core + 1.3.RC2 + test + + + org.hamcrest + hamcrest-library + 1.3.RC2 + test + + org.elasticsearch elasticsearch ${elasticsearch.version} - compile + provided + + + org.apache.lucene + lucene-core + ${lucene.version} + provided + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + test-jar + test + + + + org.codehaus.groovy + groovy-all + 2.3.2 + provided + + + + org.quartz-scheduler + quartz + 2.2.1 + + + org.quartz-scheduler + quartz-jobs + 2.2.1 + + + log4j + log4j + 1.2.16 + + + org.slf4j + slf4j-api + 1.7.7 + + + javax.mail + mail + 1.4.4 + + + javax.activation + activation + 1.1.1 - - org.quartz-scheduler - quartz - 2.2.1 - - - org.quartz-scheduler - quartz-jobs - 2.2.1 - - - log4j - log4j - 1.2.16 - - - org.slf4j - slf4j-api - 1.7.7 - - - javax.mail - mail - 1.4.4 - - - javax.activation - activation - 1.1.1 - - + - maven2-repository.dev.java.net - Java.net Repository for Maven - http://download.java.net/maven/2/ - default + maven2-repository.dev.java.net + Java.net Repository for Maven + http://download.java.net/maven/2/ + default - + diff --git a/src/main/java/org/elasticsearch/alerting/AlertActionManager.java b/src/main/java/org/elasticsearch/alerting/AlertActionManager.java index 4ad90b1ae6c..11936394e92 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertActionManager.java @@ -27,6 +27,7 @@ public class AlertActionManager extends AbstractComponent { this.actionImplemented = new HashMap<>(); registerAction("email", new EmailAlertActionFactory()); registerAction("index", new IndexAlertActionFactory(client)); + alertManager.setActionManager(this); } public void registerAction(String name, AlertActionFactory actionFactory){ diff --git a/src/main/java/org/elasticsearch/alerting/AlertManager.java b/src/main/java/org/elasticsearch/alerting/AlertManager.java index ba3e787021e..52e9a29402d 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertManager.java @@ -30,7 +30,10 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.search.SearchHit; import java.io.IOException; @@ -103,7 +106,6 @@ public class AlertManager extends AbstractLifecycleComponent { } } - @Inject public void setActionManager(AlertActionManager actionManager){ this.actionManager = actionManager; } @@ -117,6 +119,7 @@ public class AlertManager extends AbstractLifecycleComponent { @Override protected void doStop() throws ElasticsearchException { logger.warn("STOPPING"); + starter.interrupt(); /* try { starter.join(); @@ -142,7 +145,6 @@ public class AlertManager extends AbstractLifecycleComponent { //scheduleAlerts(); } - @Inject public void setAlertScheduler(AlertScheduler scheduler){ this.scheduler = scheduler; } diff --git a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java index 9f15865fa99..7e704a79b68 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java @@ -9,39 +9,44 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.*; -import org.elasticsearch.index.query.*; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.query.FilteredQueryBuilder; +import org.elasticsearch.index.query.RangeFilterBuilder; +import org.elasticsearch.index.query.TemplateQueryBuilder; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.quartz.simpl.SimpleJobFactory; -import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; -public class AlertScheduler extends AbstractLifecycleComponent { +public class AlertScheduler extends AbstractLifecycleComponent implements ClusterStateListener { - Scheduler scheduler = null; - private final AlertManager alertManager; private final Client client; + private final Scheduler scheduler; + private final AlertManager alertManager; + private final ScriptService scriptService; private final TriggerManager triggerManager; private final AlertActionManager actionManager; - private final ScriptService scriptService; + private final AtomicBoolean run = new AtomicBoolean(false); @Inject public AlertScheduler(Settings settings, AlertManager alertManager, Client client, TriggerManager triggerManager, AlertActionManager actionManager, - ScriptService scriptService) { + ScriptService scriptService, ClusterService clusterService) { super(settings); this.alertManager = alertManager; this.client = client; @@ -52,9 +57,51 @@ public class AlertScheduler extends AbstractLifecycleComponent { SchedulerFactory schFactory = new StdSchedulerFactory(); scheduler = schFactory.getScheduler(); scheduler.setJobFactory(new SimpleJobFactory()); - } catch (Throwable t) { - logger.error("Failed to instantiate scheduler", t); + } catch (SchedulerException e) { + throw new ElasticsearchException("Failed to instantiate scheduler", e); } + clusterService.add(this); + alertManager.setAlertScheduler(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().nodes().localNodeMaster()) { + if (run.compareAndSet(false, true)) { + try { + logger.info("Starting scheduler"); + scheduler.start(); + } catch (SchedulerException se){ + logger.error("Failed to start quartz scheduler",se); + } + } + } else { + stopIfRunning(); + } + } + + private void stopIfRunning() { + if (run.compareAndSet(true, false)) { + try { + logger.info("Stopping scheduler"); + scheduler.shutdown(true); + } catch (SchedulerException se){ + logger.error("Failed to stop quartz scheduler",se); + } + } + } + + @Override + protected void doStart() throws ElasticsearchException { + } + + @Override + protected void doStop() throws ElasticsearchException { + stopIfRunning(); + } + + @Override + protected void doClose() throws ElasticsearchException { } public boolean deleteAlertFromSchedule(String alertName) { @@ -174,26 +221,8 @@ public class AlertScheduler extends AbstractLifecycleComponent { } } - @Override - protected void doStart() throws ElasticsearchException { - logger.warn("Starting Scheduler"); - try { - scheduler.start(); - } catch (SchedulerException se){ - logger.error("Failed to start quartz scheduler",se); - } + public boolean isRunning() { + return true; } - @Override - protected void doStop() throws ElasticsearchException { - try { - scheduler.shutdown(true); - } catch (SchedulerException se){ - logger.error("Failed to stop quartz scheduler",se); - } - } - - @Override - protected void doClose() throws ElasticsearchException { - } } diff --git a/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java b/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java index 8402daeb121..25d918b4fc7 100644 --- a/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java +++ b/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java @@ -8,13 +8,11 @@ package org.elasticsearch.alerting; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import java.io.IOException; @@ -24,7 +22,6 @@ public class IndexAlertAction implements AlertAction, ToXContent { private Client client = null; ESLogger logger = Loggers.getLogger(IndexAlertAction.class); - @Inject public IndexAlertAction(String index, String type, Client client){ this.index = index; this.type = type; diff --git a/src/main/resources/es-plugin.properties b/src/main/resources/es-plugin.properties index 8832ae6c743..ecf11df7a6c 100644 --- a/src/main/resources/es-plugin.properties +++ b/src/main/resources/es-plugin.properties @@ -1 +1,2 @@ -plugin=org.elasticsearch.plugin.alerting.AlertingPlugin \ No newline at end of file +plugin=org.elasticsearch.plugin.alerting.AlertingPlugin +version=${project.version} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/alerting/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerting/BasicAlertingTest.java new file mode 100644 index 00000000000..4b58364f73b --- /dev/null +++ b/src/test/java/org/elasticsearch/alerting/BasicAlertingTest.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerting; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.plugin.alerting.AlertingPlugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.core.Is.is; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1) +public class BasicAlertingTest extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("plugin.mandatory", "alerting-plugin") + .put("plugin.types", AlertingPlugin.class.getName()) + .put("node.mode", "network") + .put("plugins.load_classpath_plugins", false) + .build(); + } + + @Test + // TODO: add request, response & request builder etc. + public void testAlerSchedulerStartsProperly() throws Exception { + createIndex("my-index"); + createIndex(ScriptService.SCRIPT_INDEX); + + client().prepareIndex(ScriptService.SCRIPT_INDEX, "mustache", "query") + .setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject()) + .get(); + + /*client().admin().indices().preparePutTemplate("query") + .setTemplate("*") + .setSource(jsonBuilder().startObject().startObject("query").startObject("match_all").endObject().endObject().endObject()) + .get(); + + GetIndexTemplatesResponse templatesResponse = client().admin().indices().prepareGetTemplates("query").get(); + assertThat(templatesResponse.getIndexTemplates().size(), equalTo(1)); + assertThat(templatesResponse.getIndexTemplates().get(0).getName(), equalTo("query"));*/ + + AlertScheduler alertScheduler = internalCluster().getInstance(AlertScheduler.class, internalCluster().getMasterName()); + assertThat(alertScheduler.isRunning(), is(true)); + + AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName()); + final CountDownLatch latch = new CountDownLatch(1); + final AlertAction alertAction = new AlertAction() { + @Override + public String getActionName() { + return "test"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public boolean doAction(String alertName, AlertResult alert) { + logger.info("Alert {} invoked: {}", alertName, alert); + latch.countDown(); + return true; + } + }; + AlertActionManager alertActionManager = internalCluster().getInstance(AlertActionManager.class, internalCluster().getMasterName()); + alertActionManager.registerAction("test", new AlertActionFactory() { + @Override + public AlertAction createAction(Object parameters) { + return alertAction; + } + }); + AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy")); + Alert alert = new Alert( + "my-first-alert", + "/mustache/query", + alertTrigger, + TimeValue.timeValueSeconds(1), + Arrays.asList(alertAction), + "* * * * * ? *", + null, + Arrays.asList("my-index"), + null, + 1, + true, + true + ); + alertManager.addAlert("my-first-alert", alert, true); + latch.await(); + } + +} diff --git a/src/test/resources/log4j.xml b/src/test/resources/log4j.xml new file mode 100644 index 00000000000..052175f3aed --- /dev/null +++ b/src/test/resources/log4j.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +