Test: Fixed local discovery issue when running tests for plugins

Core: synchronized initializing and stopping the scheduler to avoid scheduler stops leaving leaking threads behind

Original commit: elastic/x-pack-elasticsearch@b845651430
This commit is contained in:
Martijn van Groningen 2014-11-09 22:21:51 +00:00
parent 77fc31f815
commit e87b0a980c
7 changed files with 418 additions and 24 deletions

View File

@ -79,6 +79,14 @@
<scope>test</scope>
</dependency>
<!-- Remove this when LocalDiscovery gets fixed in core -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<scope>test</scope>
</dependency>
<!-- Regular dependencies -->
<dependency>

View File

@ -30,7 +30,6 @@ import org.elasticsearch.indices.IndicesService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -135,7 +134,9 @@ public class AlertManager extends AbstractComponent {
public void stop() {
if (started.compareAndSet(true, false)) {
logger.info("Stopping alert manager...");
synchronized (scheduler) {
scheduler.stop();
}
actionManager.stop();
alertsStore.stop();
logger.info("Alert manager has stopped");
@ -156,12 +157,6 @@ public class AlertManager extends AbstractComponent {
}
}
private void sendAlertsToScheduler() {
for (Map.Entry<String, Alert> entry : alertsStore.getAlerts().entrySet()) {
scheduler.add(entry.getKey(), entry.getValue());
}
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
@ -206,8 +201,7 @@ public class AlertManager extends AbstractComponent {
private void startIfReady() {
if (alertsStore.started() && actionManager.started()) {
if (started.compareAndSet(false, true)) {
scheduler.start();
sendAlertsToScheduler();
scheduler.start(alertsStore.getAlerts());
}
}
}

View File

@ -317,7 +317,7 @@ public class AlertActionManager extends AbstractComponent {
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} else {
if (started()) {
logger.error("Error during reader thread, restarting queue reader thread...", e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
@ -327,6 +327,7 @@ public class AlertActionManager extends AbstractComponent {
}
}
}
}
private enum State {

View File

@ -19,6 +19,7 @@ import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.util.Map;
import java.util.Properties;
public class AlertScheduler extends AbstractComponent {
@ -39,7 +40,13 @@ public class AlertScheduler extends AbstractComponent {
this.alertManager = alertManager;
}
public void start() {
/**
* Starts the scheduler and schedules the specified alerts before returning.
*
* Both the start and stop are synchronized to avoid that scheduler gets stopped while previously stored alerts
* are being loaded.
*/
public synchronized void start(Map<String, Alert> alerts) {
try {
logger.info("Starting scheduler");
// Can't start a scheduler that has been shutdown, so we need to re-create each time start() is invoked
@ -49,18 +56,22 @@ public class AlertScheduler extends AbstractComponent {
scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory());
scheduler.start();
for (Map.Entry<String, Alert> entry : alerts.entrySet()) {
add(entry.getKey(), entry.getValue());
}
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler", se);
}
}
public void stop() {
/**
* Stops the scheduler.
*/
public synchronized void stop() {
try {
if (!scheduler.isShutdown()) {
scheduler.clear();
scheduler.shutdown(false);
scheduler.shutdown(true);
logger.info("Stopped scheduler");
}
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler", se);
}

View File

@ -53,8 +53,6 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.types", AlertsPlugin.class.getName())
// TODO: Figure out why Guice errors occur if zen discovery isn't configured
.put("discovery.type", "zen")
.build();
}

View File

@ -38,6 +38,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
return ImmutableSettings.builder()
.put(settings)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
.put("discovery.type", "zen")
.build();
}

View File

@ -0,0 +1,381 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.discovery.local;
import com.google.common.base.Objects;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.*;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.ClusterState.Builder;
/**
* Forked from ES core just for testing. This class is first on the classpath and will be picked
* before the one in es core jar file.
*
* The version in core has a Guice circular dependency issue with 'DiscoveryService'.
* It doesn't need it, so in this version that has been removed.
*
* The circular dependency issue only manifests when plugin.types setting is used in tests.
*/
public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery {
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final Version version;
private final DiscoverySettings discoverySettings;
private DiscoveryNode localNode;
private volatile boolean master = false;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<>();
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = ConcurrentCollections.newConcurrentMap();
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.discoverySettings = discoverySettings;
}
@Override
public void setNodeService(@Nullable NodeService nodeService) {
// nothing to do here
}
@Override
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}
@Override
protected void doStart() throws ElasticsearchException {
synchronized (clusterGroups) {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
clusterGroup = new ClusterGroup();
clusterGroups.put(clusterName, clusterGroup);
}
logger.debug("Connected to cluster [{}]", clusterName);
this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(),
discoveryNodeService.buildAttributes(), version);
clusterGroup.members().add(this);
LocalDiscovery firstMaster = null;
for (LocalDiscovery localDiscovery : clusterGroup.members()) {
if (localDiscovery.localNode().masterNode()) {
firstMaster = localDiscovery;
break;
}
}
if (firstMaster != null && firstMaster.equals(this)) {
// we are the first master (and the master)
master = true;
final LocalDiscovery master = firstMaster;
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
// remove the NO_MASTER block in this case
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock());
return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
}
});
} else if (firstMaster != null) {
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = firstMaster.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id());
return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
}
});
}
} // else, no master node, the next node that will start will fill things in...
}
@Override
protected void doStop() throws ElasticsearchException {
synchronized (clusterGroups) {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least...");
return;
}
clusterGroup.members().remove(this);
if (clusterGroup.members().isEmpty()) {
// no more members, remove and return
clusterGroups.remove(clusterName);
return;
}
LocalDiscovery firstMaster = null;
for (LocalDiscovery localDiscovery : clusterGroup.members()) {
if (localDiscovery.localNode().masterNode()) {
firstMaster = localDiscovery;
break;
}
}
if (firstMaster != null) {
// if the removed node is the master, make the next one as the master
if (master) {
firstMaster.master = true;
}
final Set<String> newMembers = new HashSet<>();
for (LocalDiscovery discovery : clusterGroup.members()) {
newMembers.add(discovery.localNode.id());
}
final LocalDiscovery master = firstMaster;
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
if (delta.added()) {
logger.warn("No new nodes should be created when a new discovery view is accepted");
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
}
}
}
@Override
protected void doClose() throws ElasticsearchException {
}
@Override
public DiscoveryNode localNode() {
return localNode;
}
@Override
public void addListener(InitialStateDiscoveryListener listener) {
this.initialStateListeners.add(listener);
}
@Override
public void removeListener(InitialStateDiscoveryListener listener) {
this.initialStateListeners.remove(listener);
}
@Override
public String nodeDescription() {
return clusterName.value() + "/" + localNode.id();
}
public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) {
if (!master) {
throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
}
LocalDiscovery[] members = members();
if (members.length > 0) {
publish(members, clusterState, new AckClusterStatePublishResponseHandler(members.length - 1, ackListener));
}
}
private LocalDiscovery[] members() {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
return NO_MEMBERS;
}
Queue<LocalDiscovery> members = clusterGroup.members();
return members.toArray(new LocalDiscovery[members.size()]);
}
private void publish(LocalDiscovery[] members, ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
try {
// we do the marshaling intentionally, to check it works well...
final byte[] clusterStateBytes = Builder.toBytes(clusterState);
for (final LocalDiscovery discovery : members) {
if (discovery.master) {
continue;
}
final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName);
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
}
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
return nodeSpecificClusterState;
}
ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
}
return builder.build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
publishResponseHandler.onFailure(discovery.localNode, t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
publishResponseHandler.onResponse(discovery.localNode);
}
});
} else {
publishResponseHandler.onResponse(discovery.localNode);
}
}
TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
try {
boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
if (!awaited) {
logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
// failure to marshal or un-marshal
throw new ElasticsearchIllegalStateException("Cluster state failed to serialize", e);
}
}
private void sendInitialStateEventIfNeeded() {
if (initialStateSent.compareAndSet(false, true)) {
for (InitialStateDiscoveryListener listener : initialStateListeners) {
listener.initialStateProcessed();
}
}
}
private class ClusterGroup {
private Queue<LocalDiscovery> members = ConcurrentCollections.newQueue();
Queue<LocalDiscovery> members() {
return members;
}
}
}