Initial step to running alerts on master and added a very simple test.

Original commit: elastic/x-pack-elasticsearch@480f6bd44b
This commit is contained in:
Martijn van Groningen 2014-10-24 12:49:33 +02:00
parent f57ca58605
commit 75ef2dc3b3
8 changed files with 309 additions and 76 deletions

127
pom.xml
View File

@ -13,55 +13,106 @@
<packaging>jar</packaging>
<properties>
<elasticsearch.version>1.3.1</elasticsearch.version>
<elasticsearch.version>1.4.0.Beta1</elasticsearch.version>
<lucene.version>4.10.1</lucene.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-test-framework</artifactId>
<version>${lucene.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-test-framework</artifactId>
<version>${lucene.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3.RC2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3.RC2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${lucene.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
<repositories>
<repository>
<id>maven2-repository.dev.java.net</id>
<name>Java.net Repository for Maven</name>
<url>http://download.java.net/maven/2/</url>
<layout>default</layout>
<id>maven2-repository.dev.java.net</id>
<name>Java.net Repository for Maven</name>
<url>http://download.java.net/maven/2/</url>
<layout>default</layout>
</repository>
</repositories>
</repositories>
<build>
<plugins>
<plugin>

View File

@ -27,6 +27,7 @@ public class AlertActionManager extends AbstractComponent {
this.actionImplemented = new HashMap<>();
registerAction("email", new EmailAlertActionFactory());
registerAction("index", new IndexAlertActionFactory(client));
alertManager.setActionManager(this);
}
public void registerAction(String name, AlertActionFactory actionFactory){

View File

@ -30,7 +30,10 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
@ -103,7 +106,6 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
@Inject
public void setActionManager(AlertActionManager actionManager){
this.actionManager = actionManager;
}
@ -117,6 +119,7 @@ public class AlertManager extends AbstractLifecycleComponent {
@Override
protected void doStop() throws ElasticsearchException {
logger.warn("STOPPING");
starter.interrupt();
/*
try {
starter.join();
@ -142,7 +145,6 @@ public class AlertManager extends AbstractLifecycleComponent {
//scheduleAlerts();
}
@Inject
public void setAlertScheduler(AlertScheduler scheduler){
this.scheduler = scheduler;
}

View File

@ -9,39 +9,44 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.*;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.RangeFilterBuilder;
import org.elasticsearch.index.query.TemplateQueryBuilder;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class AlertScheduler extends AbstractLifecycleComponent {
public class AlertScheduler extends AbstractLifecycleComponent implements ClusterStateListener {
Scheduler scheduler = null;
private final AlertManager alertManager;
private final Client client;
private final Scheduler scheduler;
private final AlertManager alertManager;
private final ScriptService scriptService;
private final TriggerManager triggerManager;
private final AlertActionManager actionManager;
private final ScriptService scriptService;
private final AtomicBoolean run = new AtomicBoolean(false);
@Inject
public AlertScheduler(Settings settings, AlertManager alertManager, Client client,
TriggerManager triggerManager, AlertActionManager actionManager,
ScriptService scriptService) {
ScriptService scriptService, ClusterService clusterService) {
super(settings);
this.alertManager = alertManager;
this.client = client;
@ -52,9 +57,51 @@ public class AlertScheduler extends AbstractLifecycleComponent {
SchedulerFactory schFactory = new StdSchedulerFactory();
scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory());
} catch (Throwable t) {
logger.error("Failed to instantiate scheduler", t);
} catch (SchedulerException e) {
throw new ElasticsearchException("Failed to instantiate scheduler", e);
}
clusterService.add(this);
alertManager.setAlertScheduler(this);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().nodes().localNodeMaster()) {
if (run.compareAndSet(false, true)) {
try {
logger.info("Starting scheduler");
scheduler.start();
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler",se);
}
}
} else {
stopIfRunning();
}
}
private void stopIfRunning() {
if (run.compareAndSet(true, false)) {
try {
logger.info("Stopping scheduler");
scheduler.shutdown(true);
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler",se);
}
}
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
stopIfRunning();
}
@Override
protected void doClose() throws ElasticsearchException {
}
public boolean deleteAlertFromSchedule(String alertName) {
@ -174,26 +221,8 @@ public class AlertScheduler extends AbstractLifecycleComponent {
}
}
@Override
protected void doStart() throws ElasticsearchException {
logger.warn("Starting Scheduler");
try {
scheduler.start();
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler",se);
}
public boolean isRunning() {
return true;
}
@Override
protected void doStop() throws ElasticsearchException {
try {
scheduler.shutdown(true);
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler",se);
}
}
@Override
protected void doClose() throws ElasticsearchException {
}
}

View File

@ -8,13 +8,11 @@ package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import java.io.IOException;
@ -24,7 +22,6 @@ public class IndexAlertAction implements AlertAction, ToXContent {
private Client client = null;
ESLogger logger = Loggers.getLogger(IndexAlertAction.class);
@Inject
public IndexAlertAction(String index, String type, Client client){
this.index = index;
this.type = type;

View File

@ -1 +1,2 @@
plugin=org.elasticsearch.plugin.alerting.AlertingPlugin
plugin=org.elasticsearch.plugin.alerting.AlertingPlugin
version=${project.version}

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerting;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.alerting.AlertingPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1)
public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("plugin.mandatory", "alerting-plugin")
.put("plugin.types", AlertingPlugin.class.getName())
.put("node.mode", "network")
.put("plugins.load_classpath_plugins", false)
.build();
}
@Test
// TODO: add request, response & request builder etc.
public void testAlerSchedulerStartsProperly() throws Exception {
createIndex("my-index");
createIndex(ScriptService.SCRIPT_INDEX);
client().prepareIndex(ScriptService.SCRIPT_INDEX, "mustache", "query")
.setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject())
.get();
/*client().admin().indices().preparePutTemplate("query")
.setTemplate("*")
.setSource(jsonBuilder().startObject().startObject("query").startObject("match_all").endObject().endObject().endObject())
.get();
GetIndexTemplatesResponse templatesResponse = client().admin().indices().prepareGetTemplates("query").get();
assertThat(templatesResponse.getIndexTemplates().size(), equalTo(1));
assertThat(templatesResponse.getIndexTemplates().get(0).getName(), equalTo("query"));*/
AlertScheduler alertScheduler = internalCluster().getInstance(AlertScheduler.class, internalCluster().getMasterName());
assertThat(alertScheduler.isRunning(), is(true));
AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
final CountDownLatch latch = new CountDownLatch(1);
final AlertAction alertAction = new AlertAction() {
@Override
public String getActionName() {
return "test";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}
@Override
public boolean doAction(String alertName, AlertResult alert) {
logger.info("Alert {} invoked: {}", alertName, alert);
latch.countDown();
return true;
}
};
AlertActionManager alertActionManager = internalCluster().getInstance(AlertActionManager.class, internalCluster().getMasterName());
alertActionManager.registerAction("test", new AlertActionFactory() {
@Override
public AlertAction createAction(Object parameters) {
return alertAction;
}
});
AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"));
Alert alert = new Alert(
"my-first-alert",
"/mustache/query",
alertTrigger,
TimeValue.timeValueSeconds(1),
Arrays.asList(alertAction),
"* * * * * ? *",
null,
Arrays.asList("my-index"),
null,
1,
true,
true
);
alertManager.addAlert("my-first-alert", alert, true);
latch.await();
}
}

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!-- Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. ElasticSearch licenses this file to you
under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License. -->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p %c{1} - %m%n" />
</layout>
</appender>
<logger name="org.elasticsearch">
<level value="info" />
</logger>
<logger name="org.elasticsearch.cloud.aws">
<level value="trace" />
</logger>
<logger name="org.elasticsearch.discovery.aws">
<level value="trace" />
</logger>
<logger name="org.elasticsearch.repositories.aws">
<level value="trace" />
</logger>
<root>
<priority value="info" />
<appender-ref ref="console" />
</root>
</log4j:configuration>