diff --git a/pom.xml b/pom.xml
index 612e3e4b210..7585b6df7df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,14 @@
test
+
+
+ com.google.guava
+ guava
+ 18.0
+ test
+
+
diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java
index 952584f98d6..21d5d111668 100644
--- a/src/main/java/org/elasticsearch/alerts/AlertManager.java
+++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java
@@ -30,7 +30,6 @@ import org.elasticsearch.indices.IndicesService;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -135,7 +134,9 @@ public class AlertManager extends AbstractComponent {
public void stop() {
if (started.compareAndSet(true, false)) {
logger.info("Stopping alert manager...");
- scheduler.stop();
+ synchronized (scheduler) {
+ scheduler.stop();
+ }
actionManager.stop();
alertsStore.stop();
logger.info("Alert manager has stopped");
@@ -156,12 +157,6 @@ public class AlertManager extends AbstractComponent {
}
}
- private void sendAlertsToScheduler() {
- for (Map.Entry entry : alertsStore.getAlerts().entrySet()) {
- scheduler.add(entry.getKey(), entry.getValue());
- }
- }
-
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
@@ -206,8 +201,7 @@ public class AlertManager extends AbstractComponent {
private void startIfReady() {
if (alertsStore.started() && actionManager.started()) {
if (started.compareAndSet(false, true)) {
- scheduler.start();
- sendAlertsToScheduler();
+ scheduler.start(alertsStore.getAlerts());
}
}
}
diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java
index 42154ef504a..2deaa010527 100644
--- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java
+++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java
@@ -317,12 +317,13 @@ public class AlertActionManager extends AbstractComponent {
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
- }
- if (started()) {
- logger.error("Error during reader thread, restarting queue reader thread...", e);
- threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
} else {
- logger.error("Error during reader thread", e);
+ if (started()) {
+ logger.error("Error during reader thread, restarting queue reader thread...", e);
+ threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
+ } else {
+ logger.error("Error during reader thread", e);
+ }
}
}
}
diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java
index 575fa48ca6e..bcc909a4745 100644
--- a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java
+++ b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java
@@ -19,6 +19,7 @@ import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
+import java.util.Map;
import java.util.Properties;
public class AlertScheduler extends AbstractComponent {
@@ -39,7 +40,13 @@ public class AlertScheduler extends AbstractComponent {
this.alertManager = alertManager;
}
- public void start() {
+ /**
+ * Starts the scheduler and schedules the specified alerts before returning.
+ *
+ * Both the start and stop are synchronized to avoid that scheduler gets stopped while previously stored alerts
+ * are being loaded.
+ */
+ public synchronized void start(Map alerts) {
try {
logger.info("Starting scheduler");
// Can't start a scheduler that has been shutdown, so we need to re-create each time start() is invoked
@@ -49,18 +56,22 @@ public class AlertScheduler extends AbstractComponent {
scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory());
scheduler.start();
+ for (Map.Entry entry : alerts.entrySet()) {
+ add(entry.getKey(), entry.getValue());
+ }
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler", se);
}
}
- public void stop() {
+ /**
+ * Stops the scheduler.
+ */
+ public synchronized void stop() {
try {
- if (!scheduler.isShutdown()) {
- scheduler.clear();
- scheduler.shutdown(false);
- logger.info("Stopped scheduler");
- }
+ scheduler.clear();
+ scheduler.shutdown(true);
+ logger.info("Stopped scheduler");
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler", se);
}
diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java
index f938d8490e6..59e2ce784ea 100644
--- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java
+++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java
@@ -53,8 +53,6 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.types", AlertsPlugin.class.getName())
- // TODO: Figure out why Guice errors occur if zen discovery isn't configured
- .put("discovery.type", "zen")
.build();
}
diff --git a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java
index fe9a65bab41..78cf357ed01 100644
--- a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java
+++ b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java
@@ -38,6 +38,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
return ImmutableSettings.builder()
.put(settings)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
+ .put("discovery.type", "zen")
.build();
}
diff --git a/src/test/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/test/java/org/elasticsearch/discovery/local/LocalDiscovery.java
new file mode 100644
index 00000000000..9701c8b8175
--- /dev/null
+++ b/src/test/java/org/elasticsearch/discovery/local/LocalDiscovery.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.discovery.local;
+
+import com.google.common.base.Objects;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.*;
+import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeService;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.inject.internal.Nullable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.discovery.*;
+import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.cluster.ClusterState.Builder;
+
+/**
+ * Forked from ES core just for testing. This class is first on the classpath and will be picked
+ * before the one in es core jar file.
+ *
+ * The version in core has a Guice circular dependency issue with 'DiscoveryService'.
+ * It doesn't need it, so in this version that has been removed.
+ *
+ * The circular dependency issue only manifests when plugin.types setting is used in tests.
+ */
+public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery {
+
+ private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
+
+ private final TransportService transportService;
+ private final ClusterService clusterService;
+ private final DiscoveryNodeService discoveryNodeService;
+ private AllocationService allocationService;
+ private final ClusterName clusterName;
+ private final Version version;
+
+ private final DiscoverySettings discoverySettings;
+
+ private DiscoveryNode localNode;
+
+ private volatile boolean master = false;
+
+ private final AtomicBoolean initialStateSent = new AtomicBoolean();
+
+ private final CopyOnWriteArrayList initialStateListeners = new CopyOnWriteArrayList<>();
+
+ private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap();
+
+ @Inject
+ public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
+ DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
+ super(settings);
+ this.clusterName = clusterName;
+ this.clusterService = clusterService;
+ this.transportService = transportService;
+ this.discoveryNodeService = discoveryNodeService;
+ this.version = version;
+ this.discoverySettings = discoverySettings;
+ }
+
+ @Override
+ public void setNodeService(@Nullable NodeService nodeService) {
+ // nothing to do here
+ }
+
+ @Override
+ public void setAllocationService(AllocationService allocationService) {
+ this.allocationService = allocationService;
+ }
+
+ @Override
+ protected void doStart() throws ElasticsearchException {
+ synchronized (clusterGroups) {
+ ClusterGroup clusterGroup = clusterGroups.get(clusterName);
+ if (clusterGroup == null) {
+ clusterGroup = new ClusterGroup();
+ clusterGroups.put(clusterName, clusterGroup);
+ }
+ logger.debug("Connected to cluster [{}]", clusterName);
+ this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(),
+ discoveryNodeService.buildAttributes(), version);
+
+ clusterGroup.members().add(this);
+
+ LocalDiscovery firstMaster = null;
+ for (LocalDiscovery localDiscovery : clusterGroup.members()) {
+ if (localDiscovery.localNode().masterNode()) {
+ firstMaster = localDiscovery;
+ break;
+ }
+ }
+
+ if (firstMaster != null && firstMaster.equals(this)) {
+ // we are the first master (and the master)
+ master = true;
+ final LocalDiscovery master = firstMaster;
+ clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
+ for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
+ nodesBuilder.put(discovery.localNode);
+ }
+ nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
+ // remove the NO_MASTER block in this case
+ ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock());
+ return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("unexpected failure during [{}]", t, source);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ sendInitialStateEventIfNeeded();
+ }
+ });
+ } else if (firstMaster != null) {
+ // update as fast as we can the local node state with the new metadata (so we create indices for example)
+ final ClusterState masterState = firstMaster.clusterService.state();
+ clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ // make sure we have the local node id set, we might need it as a result of the new metadata
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id());
+ return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("unexpected failure during [{}]", t, source);
+ }
+ });
+
+ // tell the master to send the fact that we are here
+ final LocalDiscovery master = firstMaster;
+ firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
+ for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
+ nodesBuilder.put(discovery.localNode);
+ }
+ nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
+ return ClusterState.builder(currentState).nodes(nodesBuilder).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("unexpected failure during [{}]", t, source);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ sendInitialStateEventIfNeeded();
+ }
+ });
+ }
+ } // else, no master node, the next node that will start will fill things in...
+ }
+
+ @Override
+ protected void doStop() throws ElasticsearchException {
+ synchronized (clusterGroups) {
+ ClusterGroup clusterGroup = clusterGroups.get(clusterName);
+ if (clusterGroup == null) {
+ logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least...");
+ return;
+ }
+ clusterGroup.members().remove(this);
+ if (clusterGroup.members().isEmpty()) {
+ // no more members, remove and return
+ clusterGroups.remove(clusterName);
+ return;
+ }
+
+ LocalDiscovery firstMaster = null;
+ for (LocalDiscovery localDiscovery : clusterGroup.members()) {
+ if (localDiscovery.localNode().masterNode()) {
+ firstMaster = localDiscovery;
+ break;
+ }
+ }
+
+ if (firstMaster != null) {
+ // if the removed node is the master, make the next one as the master
+ if (master) {
+ firstMaster.master = true;
+ }
+
+ final Set newMembers = new HashSet<>();
+ for (LocalDiscovery discovery : clusterGroup.members()) {
+ newMembers.add(discovery.localNode.id());
+ }
+
+ final LocalDiscovery master = firstMaster;
+ master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
+ DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
+ if (delta.added()) {
+ logger.warn("No new nodes should be created when a new discovery view is accepted");
+ }
+ // reroute here, so we eagerly remove dead nodes from the routing
+ ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
+ RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
+ return ClusterState.builder(updatedState).routingResult(routingResult).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("unexpected failure during [{}]", t, source);
+ }
+ });
+ }
+ }
+ }
+
+ @Override
+ protected void doClose() throws ElasticsearchException {
+ }
+
+ @Override
+ public DiscoveryNode localNode() {
+ return localNode;
+ }
+
+ @Override
+ public void addListener(InitialStateDiscoveryListener listener) {
+ this.initialStateListeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(InitialStateDiscoveryListener listener) {
+ this.initialStateListeners.remove(listener);
+ }
+
+ @Override
+ public String nodeDescription() {
+ return clusterName.value() + "/" + localNode.id();
+ }
+
+ public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) {
+ if (!master) {
+ throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
+ }
+ LocalDiscovery[] members = members();
+ if (members.length > 0) {
+ publish(members, clusterState, new AckClusterStatePublishResponseHandler(members.length - 1, ackListener));
+ }
+ }
+
+ private LocalDiscovery[] members() {
+ ClusterGroup clusterGroup = clusterGroups.get(clusterName);
+ if (clusterGroup == null) {
+ return NO_MEMBERS;
+ }
+ Queue members = clusterGroup.members();
+ return members.toArray(new LocalDiscovery[members.size()]);
+ }
+
+ private void publish(LocalDiscovery[] members, ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
+
+ try {
+ // we do the marshaling intentionally, to check it works well...
+ final byte[] clusterStateBytes = Builder.toBytes(clusterState);
+
+ for (final LocalDiscovery discovery : members) {
+ if (discovery.master) {
+ continue;
+ }
+ final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName);
+ nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
+ // ignore cluster state messages that do not include "me", not in the game yet...
+ if (nodeSpecificClusterState.nodes().localNode() != null) {
+ assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
+ assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
+
+ discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
+ return currentState;
+ }
+
+ if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
+ // its a fresh update from the master as we transition from a start of not having a master to having one
+ logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
+ return nodeSpecificClusterState;
+ }
+
+ ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
+ // if the routing table did not change, use the original one
+ if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
+ builder.routingTable(currentState.routingTable());
+ }
+ if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) {
+ builder.metaData(currentState.metaData());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("unexpected failure during [{}]", t, source);
+ publishResponseHandler.onFailure(discovery.localNode, t);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ sendInitialStateEventIfNeeded();
+ publishResponseHandler.onResponse(discovery.localNode);
+ }
+ });
+ } else {
+ publishResponseHandler.onResponse(discovery.localNode);
+ }
+ }
+
+ TimeValue publishTimeout = discoverySettings.getPublishTimeout();
+ if (publishTimeout.millis() > 0) {
+ try {
+ boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
+ if (!awaited) {
+ logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
+ }
+ } catch (InterruptedException e) {
+ // ignore & restore interrupt
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+ } catch (Exception e) {
+ // failure to marshal or un-marshal
+ throw new ElasticsearchIllegalStateException("Cluster state failed to serialize", e);
+ }
+ }
+
+ private void sendInitialStateEventIfNeeded() {
+ if (initialStateSent.compareAndSet(false, true)) {
+ for (InitialStateDiscoveryListener listener : initialStateListeners) {
+ listener.initialStateProcessed();
+ }
+ }
+ }
+
+ private class ClusterGroup {
+
+ private Queue members = ConcurrentCollections.newQueue();
+
+ Queue members() {
+ return members;
+ }
+ }
+}