diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index e3de203c9b6..13fde0ded69 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -251,6 +251,7 @@ public class AlertManager extends AbstractComponent { actionManager.stop(); scheduler.stop(); alertsStore.stop(); + configurationManager.stop(); state.set(State.STOPPED); logger.info("Alert manager has stopped"); } @@ -260,8 +261,8 @@ public class AlertManager extends AbstractComponent { if (state.compareAndSet(State.STOPPED, State.STARTING)) { ClusterState clusterState = initialState; - while(true) { - if (configurationManager.isReady(initialState)) { + while (true) { + if (configurationManager.start(initialState)) { break; } clusterState = newClusterState(clusterState); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 2197ae09c4f..f612ecca3ef 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -152,7 +152,7 @@ public class AlertsStore extends AbstractComponent { return true; } - if (!configurationManager.isReady(state)) { + if (!configurationManager.start(state)) { return false; } diff --git a/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java b/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java index d4186691354..d43a007f538 100644 --- a/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java +++ b/src/main/java/org/elasticsearch/alerts/ConfigurationManager.java @@ -15,27 +15,23 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.indices.IndexMissingException; -import java.util.Map; +import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; /** */ public class ConfigurationManager extends AbstractComponent { - private final Client client; - public static final String CONFIG_TYPE = "config"; public static final String CONFIG_INDEX = AlertsStore.ALERT_INDEX; public static final String GLOBAL_CONFIG_NAME = "global"; + private final Client client; + private volatile boolean started = false; private final CopyOnWriteArrayList registeredComponents; - private volatile boolean readyToRead = false; - - @Inject public ConfigurationManager(Settings settings, Client client) { super(settings); @@ -48,36 +44,28 @@ public class ConfigurationManager extends AbstractComponent { * @return The immutable settings loaded from the index */ public Settings getGlobalConfig() { - ensureReady(); + ensureStarted(); try { client.admin().indices().prepareRefresh(CONFIG_INDEX).get(); } catch (IndexMissingException ime) { - logger.info("No index [" + CONFIG_INDEX + "] found"); + logger.error("No index [" + CONFIG_INDEX + "] found"); return null; } GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, GLOBAL_CONFIG_NAME).get(); - if (!response.isExists()) { + if (response.isExists()) { + return ImmutableSettings.settingsBuilder().loadFromSource(response.getSourceAsString()).build(); + } else { return null; } - Map sourceMap = response.getSourceAsMap(); - ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder(); - for (Map.Entry configEntry : sourceMap.entrySet() ) { - settingsBuilder.put(configEntry.getKey(), configEntry.getValue()); - } - return settingsBuilder.build(); } /** * Notify the listeners of a new config + * * @param settingsSource */ - public void newConfig(BytesReference settingsSource) { - Map settingsMap = XContentHelper.convertToMap(settingsSource, true).v2(); - ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder(); - for (Map.Entry configEntry : settingsMap.entrySet() ) { - settingsBuilder.put(configEntry.getKey(), configEntry.getValue()); - } - Settings settings = settingsBuilder.build(); + public void newConfig(BytesReference settingsSource) throws IOException { + Settings settings = ImmutableSettings.settingsBuilder().loadFromSource(settingsSource.toUtf8()).build(); for (ConfigurableComponentListener componentListener : registeredComponents) { componentListener.receiveConfigurationUpdate(settings); } @@ -89,15 +77,21 @@ public class ConfigurationManager extends AbstractComponent { * @param clusterState * @return true if ready to read or false if not */ - public boolean isReady(ClusterState clusterState) { - if (readyToRead) { + public boolean start(ClusterState clusterState) { + if (started) { return true; } else { - readyToRead = checkIndexState(clusterState); - return readyToRead; + started = checkIndexState(clusterState); + return started; } } + public void stop() { + // Even though we just check if the config index is started, we need to do it again if alert manager is restarted, + // the index may not be available + started = false; + } + /** * Registers an component to receive config updates */ @@ -107,8 +101,8 @@ public class ConfigurationManager extends AbstractComponent { } } - private void ensureReady() { - if (!readyToRead) { + private void ensureStarted() { + if (!started) { throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started"); } } @@ -121,7 +115,6 @@ public class ConfigurationManager extends AbstractComponent { } else { if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) { logger.info("Index [" + CONFIG_INDEX + "] is started."); - return true; } else { return false; diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 7c70dc6d7cb..e11826f6cb1 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -109,7 +109,7 @@ public class AlertActionManager extends AbstractComponent { if (started.get()) { return true; } - if (!configurationManager.isReady(state)) { + if (!configurationManager.start(state)) { return false; } diff --git a/src/main/java/org/elasticsearch/alerts/actions/SmtpAlertActionFactory.java b/src/main/java/org/elasticsearch/alerts/actions/SmtpAlertActionFactory.java index fb55e6877fc..9423b9812da 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/SmtpAlertActionFactory.java +++ b/src/main/java/org/elasticsearch/alerts/actions/SmtpAlertActionFactory.java @@ -35,7 +35,7 @@ public class SmtpAlertActionFactory implements AlertActionFactory, ConfigurableC private static final String PASSWD_SETTING = "alerts.action.email.from.passwd"; private final ConfigurationManager configurationManager; - private Settings settings; + private volatile Settings settings; public SmtpAlertActionFactory(ConfigurationManager configurationManager) { this.configurationManager = configurationManager; @@ -158,6 +158,6 @@ public class SmtpAlertActionFactory implements AlertActionFactory, ConfigurableC @Override public void receiveConfigurationUpdate(Settings settings) { - + this.settings = settings; } } diff --git a/src/test/java/org/elasticsearch/alerts/ConfigTest.java b/src/test/java/org/elasticsearch/alerts/ConfigTest.java index 4c1da4bb324..a3c447b6ee6 100644 --- a/src/test/java/org/elasticsearch/alerts/ConfigTest.java +++ b/src/test/java/org/elasticsearch/alerts/ConfigTest.java @@ -41,7 +41,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest { ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client()); - boolean isReady = configurationManager.isReady(ClusterState.builder(new ClusterName("foobar")).build()); + boolean isReady = configurationManager.start(ClusterState.builder(new ClusterName("foobar")).build()); assertTrue(isReady); //Should always be ready on a clean start SettingsListener settingsListener = new SettingsListener(); @@ -85,7 +85,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest { .get(); assertTrue(indexResponse.isCreated()); ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client()); - assertTrue(configurationManager.isReady(ClusterState.builder(new ClusterName("foobar")).build())); + assertTrue(configurationManager.start(ClusterState.builder(new ClusterName("foobar")).build())); Settings loadedSettings = configurationManager.getGlobalConfig(); assertThat(loadedSettings.get("foo"), equalTo(newSettings.get("foo"))); assertThat(loadedSettings.get("bar"), equalTo(newSettings.get("bar")));