Move the bootstrap logic out of AlertService which will make it easier to test.

Original commit: elastic/x-pack-elasticsearch@bf5c47dd5f
This commit is contained in:
Martijn van Groningen 2015-02-26 10:47:13 +01:00
parent aae6ff834f
commit bebfbf9664
4 changed files with 109 additions and 106 deletions

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
/**
*/
public class AlertBootstrap extends AbstractComponent implements ClusterStateListener {
private final ThreadPool threadPool;
private final AlertsService alertsService;
private final ClusterService clusterService;
private volatile boolean manuallyStopped;
@Inject
public AlertBootstrap(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, AlertsService alertsService) {
super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.alertsService = alertsService;
clusterService.add(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled.
indicesService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
AlertBootstrap.this.alertsService.stop();
}
});
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
}
public void start() {
manuallyStopped = false;
alertsService.start(clusterService.state());
}
public void stop() {
manuallyStopped = true;
alertsService.stop();
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
// We're no longer the master so we need to stop alerting.
// Stopping alerting may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
alertsService.stop();
}
});
} else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .alerts and
// a .alerts_history index, but they may not have been restored from the cluster state on disk
return;
}
if (alertsService.state() == AlertsService.State.STOPPED && !manuallyStopped) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
alertsService.start(event.state());
}
});
}
}
}
}

View File

@ -47,6 +47,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
bind(Alert.Parser.class).asEagerSingleton();
bind(AlertLockService.class).asEagerSingleton();
bind(AlertBootstrap.class).asEagerSingleton();
bind(AlertsService.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();

View File

@ -11,18 +11,11 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.support.Callback;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@ -32,76 +25,21 @@ public class AlertsService extends AbstractComponent {
private final Scheduler scheduler;
private final AlertsStore alertsStore;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final AlertLockService alertLockService;
private final HistoryService historyService;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private volatile boolean manuallyStopped;
@Inject
public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, HistoryService historyService, ThreadPool threadPool,
public AlertsService(Settings settings, Scheduler scheduler, AlertsStore alertsStore, HistoryService historyService,
AlertLockService alertLockService) {
super(settings);
this.scheduler = scheduler;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
this.clusterService = clusterService;
this.alertLockService = alertLockService;
this.historyService = historyService;
clusterService.add(new AlertsClusterStateListener());
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled.
indicesService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
stop();
}
});
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
}
// TODO: consider making this adding start/stop lock
// The currently mechanism isn't broken, but in tests it is annoying that if stop has been invoked concurrently
// and the first invocation sets the state to STOPPING then the second invocation will just return, because the
// first invocation will do the work to stop alerts plugin. If the second invocation was caused by a test teardown
// then the thread lead detection will fail, because it assumes that everything should have been stopped & closed.
// This isn't the case in the situation, although this will happen, but to late for the thread leak detection to
// not see it as a failure.
/**
* Manually starts alerting if not already started
*/
public void start() {
manuallyStopped = false;
ClusterState state = clusterService.state();
internalStart(state);
}
/**
* Manually stops alerting if not already stopped.
*/
public void stop() {
manuallyStopped = true;
internalStop();
}
private void internalStop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("stopping alert service...");
alertLockService.stop();
historyService.stop();
scheduler.stop();
alertsStore.stop();
state.set(State.STOPPED);
logger.info("alert service has stopped");
}
}
private void internalStart(ClusterState clusterState) {
public void start(ClusterState clusterState) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
logger.info("starting alert service...");
alertLockService.start();
@ -135,6 +73,18 @@ public class AlertsService extends AbstractComponent {
}
}
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("stopping alert service...");
alertLockService.stop();
historyService.stop();
scheduler.stop();
alertsStore.stop();
state.set(State.STOPPED);
logger.info("alert service has stopped");
}
}
public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted();
AlertLockService.Lock lock = alertLockService.acquire(name);
@ -209,40 +159,6 @@ public class AlertsService extends AbstractComponent {
}
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
// We're no longer the master so we need to stop alerting.
// Stopping alerting may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
internalStop();
}
});
} else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .alerts and
// a .alerts_history index, but they may not have been restored from the cluster state on disk
return;
}
if (state.get() == State.STOPPED && !manuallyStopped) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
internalStart(event.state());
}
});
}
}
}
}
/**
* Encapsulates the state of the alerts plugin.
*/

View File

@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.AlertBootstrap;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportAlertsServiceAction extends TransportMasterNodeOperationAction<AlertsServiceRequest, AlertsServiceResponse> {
private final AlertsService alertsService;
private final AlertBootstrap alertBootstrap;
@Inject
public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertsService alertsService) {
public TransportAlertsServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, AlertBootstrap alertBootstrap) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
this.alertsService = alertsService;
this.alertBootstrap = alertBootstrap;
}
@Override
@ -51,14 +51,14 @@ public class TransportAlertsServiceAction extends TransportMasterNodeOperationAc
protected void masterOperation(AlertsServiceRequest request, ClusterState state, ActionListener<AlertsServiceResponse> listener) throws ElasticsearchException {
switch (request.getCommand()) {
case "start":
alertsService.start();
alertBootstrap.start();
break;
case "stop":
alertsService.stop();
alertBootstrap.stop();
break;
case "restart":
alertsService.start();
alertsService.stop();
alertBootstrap.start();
alertBootstrap.stop();
break;
default:
listener.onFailure(new ElasticsearchIllegalArgumentException("Command [" + request.getCommand() + "] is undefined"));