mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Changed the initialization retry logic of the AlertService to be event based instead of blocking (actively polling for a new cluster state).
This avoids keeping a single thread busy while not all primary shards of the alerts and alert history indices are started. Also, alert history items that were loaded during initialization are now executed once the AlertService goes into the started state; before, they were executed once the AlertActionService had started, which could lead to failures, because there was a small window of time where the alert manager wasn't started. Executing alert history items with the state search_needed requires the alert manager to be started, and that isn't yet the case when the AlertActionService has started. Closes elastic/elasticsearch#75 Closes elastic/elasticsearch#76 Original commit: elastic/x-pack-elasticsearch@a799bc34e3
This commit is contained in:
parent
54923420d9
commit
69bbea6985
@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.alerts.history.HistoryService;
|
||||
import org.elasticsearch.alerts.scheduler.Scheduler;
|
||||
import org.elasticsearch.alerts.support.Callback;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
@ -92,22 +93,37 @@ public class AlertsService extends AbstractComponent {
|
||||
}
|
||||
}
|
||||
|
||||
private void internalStart(ClusterState initialState) {
|
||||
private void internalStart(ClusterState clusterState) {
|
||||
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
|
||||
logger.info("starting alert service...");
|
||||
alertLockService.start();
|
||||
ClusterState clusterState = initialState;
|
||||
|
||||
// Try to load alert store before the action service, b/c action depends on alert store
|
||||
while (!alertsStore.start(clusterState)) {
|
||||
clusterState = newClusterState(clusterState);
|
||||
}
|
||||
while (!historyService.start(clusterState)) {
|
||||
clusterState = newClusterState(clusterState);
|
||||
}
|
||||
scheduler.start(alertsStore.getAlerts().values());
|
||||
state.set(State.STARTED);
|
||||
logger.info("alert service has started");
|
||||
alertsStore.start(clusterState, new Callback<ClusterState>(){
|
||||
|
||||
@Override
|
||||
public void onSuccess(ClusterState clusterState) {
|
||||
historyService.start(clusterState, new Callback<ClusterState>() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(ClusterState clusterState) {
|
||||
scheduler.start(alertsStore.getAlerts().values());
|
||||
state.set(State.STARTED);
|
||||
logger.info("alert service has started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
logger.error("failed to start alert service", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
logger.error("failed to start alert service", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,9 +15,13 @@ import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.alerts.support.Callback;
|
||||
import org.elasticsearch.alerts.support.TemplateUtils;
|
||||
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
@ -27,11 +31,13 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ -44,28 +50,35 @@ public class AlertsStore extends AbstractComponent {
|
||||
private final ClientProxy client;
|
||||
private final TemplateUtils templateUtils;
|
||||
private final Alert.Parser alertParser;
|
||||
private final ClusterService clusterService;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final ConcurrentMap<String, Alert> alertMap;
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
private final AtomicInteger initializationRetries = new AtomicInteger();
|
||||
|
||||
private final int scrollSize;
|
||||
private final TimeValue scrollTimeout;
|
||||
|
||||
@Inject
|
||||
public AlertsStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Alert.Parser alertParser) {
|
||||
public AlertsStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Alert.Parser alertParser,
|
||||
ClusterService clusterService, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.templateUtils = templateUtils;
|
||||
this.alertParser = alertParser;
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.alertMap = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30));
|
||||
this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
|
||||
}
|
||||
|
||||
public boolean start(ClusterState state) {
|
||||
public void start(ClusterState state, Callback<ClusterState> callback) {
|
||||
if (started.get()) {
|
||||
return true;
|
||||
callback.onSuccess(state);
|
||||
return;
|
||||
}
|
||||
|
||||
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
|
||||
@ -73,25 +86,28 @@ public class AlertsStore extends AbstractComponent {
|
||||
logger.trace("alerts index [{}] was not found. skipping alerts loading...", ALERT_INDEX);
|
||||
templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE);
|
||||
started.set(true);
|
||||
return true;
|
||||
callback.onSuccess(state);
|
||||
return;
|
||||
}
|
||||
|
||||
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
|
||||
logger.debug("alerts index [{}] found with all active primary shards. loading alerts...", ALERT_INDEX);
|
||||
try {
|
||||
int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap);
|
||||
logger.info("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX);
|
||||
logger.debug("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX);
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX);
|
||||
logger.debug("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX);
|
||||
alertMap.clear();
|
||||
return false;
|
||||
retry(callback);
|
||||
return;
|
||||
}
|
||||
templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE);
|
||||
started.set(true);
|
||||
return true;
|
||||
callback.onSuccess(state);
|
||||
} else {
|
||||
logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX);
|
||||
retry(callback);
|
||||
}
|
||||
logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean started() {
|
||||
@ -176,6 +192,36 @@ public class AlertsStore extends AbstractComponent {
|
||||
return indexRequest;
|
||||
}
|
||||
|
||||
private void retry(final Callback<ClusterState> callback) {
|
||||
ClusterStateListener clusterStateListener = new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
final ClusterState state = event.state();
|
||||
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
|
||||
if (alertIndexMetaData != null) {
|
||||
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
|
||||
// Remove listener, so that it doesn't get called on the next cluster state update:
|
||||
assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time";
|
||||
clusterService.remove(this);
|
||||
// We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread.
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
start(state, callback);
|
||||
} catch (Exception e) {
|
||||
callback.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
clusterService.add(clusterStateListener);
|
||||
assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time";
|
||||
}
|
||||
|
||||
/**
|
||||
* scrolls all the alert documents in the alerts index, parses them, and loads them into
|
||||
* the given map.
|
||||
|
@ -8,11 +8,15 @@ package org.elasticsearch.alerts.history;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.alerts.*;
|
||||
import org.elasticsearch.alerts.actions.Action;
|
||||
import org.elasticsearch.alerts.condition.Condition;
|
||||
import org.elasticsearch.alerts.scheduler.Scheduler;
|
||||
import org.elasticsearch.alerts.support.Callback;
|
||||
import org.elasticsearch.alerts.throttle.Throttler;
|
||||
import org.elasticsearch.alerts.transform.Transform;
|
||||
import org.elasticsearch.alerts.condition.Condition;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
@ -26,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ -34,33 +39,39 @@ public class HistoryService extends AbstractComponent {
|
||||
private final HistoryStore historyStore;
|
||||
private final ThreadPool threadPool;
|
||||
private final AlertsStore alertsStore;
|
||||
private final ClusterService clusterService;
|
||||
private final AlertLockService alertLockService;
|
||||
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
private final AtomicInteger initializationRetries = new AtomicInteger();
|
||||
|
||||
// Holds fired alerts that were fired before on a different elected master node, but never had the chance to run.
|
||||
private volatile ImmutableList<FiredAlert> previousFiredAlerts = ImmutableList.of();
|
||||
|
||||
@Inject
|
||||
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool,
|
||||
AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler) {
|
||||
AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler,
|
||||
ClusterService clusterService) {
|
||||
super(settings);
|
||||
this.historyStore = historyStore;
|
||||
this.threadPool = threadPool;
|
||||
this.alertsStore = alertsStore;
|
||||
this.alertLockService = alertLockService;
|
||||
this.clusterService = clusterService;
|
||||
scheduler.addListener(new SchedulerListener());
|
||||
}
|
||||
|
||||
public boolean start(ClusterState state) {
|
||||
public void start(ClusterState state, Callback<ClusterState> callback) {
|
||||
if (started.get()) {
|
||||
return true;
|
||||
callback.onSuccess(state);
|
||||
return;
|
||||
}
|
||||
|
||||
assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements.";
|
||||
HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION);
|
||||
if (!loadResult.succeeded()) {
|
||||
return false;
|
||||
retry(callback);
|
||||
return;
|
||||
}
|
||||
this.previousFiredAlerts = ImmutableList.copyOf(loadResult);
|
||||
if (!previousFiredAlerts.isEmpty()) {
|
||||
@ -92,7 +103,7 @@ public class HistoryService extends AbstractComponent {
|
||||
logger.debug("started history service");
|
||||
}
|
||||
executePreviouslyFiredAlerts();
|
||||
return true;
|
||||
callback.onSuccess(state);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
@ -162,6 +173,32 @@ public class HistoryService extends AbstractComponent {
|
||||
return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME);
|
||||
}
|
||||
|
||||
private void retry(final Callback<ClusterState> callback) {
|
||||
ClusterStateListener clusterStateListener = new ClusterStateListener() {
|
||||
|
||||
@Override
|
||||
public void clusterChanged(final ClusterChangedEvent event) {
|
||||
// Remove listener, so that it doesn't get called on the next cluster state update:
|
||||
assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time";
|
||||
clusterService.remove(this);
|
||||
// We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread.
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
start(event.state(), callback);
|
||||
} catch (Exception e) {
|
||||
callback.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time";
|
||||
clusterService.add(clusterStateListener);
|
||||
}
|
||||
|
||||
private final class AlertExecutionTask implements Runnable {
|
||||
|
||||
private final FiredAlert firedAlert;
|
||||
|
@ -88,7 +88,7 @@ public class HistoryStore extends AbstractComponent {
|
||||
public LoadResult loadFiredAlerts(ClusterState state, FiredAlert.State firedAlertState) {
|
||||
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
|
||||
if (indices.length == 0) {
|
||||
logger.info("No .alert_history indices found, skip loading of alert actions");
|
||||
logger.debug("No .alert_history indices found, skip loading of alert actions");
|
||||
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
|
||||
return new LoadResult(true);
|
||||
}
|
||||
@ -97,7 +97,7 @@ public class HistoryStore extends AbstractComponent {
|
||||
IndexMetaData indexMetaData = state.getMetaData().index(index);
|
||||
if (indexMetaData != null) {
|
||||
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
|
||||
logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index);
|
||||
logger.debug("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index);
|
||||
return new LoadResult(false);
|
||||
} else {
|
||||
numPrimaryShards += indexMetaData.numberOfShards();
|
||||
|
15
src/main/java/org/elasticsearch/alerts/support/Callback.java
Normal file
15
src/main/java/org/elasticsearch/alerts/support/Callback.java
Normal file
@ -0,0 +1,15 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.alerts.support;
|
||||
|
||||
/**
|
||||
 * Receives the outcome of an operation that reports completion through a call
 * instead of a return value.
 *
 * In this change the alerts store and the history service use it to signal the
 * end of their start-up: {@link #onSuccess(Object)} is handed the cluster state
 * the component was started with, and {@link #onFailure(Throwable)} is handed
 * the exception thrown while a retried start was running.
 *
 * @param <T> type of the value passed to {@link #onSuccess(Object)}
*/
|
||||
public interface Callback<T> {
|
||||
|
||||
void onSuccess(T t);
|
||||
|
||||
void onFailure(Throwable e);
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user