Renamed ConfigManager#isReady() to ConfigManager#start()
Add ConfigManager#stop() Simplified config loading smpt alert action settings are now also updated Original commit: elastic/x-pack-elasticsearch@acb180f88c
This commit is contained in:
parent
920f7ea2a9
commit
1258a4c327
|
@ -251,6 +251,7 @@ public class AlertManager extends AbstractComponent {
|
||||||
actionManager.stop();
|
actionManager.stop();
|
||||||
scheduler.stop();
|
scheduler.stop();
|
||||||
alertsStore.stop();
|
alertsStore.stop();
|
||||||
|
configurationManager.stop();
|
||||||
state.set(State.STOPPED);
|
state.set(State.STOPPED);
|
||||||
logger.info("Alert manager has stopped");
|
logger.info("Alert manager has stopped");
|
||||||
}
|
}
|
||||||
|
@ -260,8 +261,8 @@ public class AlertManager extends AbstractComponent {
|
||||||
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
|
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
|
||||||
ClusterState clusterState = initialState;
|
ClusterState clusterState = initialState;
|
||||||
|
|
||||||
while(true) {
|
while (true) {
|
||||||
if (configurationManager.isReady(initialState)) {
|
if (configurationManager.start(initialState)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
clusterState = newClusterState(clusterState);
|
clusterState = newClusterState(clusterState);
|
||||||
|
|
|
@ -152,7 +152,7 @@ public class AlertsStore extends AbstractComponent {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!configurationManager.isReady(state)) {
|
if (!configurationManager.start(state)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,27 +15,23 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
||||||
import org.elasticsearch.indices.IndexMissingException;
|
import org.elasticsearch.indices.IndexMissingException;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class ConfigurationManager extends AbstractComponent {
|
public class ConfigurationManager extends AbstractComponent {
|
||||||
|
|
||||||
private final Client client;
|
|
||||||
|
|
||||||
public static final String CONFIG_TYPE = "config";
|
public static final String CONFIG_TYPE = "config";
|
||||||
public static final String CONFIG_INDEX = AlertsStore.ALERT_INDEX;
|
public static final String CONFIG_INDEX = AlertsStore.ALERT_INDEX;
|
||||||
public static final String GLOBAL_CONFIG_NAME = "global";
|
public static final String GLOBAL_CONFIG_NAME = "global";
|
||||||
|
|
||||||
|
private final Client client;
|
||||||
|
private volatile boolean started = false;
|
||||||
private final CopyOnWriteArrayList<ConfigurableComponentListener> registeredComponents;
|
private final CopyOnWriteArrayList<ConfigurableComponentListener> registeredComponents;
|
||||||
|
|
||||||
private volatile boolean readyToRead = false;
|
|
||||||
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ConfigurationManager(Settings settings, Client client) {
|
public ConfigurationManager(Settings settings, Client client) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
@ -48,36 +44,28 @@ public class ConfigurationManager extends AbstractComponent {
|
||||||
* @return The immutable settings loaded from the index
|
* @return The immutable settings loaded from the index
|
||||||
*/
|
*/
|
||||||
public Settings getGlobalConfig() {
|
public Settings getGlobalConfig() {
|
||||||
ensureReady();
|
ensureStarted();
|
||||||
try {
|
try {
|
||||||
client.admin().indices().prepareRefresh(CONFIG_INDEX).get();
|
client.admin().indices().prepareRefresh(CONFIG_INDEX).get();
|
||||||
} catch (IndexMissingException ime) {
|
} catch (IndexMissingException ime) {
|
||||||
logger.info("No index [" + CONFIG_INDEX + "] found");
|
logger.error("No index [" + CONFIG_INDEX + "] found");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, GLOBAL_CONFIG_NAME).get();
|
GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, GLOBAL_CONFIG_NAME).get();
|
||||||
if (!response.isExists()) {
|
if (response.isExists()) {
|
||||||
|
return ImmutableSettings.settingsBuilder().loadFromSource(response.getSourceAsString()).build();
|
||||||
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
Map<String, Object> sourceMap = response.getSourceAsMap();
|
|
||||||
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
|
|
||||||
for (Map.Entry<String, Object> configEntry : sourceMap.entrySet() ) {
|
|
||||||
settingsBuilder.put(configEntry.getKey(), configEntry.getValue());
|
|
||||||
}
|
|
||||||
return settingsBuilder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify the listeners of a new config
|
* Notify the listeners of a new config
|
||||||
|
*
|
||||||
* @param settingsSource
|
* @param settingsSource
|
||||||
*/
|
*/
|
||||||
public void newConfig(BytesReference settingsSource) {
|
public void newConfig(BytesReference settingsSource) throws IOException {
|
||||||
Map<String, Object> settingsMap = XContentHelper.convertToMap(settingsSource, true).v2();
|
Settings settings = ImmutableSettings.settingsBuilder().loadFromSource(settingsSource.toUtf8()).build();
|
||||||
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
|
|
||||||
for (Map.Entry<String, Object> configEntry : settingsMap.entrySet() ) {
|
|
||||||
settingsBuilder.put(configEntry.getKey(), configEntry.getValue());
|
|
||||||
}
|
|
||||||
Settings settings = settingsBuilder.build();
|
|
||||||
for (ConfigurableComponentListener componentListener : registeredComponents) {
|
for (ConfigurableComponentListener componentListener : registeredComponents) {
|
||||||
componentListener.receiveConfigurationUpdate(settings);
|
componentListener.receiveConfigurationUpdate(settings);
|
||||||
}
|
}
|
||||||
|
@ -89,15 +77,21 @@ public class ConfigurationManager extends AbstractComponent {
|
||||||
* @param clusterState
|
* @param clusterState
|
||||||
* @return true if ready to read or false if not
|
* @return true if ready to read or false if not
|
||||||
*/
|
*/
|
||||||
public boolean isReady(ClusterState clusterState) {
|
public boolean start(ClusterState clusterState) {
|
||||||
if (readyToRead) {
|
if (started) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
readyToRead = checkIndexState(clusterState);
|
started = checkIndexState(clusterState);
|
||||||
return readyToRead;
|
return started;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
// Even though we just check if the config index is started, we need to do it again if alert manager is restarted,
|
||||||
|
// the index may not be available
|
||||||
|
started = false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers an component to receive config updates
|
* Registers an component to receive config updates
|
||||||
*/
|
*/
|
||||||
|
@ -107,8 +101,8 @@ public class ConfigurationManager extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ensureReady() {
|
private void ensureStarted() {
|
||||||
if (!readyToRead) {
|
if (!started) {
|
||||||
throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started");
|
throw new ElasticsearchException("Config index [" + CONFIG_INDEX + "] is not known to be started");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,7 +115,6 @@ public class ConfigurationManager extends AbstractComponent {
|
||||||
} else {
|
} else {
|
||||||
if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) {
|
if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) {
|
||||||
logger.info("Index [" + CONFIG_INDEX + "] is started.");
|
logger.info("Index [" + CONFIG_INDEX + "] is started.");
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class AlertActionManager extends AbstractComponent {
|
||||||
if (started.get()) {
|
if (started.get()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (!configurationManager.isReady(state)) {
|
if (!configurationManager.start(state)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ public class SmtpAlertActionFactory implements AlertActionFactory, ConfigurableC
|
||||||
private static final String PASSWD_SETTING = "alerts.action.email.from.passwd";
|
private static final String PASSWD_SETTING = "alerts.action.email.from.passwd";
|
||||||
|
|
||||||
private final ConfigurationManager configurationManager;
|
private final ConfigurationManager configurationManager;
|
||||||
private Settings settings;
|
private volatile Settings settings;
|
||||||
|
|
||||||
public SmtpAlertActionFactory(ConfigurationManager configurationManager) {
|
public SmtpAlertActionFactory(ConfigurationManager configurationManager) {
|
||||||
this.configurationManager = configurationManager;
|
this.configurationManager = configurationManager;
|
||||||
|
@ -158,6 +158,6 @@ public class SmtpAlertActionFactory implements AlertActionFactory, ConfigurableC
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void receiveConfigurationUpdate(Settings settings) {
|
public void receiveConfigurationUpdate(Settings settings) {
|
||||||
|
this.settings = settings;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
|
||||||
ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client());
|
ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client());
|
||||||
|
|
||||||
|
|
||||||
boolean isReady = configurationManager.isReady(ClusterState.builder(new ClusterName("foobar")).build());
|
boolean isReady = configurationManager.start(ClusterState.builder(new ClusterName("foobar")).build());
|
||||||
assertTrue(isReady); //Should always be ready on a clean start
|
assertTrue(isReady); //Should always be ready on a clean start
|
||||||
|
|
||||||
SettingsListener settingsListener = new SettingsListener();
|
SettingsListener settingsListener = new SettingsListener();
|
||||||
|
@ -85,7 +85,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
|
||||||
.get();
|
.get();
|
||||||
assertTrue(indexResponse.isCreated());
|
assertTrue(indexResponse.isCreated());
|
||||||
ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client());
|
ConfigurationManager configurationManager = new ConfigurationManager(oldSettings, client());
|
||||||
assertTrue(configurationManager.isReady(ClusterState.builder(new ClusterName("foobar")).build()));
|
assertTrue(configurationManager.start(ClusterState.builder(new ClusterName("foobar")).build()));
|
||||||
Settings loadedSettings = configurationManager.getGlobalConfig();
|
Settings loadedSettings = configurationManager.getGlobalConfig();
|
||||||
assertThat(loadedSettings.get("foo"), equalTo(newSettings.get("foo")));
|
assertThat(loadedSettings.get("foo"), equalTo(newSettings.get("foo")));
|
||||||
assertThat(loadedSettings.get("bar"), equalTo(newSettings.get("bar")));
|
assertThat(loadedSettings.get("bar"), equalTo(newSettings.get("bar")));
|
||||||
|
|
Loading…
Reference in New Issue