Changes the configuration manager to rely more on alert store for the .alerts index / templated to be loaded. This simplified to loading logic in the configuration manager.

Original commit: elastic/x-pack-elasticsearch@ccd7a23243
This commit is contained in:
Martijn van Groningen 2014-12-05 16:40:21 +01:00
parent 780c89ec23
commit ac45a4fe99
6 changed files with 21 additions and 80 deletions

View File

@ -56,7 +56,6 @@ public class AlertManager extends AbstractComponent {
private final ClusterService clusterService;
private final ScriptService scriptService;
private final Client client;
private final ConfigurationManager configurationManager;
private final KeyedLock<String> alertLock = new KeyedLock<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
@ -65,8 +64,7 @@ public class AlertManager extends AbstractComponent {
@Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager,
AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client,
ConfigurationManager configurationManager) {
AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client) {
super(settings);
this.scheduler = scheduler;
this.threadPool = threadPool;
@ -80,7 +78,6 @@ public class AlertManager extends AbstractComponent {
this.scriptService = scriptService;
this.client = client;
this.configurationManager = configurationManager;
clusterService.add(new AlertsClusterStateListener());
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
@ -251,7 +248,6 @@ public class AlertManager extends AbstractComponent {
actionManager.stop();
scheduler.stop();
alertsStore.stop();
configurationManager.stop();
state.set(State.STOPPED);
logger.info("Alert manager has stopped");
}
@ -261,12 +257,6 @@ public class AlertManager extends AbstractComponent {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
ClusterState clusterState = initialState;
while (true) {
if (configurationManager.start(initialState)) {
break;
}
clusterState = newClusterState(clusterState);
}
// Try to load alert store before the action manager, b/c action depends on alert store
while (true) {
if (alertsStore.start(clusterState)) {

View File

@ -5,11 +5,8 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -21,15 +18,17 @@ import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Simple service to get settings that are persisted in the a special document in the .alerts index.
* Also notifies known components about setting changes.
*
* The service requires on the fact that the alert service has been started.
*/
public class ConfigurationManager extends AbstractComponent {
public static final String CONFIG_TYPE = "config";
public static final String CONFIG_INDEX = AlertsStore.ALERT_INDEX;
public static final String GLOBAL_CONFIG_NAME = "global";
private final Client client;
private volatile boolean started = false;
private final CopyOnWriteArrayList<ConfigurableComponentListener> registeredComponents;
@Inject
@ -43,15 +42,14 @@ public class ConfigurationManager extends AbstractComponent {
* This method gets the config
* @return The immutable settings loaded from the index
*/
public Settings getGlobalConfig() {
ensureStarted();
public Settings getConfig() {
try {
client.admin().indices().prepareRefresh(CONFIG_INDEX).get();
client.admin().indices().prepareRefresh(AlertsStore.ALERT_INDEX).get();
} catch (IndexMissingException ime) {
logger.error("No index [" + CONFIG_INDEX + "] found");
logger.error("No index [" + AlertsStore.ALERT_INDEX + "] found");
return null;
}
GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, GLOBAL_CONFIG_NAME).get();
GetResponse response = client.prepareGet(AlertsStore.ALERT_INDEX, CONFIG_TYPE, GLOBAL_CONFIG_NAME).get();
if (response.isExists()) {
return ImmutableSettings.settingsBuilder().loadFromSource(response.getSourceAsString()).build();
} else {
@ -64,34 +62,13 @@ public class ConfigurationManager extends AbstractComponent {
*
* @param settingsSource
*/
public void newConfig(BytesReference settingsSource) throws IOException {
public void updateConfig(BytesReference settingsSource) throws IOException {
Settings settings = ImmutableSettings.settingsBuilder().loadFromSource(settingsSource.toUtf8()).build();
for (ConfigurableComponentListener componentListener : registeredComponents) {
componentListener.receiveConfigurationUpdate(settings);
}
}
/**
* This method determines if the config manager is ready to start loading configs by checking to make sure the
* config index is in a readable state.
* @param clusterState
* @return true if ready to read or false if not
*/
public boolean start(ClusterState clusterState) {
if (started) {
return true;
} else {
started = checkIndexState(clusterState);
return started;
}
}
public void stop() {
// Even though we just check if the config index is started, we need to do it again if alert manager is restarted,
// the index may not be available
started = false;
}
/**
* Registers an component to receive config updates
*/
@ -100,25 +77,4 @@ public class ConfigurationManager extends AbstractComponent {
registeredComponents.add(configListener);
}
}
private void ensureStarted() {
if (!started) {
throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started");
}
}
private boolean checkIndexState(ClusterState clusterState) {
IndexMetaData configIndexMetadata = clusterState.getMetaData().index(CONFIG_INDEX);
if (configIndexMetadata == null) {
logger.info("No previous [" + CONFIG_INDEX + "]");
return true;
} else {
if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) {
logger.info("Index [" + CONFIG_INDEX + "] is started.");
return true;
} else {
return false;
}
}
}
}

View File

@ -44,7 +44,7 @@ public class SmtpAlertActionFactory implements AlertActionFactory, ConfigurableC
@Override
public AlertAction createAction(XContentParser parser) throws IOException {
if (settings == null) {
settings = configurationManager.getGlobalConfig();
settings = configurationManager.getConfig();
configurationManager.registerListener(this);
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.alerts.transport.actions.config;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -102,6 +103,6 @@ public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRe
@Override
public String toString() {
return "delete {[" + ConfigurationManager.CONFIG_INDEX + "][" + ConfigurationManager.CONFIG_TYPE + "]}";
return "config {[" + AlertsStore.ALERT_INDEX + "][" + ConfigurationManager.CONFIG_TYPE + "]}";
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
@ -56,10 +57,9 @@ public class TransportConfigAlertAction extends TransportMasterNodeOperationActi
@Override
protected void masterOperation(ConfigAlertRequest request, ClusterState state, ActionListener<ConfigAlertResponse> listener) throws ElasticsearchException {
try {
IndexResponse indexResponse = client.prepareIndex(ConfigurationManager.CONFIG_INDEX, ConfigurationManager.CONFIG_TYPE, ConfigurationManager.GLOBAL_CONFIG_NAME)
IndexResponse indexResponse = client.prepareIndex(AlertsStore.ALERT_INDEX, ConfigurationManager.CONFIG_TYPE, ConfigurationManager.GLOBAL_CONFIG_NAME)
.setSource(request.getConfigSource(), request.isConfigSourceUnsafe()).get();
configManager.newConfig( request.getConfigSource());
configManager.updateConfig(request.getConfigSource());
listener.onResponse(new ConfigAlertResponse(indexResponse));
} catch (Exception e) {
listener.onFailure(e);
@ -69,7 +69,7 @@ public class TransportConfigAlertAction extends TransportMasterNodeOperationActi
@Override
protected ClusterBlockException checkBlock(ConfigAlertRequest request, ClusterState state) {
request.beforeLocalFork(); // This is the best place to make the config source safe
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{ConfigurationManager.CONFIG_INDEX});
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX});
}

View File

@ -6,8 +6,6 @@
package org.elasticsearch.alerts;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -40,10 +38,6 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
.build();
ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client());
boolean isReady = configurationManager.start(ClusterState.builder(new ClusterName("foobar")).build());
assertTrue(isReady); //Should always be ready on a clean start
SettingsListener settingsListener = new SettingsListener();
configurationManager.registerListener(settingsListener);
TimeValue tv2 = TimeValue.timeValueMillis(10);
@ -53,7 +47,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
.field("bar", 100)
.field("baz", false);
jsonSettings.endObject();
configurationManager.newConfig(jsonSettings.bytes());
configurationManager.updateConfig(jsonSettings.bytes());
assertThat(settingsListener.settings.getAsTime("foo", new TimeValue(0)).getMillis(), equalTo(tv2.getMillis()));
assertThat(settingsListener.settings.getAsInt("bar", 0), equalTo(100));
@ -80,13 +74,13 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
newSettings.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
jsonBuilder.endObject();
IndexResponse indexResponse = client()
.prepareIndex(ConfigurationManager.CONFIG_INDEX, ConfigurationManager.CONFIG_TYPE, "global")
.prepareIndex(AlertsStore.ALERT_INDEX, ConfigurationManager.CONFIG_TYPE, "global")
.setSource(jsonBuilder)
.get();
assertTrue(indexResponse.isCreated());
ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client());
assertTrue(configurationManager.start(ClusterState.builder(new ClusterName("foobar")).build()));
Settings loadedSettings = configurationManager.getGlobalConfig();
Settings loadedSettings = configurationManager.getConfig();
assertThat(loadedSettings.get("foo"), equalTo(newSettings.get("foo")));
assertThat(loadedSettings.get("bar"), equalTo(newSettings.get("bar")));
assertThat(loadedSettings.get("baz"), equalTo(newSettings.get("baz")));