Add actions and listeners for Configuration.

Add transport actions for configuration.
Add Listener so that components can listen for config changes.

Original commit: elastic/x-pack-elasticsearch@bab02770d9
This commit is contained in:
Brian Murphy 2014-12-03 13:53:25 +00:00
parent d3ec7f40ba
commit 31173507a5
16 changed files with 510 additions and 76 deletions

View File

@ -149,8 +149,7 @@ public class AlertManager extends AbstractComponent {
}
private void loadSettings() {
Settings indexedSettings = configurationManager.getGlobalConfig();
manuallyStopped = !configurationManager.getOverriddenBooleanValue("alerts.start_immediately", indexedSettings, true);
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
}
public TriggerResult executeAlert(AlertActionEntry entry) throws IOException {
@ -267,7 +266,6 @@ public class AlertManager extends AbstractComponent {
while(true) {
if (configurationManager.isReady(initialState)) {
loadSettings();
break;
}
clusterState = newClusterState(clusterState);

View File

@ -13,6 +13,7 @@ import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.rest.*;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.transport.actions.ack.TransportAckAlertAction;
import org.elasticsearch.alerts.transport.actions.config.TransportConfigAlertAction;
import org.elasticsearch.alerts.transport.actions.delete.TransportDeleteAlertAction;
import org.elasticsearch.alerts.transport.actions.get.TransportGetAlertAction;
import org.elasticsearch.alerts.transport.actions.put.TransportPutAlertAction;
@ -43,6 +44,7 @@ public class AlertingModule extends AbstractModule {
bind(TransportAlertStatsAction.class).asEagerSingleton();
bind(TransportAckAlertAction.class).asEagerSingleton();
bind(TransportAlertsServiceAction.class).asEagerSingleton();
bind(TransportConfigAlertAction.class).asEagerSingleton();
bind(AlertsClient.class).to(NodeAlertsClient.class).asEagerSingleton();
// Rest layer

View File

@ -67,8 +67,8 @@ public class AlertsStore extends AbstractComponent {
private final ConfigurationManager configurationManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private int scrollSize;
private TimeValue scrollTimeout;
private final int scrollSize;
private final TimeValue scrollTimeout;
@Inject
public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry,
@ -80,6 +80,10 @@ public class AlertsStore extends AbstractComponent {
this.alertMap = ConcurrentCollections.newConcurrentMap();
this.triggerManager = triggerManager;
this.configurationManager = configurationManager;
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
}
/**
@ -122,13 +126,6 @@ public class AlertsStore extends AbstractComponent {
return true;
}
private void loadSettings() {
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
Settings indexedSettings = configurationManager.getGlobalConfig();
this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30));
this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100);
}
/**
* Deletes the alert with the specified name if exists
*/
@ -155,9 +152,7 @@ public class AlertsStore extends AbstractComponent {
return true;
}
if (configurationManager.isReady(state)) {
loadSettings();
} else {
if (!configurationManager.isReady(state)) {
return false;
}

View File

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.common.settings.Settings;
/**
* This interface allows a component to register itself for configuration updates
*/
public interface ConfigurableComponentListener {
void receiveConfigurationUpdate(Settings settings);
}

View File

@ -10,14 +10,18 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.indices.IndexMissingException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*/
@ -28,14 +32,14 @@ public class ConfigurationManager extends AbstractComponent {
public static final String CONFIG_TYPE = "config";
public static final String CONFIG_INDEX = AlertsStore.ALERT_INDEX;
private final String GLOBAL_CONFIG_NAME = "global";
private final Settings settings;
private volatile boolean readyToRead = false;
private volatile ImmutableOpenMap<String, List<ConfigurableComponentListener>> componentNameToListener;
@Inject
public ConfigurationManager(Settings settings, Client client) {
super(settings);
this.client = client;
this.settings = settings;
componentNameToListener = ImmutableOpenMap.<String, List<ConfigurableComponentListener>>builder().build();
}
/**
@ -68,35 +72,22 @@ public class ConfigurationManager extends AbstractComponent {
}
/**
* This method looks in the indexed settings provided for a setting and if it's not there it will go to the
* this.settings and load it from there using the default if not found
* Notify the listeners of a new config
* @param componentName
* @param settingsSource
*/
public TimeValue getOverriddenTimeValue(String settingName, Settings indexedSettings, TimeValue defaultValue) {
if (indexedSettings == null || indexedSettings.get(settingName) == null) {
return settings.getAsTime(settingName, defaultValue);
} else {
return indexedSettings.getAsTime(settingName, defaultValue);
public void newConfig(String componentName, BytesReference settingsSource) {
Map<String, Object> settingsMap = XContentHelper.convertToMap(settingsSource, true).v2();
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
for (Map.Entry<String, Object> configEntry : settingsMap.entrySet() ) {
settingsBuilder.put(configEntry.getKey(), configEntry.getValue());
}
Settings settings = settingsBuilder.build();
for (ConfigurableComponentListener componentListener : componentNameToListener.get(componentName)) {
componentListener.receiveConfigurationUpdate(settings);
}
}
public int getOverriddenIntValue(String settingName, Settings indexedSettings, int defaultValue) {
if (indexedSettings == null || indexedSettings.get(settingName) == null) {
return settings.getAsInt(settingName, defaultValue);
} else {
return indexedSettings.getAsInt(settingName, defaultValue);
}
}
public boolean getOverriddenBooleanValue(String settingName, Settings indexedSettings, boolean defaultValue) {
if (indexedSettings == null || indexedSettings.get(settingName) == null) {
return settings.getAsBoolean(settingName, defaultValue);
} else {
return indexedSettings.getAsBoolean(settingName, defaultValue);
}
}
/**
* This method determines if the config manager is ready to start loading configs by checking to make sure the
* config index is in a readable state.
@ -134,4 +125,21 @@ public class ConfigurationManager extends AbstractComponent {
}
}
}
/**
* Registers an component to receive config updates
* @param componentName
* @param configListener
*/
public synchronized void registerListener(String componentName, ConfigurableComponentListener configListener) {
if (componentNameToListener.get(componentName) == null ){
List<ConfigurableComponentListener> componentListeners = new CopyOnWriteArrayList<>();
componentListeners.add(configListener);
ImmutableOpenMap.Builder componentNameToListenerBuilder = ImmutableOpenMap.builder(componentNameToListener)
.fPut(componentName, componentListeners);
componentNameToListener = componentNameToListenerBuilder.build();
} else if (!componentNameToListener.get(componentName).contains(configListener)) {
componentNameToListener.get(componentName).add(configListener);
}
}
}

View File

@ -95,13 +95,10 @@ public class AlertActionManager extends AbstractComponent {
this.triggerManager = triggerManager;
this.templateHelper = templateHelper;
this.configurationManager = configurationManager;
}
private void loadSettings() {
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
Settings indexedSettings = configurationManager.getGlobalConfig();
this.scrollTimeout = configurationManager.getOverriddenTimeValue("alerts.scroll.timeout", indexedSettings, TimeValue.timeValueSeconds(30));
this.scrollSize = configurationManager.getOverriddenIntValue("alerts.scroll.size", indexedSettings, 100);
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
}
public void setAlertManager(AlertManager alertManager){
@ -112,9 +109,7 @@ public class AlertActionManager extends AbstractComponent {
if (started.get()) {
return true;
}
if (configurationManager.isReady(state)) {
loadSettings();
} else {
if (!configurationManager.isReady(state)) {
return false;
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ConfigurableComponentListener;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.settings.Settings;
@ -24,7 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
public class EmailAlertActionFactory implements AlertActionFactory {
public class EmailAlertActionFactory implements AlertActionFactory, ConfigurableComponentListener {
private static final String GLOBAL_EMAIL_CONFIG = "email";
@ -34,6 +35,7 @@ public class EmailAlertActionFactory implements AlertActionFactory {
private static final String PASSWD_SETTING = "from.passwd";
private final ConfigurationManager configurationManager;
private Settings settings;
public EmailAlertActionFactory(ConfigurationManager configurationManager) {
this.configurationManager = configurationManager;
@ -82,19 +84,27 @@ public class EmailAlertActionFactory implements AlertActionFactory {
throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to EmailAlertActionFactory expected [" + EmailAlertAction.class + "]");
}
EmailAlertAction emailAlertAction = (EmailAlertAction)action;
final Settings emailSettings = configurationManager.getConfigForComponent(GLOBAL_EMAIL_CONFIG);
if (settings == null) {
settings = configurationManager.getConfigForComponent(GLOBAL_EMAIL_CONFIG);
configurationManager.registerListener(GLOBAL_EMAIL_CONFIG, this);
}
if (settings == null) {
throw new ElasticsearchException("Unable to retrieve [" + GLOBAL_EMAIL_CONFIG + "] from the config index.");
}
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.host", emailSettings.get(SERVER_SETTING, "smtp.gmail.com"));
props.put("mail.smtp.port", emailSettings.getAsInt(PORT_SETTING, 587));
Session session;
if (emailSettings.get(PASSWD_SETTING) != null) {
props.put("mail.smtp.host", settings.get(SERVER_SETTING, "smtp.gmail.com"));
props.put("mail.smtp.port", settings.getAsInt(PORT_SETTING, 587));
final Session session;
if (settings.get(PASSWD_SETTING) != null) {
session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(emailSettings.get(FROM_SETTING), emailSettings.get(PASSWD_SETTING));
return new PasswordAuthentication(settings.get(FROM_SETTING), settings.get(PASSWD_SETTING));
}
});
} else {
@ -103,7 +113,7 @@ public class EmailAlertActionFactory implements AlertActionFactory {
Message message = new MimeMessage(session);
try {
message.setFrom(new InternetAddress(emailSettings.get(FROM_SETTING)));
message.setFrom(new InternetAddress(settings.get(FROM_SETTING)));
message.setRecipients(Message.RecipientType.TO,
emailAlertAction.getEmailAddresses().toArray(new Address[1]));
message.setSubject("Elasticsearch Alert " + alert.getAlertName() + " triggered");
@ -145,4 +155,8 @@ public class EmailAlertActionFactory implements AlertActionFactory {
}
@Override
public void receiveConfigurationUpdate(Settings settings) {
}
}

View File

@ -10,6 +10,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.alerts.transport.actions.ack.AckAlertRequest;
import org.elasticsearch.alerts.transport.actions.ack.AckAlertRequestBuilder;
import org.elasticsearch.alerts.transport.actions.ack.AckAlertResponse;
import org.elasticsearch.alerts.transport.actions.config.ConfigAlertRequest;
import org.elasticsearch.alerts.transport.actions.config.ConfigAlertRequestBuilder;
import org.elasticsearch.alerts.transport.actions.config.ConfigAlertResponse;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequestBuilder;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
@ -194,4 +197,21 @@ public interface AlertsClient extends ElasticsearchClient<AlertsClient> {
*/
ActionFuture<AlertsServiceResponse> alertService(AlertsServiceRequest request);
/**
* Prepare make an alert config request.
*/
ConfigAlertRequestBuilder prepareAlertConfig();
/**
* Perform an config alert request
*/
void alertConfig(ConfigAlertRequest request, ActionListener<ConfigAlertResponse> listener);
/**
* Perform an config alert request
*/
ActionFuture<ConfigAlertResponse> alertConfig(ConfigAlertRequest request);
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.alerts.client;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.alerts.transport.actions.ack.*;
import org.elasticsearch.alerts.transport.actions.config.*;
import org.elasticsearch.alerts.transport.actions.delete.*;
import org.elasticsearch.alerts.transport.actions.get.*;
import org.elasticsearch.alerts.transport.actions.put.*;
@ -28,7 +29,7 @@ public class NodeAlertsClient implements AlertsClient {
public NodeAlertsClient(ThreadPool threadPool, Headers headers, TransportPutAlertAction transportPutAlertAction,
TransportGetAlertAction transportGetAlertAction, TransportDeleteAlertAction transportDeleteAlertAction,
TransportAlertStatsAction transportAlertStatsAction, TransportAckAlertAction transportAckAlertAction,
TransportAlertsServiceAction transportAlertsServiceAction) {
TransportAlertsServiceAction transportAlertsServiceAction, TransportConfigAlertAction transportConfigAlertAction) {
this.headers = headers;
this.threadPool = threadPool;
internalActions = ImmutableMap.<GenericAction, TransportAction>builder()
@ -38,6 +39,7 @@ public class NodeAlertsClient implements AlertsClient {
.put(AlertsStatsAction.INSTANCE, transportAlertStatsAction)
.put(AckAlertAction.INSTANCE, transportAckAlertAction)
.put(AlertServiceAction.INSTANCE, transportAlertsServiceAction)
.put(ConfigAlertAction.INSTANCE, transportConfigAlertAction)
.build();
}
@ -150,6 +152,36 @@ public class NodeAlertsClient implements AlertsClient {
return execute(AlertServiceAction.INSTANCE, request);
}
/**
* Prepare make an alert config request.
*/
@Override
public ConfigAlertRequestBuilder prepareAlertConfig() {
return new ConfigAlertRequestBuilder(this);
}
/**
* Perform an config alert request
*
* @param request
* @param listener
*/
@Override
public void alertConfig(ConfigAlertRequest request, ActionListener<ConfigAlertResponse> listener) {
execute(ConfigAlertAction.INSTANCE, request, listener);
}
/**
* Perform an config alert request
*
* @param request
*/
@Override
public ActionFuture<ConfigAlertResponse> alertConfig(ConfigAlertRequest request) {
return execute(ConfigAlertAction.INSTANCE, request);
}
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, AlertsClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, AlertsClient> action, Request request) {

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.config;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.client.AlertsClientAction;
/**
* This action deletes an alert from in memory, the scheduler and the index
*/
public class ConfigAlertAction extends AlertsClientAction<ConfigAlertRequest, ConfigAlertResponse, ConfigAlertRequestBuilder> {
public static final ConfigAlertAction INSTANCE = new ConfigAlertAction();
public static final String NAME = "indices:data/write/alert/config";
private ConfigAlertAction() {
super(NAME);
}
@Override
public ConfigAlertResponse newResponse() {
return new ConfigAlertResponse();
}
@Override
public ConfigAlertRequestBuilder newRequestBuilder(AlertsClient client) {
return new ConfigAlertRequestBuilder(client);
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.config;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* A delete alert request to delete an alert by name (id)
*/
public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRequest> {
private String configName;
private BytesReference configSource;
private boolean configSourceUnsafe;
public ConfigAlertRequest() {
}
/**
* The constructor for the requests that takes the name of the config to modify
* @param configName
*/
public ConfigAlertRequest(String configName) {
this.configName = configName;
}
/**
* The name of the config to be modified
* @return
*/
public String getConfigName() {
return configName;
}
/**
* The name of the config to be modified
* @param configName
*/
public void setConfigName(String configName) {
this.configName = configName;
}
/**
* The source of the config
* @return
*/
public BytesReference getConfigSource() {
return configSource;
}
/**
* The source of the config document
* @param configSource
*/
public void setConfigSource(BytesReference configSource) {
this.configSource = configSource;
this.configSourceUnsafe = false;
}
/**
* Is the ByteRef configSource safe
* @return
*/
public boolean isConfigSourceUnsafe() {
return configSourceUnsafe;
}
public void setConfigSourceUnsafe(boolean configSourceUnsafe) {
this.configSourceUnsafe = configSourceUnsafe;
}
/**
* Set the source of the config with boolean to control source safety
* @param configSource
* @param configSourceUnsafe
*/
public void setAlertSource(BytesReference configSource, boolean configSourceUnsafe) {
this.configSource = configSource;
this.configSourceUnsafe = configSourceUnsafe;
}
public void beforeLocalFork() {
if (configSourceUnsafe) {
configSource = configSource.copyBytesArray();
configSourceUnsafe = false;
}
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (configName == null){
validationException = ValidateActions.addValidationError("configName is missing", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
configName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(configName);
}
@Override
public String toString() {
return "delete {[" + AlertsStore.ALERT_INDEX + "][" + configName + "]}";
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.config;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.alerts.client.AlertsClient;
/**
* A alert config action request builder.
*/
public class ConfigAlertRequestBuilder
extends MasterNodeOperationRequestBuilder<ConfigAlertRequest, ConfigAlertResponse, ConfigAlertRequestBuilder, AlertsClient> {
public ConfigAlertRequestBuilder(AlertsClient client) {
super(client, new ConfigAlertRequest());
}
public ConfigAlertRequestBuilder(AlertsClient client, String alertName) {
super(client, new ConfigAlertRequest(alertName));
}
/**
* Sets the name of the config to be modified
* @param configName
* @return
*/
public ConfigAlertRequestBuilder setAlertName(String configName) {
this.request().setConfigName(configName);
return this;
}
@Override
protected void doExecute(final ActionListener<ConfigAlertResponse> listener) {
client.alertConfig(request, listener);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.config;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class ConfigAlertResponse extends ActionResponse {
private IndexResponse indexResponse;
public ConfigAlertResponse() {
}
public ConfigAlertResponse(@Nullable IndexResponse indexResponse) {
this.indexResponse = indexResponse;
}
public IndexResponse indexResponse() {
return indexResponse;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
indexResponse = new IndexResponse();
indexResponse.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(indexResponse != null);
if (indexResponse != null) {
indexResponse.writeTo(out);
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.transport.actions.config;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.ConfigurationManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
* Performs the config operation.
*/
public class TransportConfigAlertAction extends TransportMasterNodeOperationAction<ConfigAlertRequest, ConfigAlertResponse> {
private final ConfigurationManager configManager;
private final Client client;
@Inject
public TransportConfigAlertAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ConfigurationManager configManager,
Client client) {
super(settings, ConfigAlertAction.NAME, transportService, clusterService, threadPool, actionFilters);
this.configManager = configManager;
this.client = client;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected ConfigAlertRequest newRequest() {
return new ConfigAlertRequest();
}
@Override
protected ConfigAlertResponse newResponse() {
return new ConfigAlertResponse();
}
@Override
protected void masterOperation(ConfigAlertRequest request, ClusterState state, ActionListener<ConfigAlertResponse> listener) throws ElasticsearchException {
try {
IndexResponse indexResponse = client.prepareIndex(ConfigurationManager.CONFIG_INDEX, ConfigurationManager.CONFIG_TYPE, request.getConfigName())
.setSource(request.getConfigSource(), request.isConfigSourceUnsafe()).get();
configManager.newConfig(request.getConfigName(), request.getConfigSource());
listener.onResponse(new ConfigAlertResponse(indexResponse));
//ConfigAlertResponse response = new ConfigAlertResponse(alertManager.deleteAlert(request.getConfigName()));
//listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
protected ClusterBlockException checkBlock(ConfigAlertRequest request, ClusterState state) {
request.beforeLocalFork(); // This is the best place to make the config source safe
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{ConfigurationManager.CONFIG_INDEX});
}
}

View File

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
/**
* Delete action.
*/
package org.elasticsearch.alerts.transport.actions.config;

View File

@ -21,7 +21,16 @@ import static org.hamcrest.core.IsEqual.equalTo;
*/
public class ConfigTest extends ElasticsearchIntegrationTest {
public void testOverridingSettings() throws Exception {
class SettingsListener implements ConfigurableComponentListener {
Settings settings;
@Override
public void receiveConfigurationUpdate(Settings settings) {
this.settings = settings;
}
}
public void testListener() throws Exception {
TimeValue tv = TimeValue.timeValueMillis(10000);
Settings oldSettings = ImmutableSettings.builder()
.put("foo", tv)
@ -33,20 +42,21 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
ClusterState clusterState = internalCluster().clusterService().state();
boolean isReady = configurationManager.isReady(clusterState);
assertTrue(isReady); //Should always be ready on a clean start
Settings newSettings = ImmutableSettings.builder()
.build();
assertThat(configurationManager.getOverriddenIntValue("bar", newSettings, 0), equalTo(1));
assertThat(configurationManager.getOverriddenBooleanValue("baz", newSettings, false), equalTo(true));
assertThat(configurationManager.getOverriddenTimeValue("foo", newSettings, TimeValue.timeValueMillis(0)).getMillis(), equalTo(tv.getMillis()));
TimeValue tv2 = TimeValue.timeValueMillis(0);
newSettings = ImmutableSettings.builder()
.put("foo", tv2)
.put("bar", 100)
.put("baz", false)
.build();
assertThat(configurationManager.getOverriddenIntValue("bar", newSettings, 0), equalTo(100));
assertThat(configurationManager.getOverriddenBooleanValue("baz", newSettings, true), equalTo(false));
assertThat(configurationManager.getOverriddenTimeValue("foo", newSettings, TimeValue.timeValueMillis(1)).getMillis(), equalTo(tv2.getMillis()));
SettingsListener settingsListener = new SettingsListener();
configurationManager.registerListener("foo", settingsListener);
TimeValue tv2 = TimeValue.timeValueMillis(10);
XContentBuilder jsonSettings = XContentFactory.jsonBuilder();
jsonSettings.startObject();
jsonSettings.field("foo", tv2)
.field("bar", 100)
.field("baz", false);
jsonSettings.endObject();
configurationManager.newConfig("foo", jsonSettings.bytes());
assertThat(settingsListener.settings.getAsTime("foo", new TimeValue(0)).getMillis(), equalTo(tv2.getMillis()));
assertThat(settingsListener.settings.getAsInt("bar", 0), equalTo(100));
assertThat(settingsListener.settings.getAsBoolean("baz", true), equalTo(false));
}