Changes after review.
Original commit: elastic/x-pack-elasticsearch@0a9055b918
This commit is contained in:
parent
31173507a5
commit
6791841f42
|
@ -91,7 +91,7 @@ public class AlertManager extends AbstractComponent {
|
|||
stop();
|
||||
}
|
||||
});
|
||||
|
||||
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
|
||||
}
|
||||
|
||||
public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException {
|
||||
|
@ -148,10 +148,6 @@ public class AlertManager extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private void loadSettings() {
|
||||
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
|
||||
}
|
||||
|
||||
public TriggerResult executeAlert(AlertActionEntry entry) throws IOException {
|
||||
ensureStarted();
|
||||
alertLock.acquire(entry.getAlertName());
|
||||
|
|
|
@ -11,7 +11,6 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -19,7 +18,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
|
@ -31,23 +29,22 @@ public class ConfigurationManager extends AbstractComponent {
|
|||
|
||||
public static final String CONFIG_TYPE = "config";
|
||||
public static final String CONFIG_INDEX = AlertsStore.ALERT_INDEX;
|
||||
private final String GLOBAL_CONFIG_NAME = "global";
|
||||
public static final String GLOBAL_CONFIG_NAME = "global";
|
||||
private volatile boolean readyToRead = false;
|
||||
private volatile ImmutableOpenMap<String, List<ConfigurableComponentListener>> componentNameToListener;
|
||||
private volatile CopyOnWriteArrayList<ConfigurableComponentListener> registeredComponents;
|
||||
|
||||
@Inject
|
||||
public ConfigurationManager(Settings settings, Client client) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
componentNameToListener = ImmutableOpenMap.<String, List<ConfigurableComponentListener>>builder().build();
|
||||
registeredComponents = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method gets the config for a component name
|
||||
* @param componentName
|
||||
* This method gets the config
|
||||
* @return The immutable settings loaded from the index
|
||||
*/
|
||||
public Settings getConfigForComponent(String componentName) {
|
||||
public Settings getGlobalConfig() {
|
||||
ensureReady();
|
||||
try {
|
||||
client.admin().indices().prepareRefresh(CONFIG_INDEX).get();
|
||||
|
@ -55,7 +52,7 @@ public class ConfigurationManager extends AbstractComponent {
|
|||
logger.info("No index [" + CONFIG_INDEX + "] found");
|
||||
return null;
|
||||
}
|
||||
GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, componentName).get();
|
||||
GetResponse response = client.prepareGet(CONFIG_INDEX, CONFIG_TYPE, GLOBAL_CONFIG_NAME).get();
|
||||
if (!response.isExists()) {
|
||||
return null;
|
||||
}
|
||||
|
@ -67,23 +64,18 @@ public class ConfigurationManager extends AbstractComponent {
|
|||
return settingsBuilder.build();
|
||||
}
|
||||
|
||||
public Settings getGlobalConfig() {
|
||||
return getConfigForComponent(GLOBAL_CONFIG_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the listeners of a new config
|
||||
* @param componentName
|
||||
* @param settingsSource
|
||||
*/
|
||||
public void newConfig(String componentName, BytesReference settingsSource) {
|
||||
public void newConfig(BytesReference settingsSource) {
|
||||
Map<String, Object> settingsMap = XContentHelper.convertToMap(settingsSource, true).v2();
|
||||
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
|
||||
for (Map.Entry<String, Object> configEntry : settingsMap.entrySet() ) {
|
||||
settingsBuilder.put(configEntry.getKey(), configEntry.getValue());
|
||||
}
|
||||
Settings settings = settingsBuilder.build();
|
||||
for (ConfigurableComponentListener componentListener : componentNameToListener.get(componentName)) {
|
||||
for (ConfigurableComponentListener componentListener : registeredComponents) {
|
||||
componentListener.receiveConfigurationUpdate(settings);
|
||||
}
|
||||
}
|
||||
|
@ -113,7 +105,6 @@ public class ConfigurationManager extends AbstractComponent {
|
|||
IndexMetaData configIndexMetadata = clusterState.getMetaData().index(CONFIG_INDEX);
|
||||
if (configIndexMetadata == null) {
|
||||
logger.info("No previous [" + CONFIG_INDEX + "]");
|
||||
|
||||
return true;
|
||||
} else {
|
||||
if (clusterState.routingTable().index(CONFIG_INDEX).allPrimaryShardsActive()) {
|
||||
|
@ -128,18 +119,10 @@ public class ConfigurationManager extends AbstractComponent {
|
|||
|
||||
/**
|
||||
* Registers an component to receive config updates
|
||||
* @param componentName
|
||||
* @param configListener
|
||||
*/
|
||||
public synchronized void registerListener(String componentName, ConfigurableComponentListener configListener) {
|
||||
if (componentNameToListener.get(componentName) == null ){
|
||||
List<ConfigurableComponentListener> componentListeners = new CopyOnWriteArrayList<>();
|
||||
componentListeners.add(configListener);
|
||||
ImmutableOpenMap.Builder componentNameToListenerBuilder = ImmutableOpenMap.builder(componentNameToListener)
|
||||
.fPut(componentName, componentListeners);
|
||||
componentNameToListener = componentNameToListenerBuilder.build();
|
||||
} else if (!componentNameToListener.get(componentName).contains(configListener)) {
|
||||
componentNameToListener.get(componentName).add(configListener);
|
||||
public void registerListener(ConfigurableComponentListener configListener) {
|
||||
if (!registeredComponents.contains(configListener)) {
|
||||
registeredComponents.add(configListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,8 +75,8 @@ public class AlertActionManager extends AbstractComponent {
|
|||
private final TemplateHelper templateHelper;
|
||||
private final AlertActionRegistry actionRegistry;
|
||||
|
||||
private int scrollSize;
|
||||
private TimeValue scrollTimeout;
|
||||
private final int scrollSize;
|
||||
private final TimeValue scrollTimeout;
|
||||
|
||||
private final AtomicLong largestQueueSize = new AtomicLong(0);
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
|
|
@ -28,7 +28,7 @@ public class AlertActionRegistry extends AbstractComponent {
|
|||
public AlertActionRegistry(Settings settings, Client client, ConfigurationManager configurationManager) {
|
||||
super(settings);
|
||||
actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
|
||||
.fPut("email", new EmailAlertActionFactory(configurationManager))
|
||||
.fPut("email", new SnptAlertActionFactory(configurationManager))
|
||||
.fPut("index", new IndexAlertActionFactory(client, configurationManager))
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -25,24 +25,22 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class EmailAlertActionFactory implements AlertActionFactory, ConfigurableComponentListener {
|
||||
public class SnptAlertActionFactory implements AlertActionFactory, ConfigurableComponentListener {
|
||||
|
||||
private static final String GLOBAL_EMAIL_CONFIG = "email";
|
||||
|
||||
private static final String PORT_SETTING = "server.port";
|
||||
private static final String SERVER_SETTING = "server.name";
|
||||
private static final String FROM_SETTING = "from.address";
|
||||
private static final String PASSWD_SETTING = "from.passwd";
|
||||
private static final String PORT_SETTING = "alerts.action.snpt.server.port";
|
||||
private static final String SERVER_SETTING = "alerts.action.email.server.name";
|
||||
private static final String FROM_SETTING = "alerts.action.email.from.address";
|
||||
private static final String PASSWD_SETTING = "alerts.action.email.from.passwd";
|
||||
|
||||
private final ConfigurationManager configurationManager;
|
||||
private Settings settings;
|
||||
|
||||
public EmailAlertActionFactory(ConfigurationManager configurationManager) {
|
||||
public SnptAlertActionFactory(ConfigurationManager configurationManager) {
|
||||
this.configurationManager = configurationManager;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public AlertAction createAction(XContentParser parser) throws IOException {
|
||||
String display = null;
|
||||
|
@ -85,8 +83,8 @@ public class EmailAlertActionFactory implements AlertActionFactory, Configurable
|
|||
}
|
||||
EmailAlertAction emailAlertAction = (EmailAlertAction)action;
|
||||
if (settings == null) {
|
||||
settings = configurationManager.getConfigForComponent(GLOBAL_EMAIL_CONFIG);
|
||||
configurationManager.registerListener(GLOBAL_EMAIL_CONFIG, this);
|
||||
settings = configurationManager.getGlobalConfig();
|
||||
configurationManager.registerListener(this);
|
||||
}
|
||||
|
||||
if (settings == null) {
|
|
@ -8,7 +8,7 @@ package org.elasticsearch.alerts.transport.actions.config;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ValidateActions;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.alerts.ConfigurationManager;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -20,7 +20,6 @@ import java.io.IOException;
|
|||
*/
|
||||
public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRequest> {
|
||||
|
||||
private String configName;
|
||||
private BytesReference configSource;
|
||||
private boolean configSourceUnsafe;
|
||||
|
||||
|
@ -28,29 +27,6 @@ public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRe
|
|||
public ConfigAlertRequest() {
|
||||
}
|
||||
|
||||
/**
|
||||
* The constructor for the requests that takes the name of the config to modify
|
||||
* @param configName
|
||||
*/
|
||||
public ConfigAlertRequest(String configName) {
|
||||
this.configName = configName;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the config to be modified
|
||||
* @return
|
||||
*/
|
||||
public String getConfigName() {
|
||||
return configName;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the config to be modified
|
||||
* @param configName
|
||||
*/
|
||||
public void setConfigName(String configName) {
|
||||
this.configName = configName;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
|
@ -88,7 +64,7 @@ public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRe
|
|||
* @param configSource
|
||||
* @param configSourceUnsafe
|
||||
*/
|
||||
public void setAlertSource(BytesReference configSource, boolean configSourceUnsafe) {
|
||||
public void setConfigSource(BytesReference configSource, boolean configSourceUnsafe) {
|
||||
this.configSource = configSource;
|
||||
this.configSourceUnsafe = configSourceUnsafe;
|
||||
}
|
||||
|
@ -105,7 +81,7 @@ public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRe
|
|||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (configName == null){
|
||||
if (configSource == null){
|
||||
validationException = ValidateActions.addValidationError("configName is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
|
@ -114,17 +90,18 @@ public class ConfigAlertRequest extends MasterNodeOperationRequest<ConfigAlertRe
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
configName = in.readString();
|
||||
configSource = in.readBytesReference();
|
||||
configSourceUnsafe = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(configName);
|
||||
out.writeBytesReference(configSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "delete {[" + AlertsStore.ALERT_INDEX + "][" + configName + "]}";
|
||||
return "delete {[" + ConfigurationManager.CONFIG_INDEX + "][" + ConfigurationManager.CONFIG_TYPE + "]}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.alerts.transport.actions.config;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||
import org.elasticsearch.alerts.client.AlertsClient;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
/**
|
||||
* A alert config action request builder.
|
||||
|
@ -19,17 +20,24 @@ public class ConfigAlertRequestBuilder
|
|||
super(client, new ConfigAlertRequest());
|
||||
}
|
||||
|
||||
public ConfigAlertRequestBuilder(AlertsClient client, String alertName) {
|
||||
super(client, new ConfigAlertRequest(alertName));
|
||||
/**
|
||||
* Sets the source of the config to be modified
|
||||
* @param configSource
|
||||
* @return
|
||||
*/
|
||||
public ConfigAlertRequestBuilder setConfigSource(BytesReference configSource) {
|
||||
this.request().setConfigSource(configSource);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the name of the config to be modified
|
||||
* @param configName
|
||||
* Sets the source of the config to be modified with boolean to control safety
|
||||
* @param configSource
|
||||
* @return
|
||||
*/
|
||||
public ConfigAlertRequestBuilder setAlertName(String configName) {
|
||||
this.request().setConfigName(configName);
|
||||
public ConfigAlertRequestBuilder setConfigSource(BytesReference configSource, boolean sourceUnsafe) {
|
||||
this.request().setConfigSource(configSource);
|
||||
this.request().setConfigSourceUnsafe(sourceUnsafe);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -57,16 +57,10 @@ public class TransportConfigAlertAction extends TransportMasterNodeOperationActi
|
|||
protected void masterOperation(ConfigAlertRequest request, ClusterState state, ActionListener<ConfigAlertResponse> listener) throws ElasticsearchException {
|
||||
try {
|
||||
|
||||
IndexResponse indexResponse = client.prepareIndex(ConfigurationManager.CONFIG_INDEX, ConfigurationManager.CONFIG_TYPE, request.getConfigName())
|
||||
IndexResponse indexResponse = client.prepareIndex(ConfigurationManager.CONFIG_INDEX, ConfigurationManager.CONFIG_TYPE, ConfigurationManager.GLOBAL_CONFIG_NAME)
|
||||
.setSource(request.getConfigSource(), request.isConfigSourceUnsafe()).get();
|
||||
|
||||
configManager.newConfig(request.getConfigName(), request.getConfigSource());
|
||||
|
||||
configManager.newConfig( request.getConfigSource());
|
||||
listener.onResponse(new ConfigAlertResponse(indexResponse));
|
||||
|
||||
|
||||
//ConfigAlertResponse response = new ConfigAlertResponse(alertManager.deleteAlert(request.getConfigName()));
|
||||
//listener.onResponse(response);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
|
|||
assertTrue(isReady); //Should always be ready on a clean start
|
||||
|
||||
SettingsListener settingsListener = new SettingsListener();
|
||||
configurationManager.registerListener("foo", settingsListener);
|
||||
configurationManager.registerListener(settingsListener);
|
||||
TimeValue tv2 = TimeValue.timeValueMillis(10);
|
||||
XContentBuilder jsonSettings = XContentFactory.jsonBuilder();
|
||||
jsonSettings.startObject();
|
||||
|
@ -52,7 +52,7 @@ public class ConfigTest extends ElasticsearchIntegrationTest {
|
|||
.field("bar", 100)
|
||||
.field("baz", false);
|
||||
jsonSettings.endObject();
|
||||
configurationManager.newConfig("foo", jsonSettings.bytes());
|
||||
configurationManager.newConfig(jsonSettings.bytes());
|
||||
|
||||
assertThat(settingsListener.settings.getAsTime("foo", new TimeValue(0)).getMillis(), equalTo(tv2.getMillis()));
|
||||
assertThat(settingsListener.settings.getAsInt("bar", 0), equalTo(100));
|
||||
|
|
Loading…
Reference in New Issue