Watcher: Make settings reloadable (#31746)
This commit allows for rebuilding watcher secure secrets via the reload_secure_settings API call. The commit also renames a method in the Notification Service to make it a bit more readable.
This commit is contained in:
parent
b1bf643e41
commit
1f72afa773
|
@ -38,6 +38,7 @@ import org.elasticsearch.license.XPackLicenseState;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.plugins.ActionPlugin;
|
import org.elasticsearch.plugins.ActionPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
import org.elasticsearch.plugins.ReloadablePlugin;
|
||||||
import org.elasticsearch.plugins.ScriptPlugin;
|
import org.elasticsearch.plugins.ScriptPlugin;
|
||||||
import org.elasticsearch.rest.RestController;
|
import org.elasticsearch.rest.RestController;
|
||||||
import org.elasticsearch.rest.RestHandler;
|
import org.elasticsearch.rest.RestHandler;
|
||||||
|
@ -123,6 +124,7 @@ import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
|
||||||
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
|
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
|
||||||
import org.elasticsearch.xpack.watcher.input.transform.TransformInput;
|
import org.elasticsearch.xpack.watcher.input.transform.TransformInput;
|
||||||
import org.elasticsearch.xpack.watcher.input.transform.TransformInputFactory;
|
import org.elasticsearch.xpack.watcher.input.transform.TransformInputFactory;
|
||||||
|
import org.elasticsearch.xpack.watcher.notification.NotificationService;
|
||||||
import org.elasticsearch.xpack.watcher.notification.email.Account;
|
import org.elasticsearch.xpack.watcher.notification.email.Account;
|
||||||
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
|
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
|
||||||
import org.elasticsearch.xpack.watcher.notification.email.HtmlSanitizer;
|
import org.elasticsearch.xpack.watcher.notification.email.HtmlSanitizer;
|
||||||
|
@ -194,7 +196,7 @@ import java.util.function.UnaryOperator;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
|
|
||||||
public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin {
|
||||||
|
|
||||||
// This setting is only here for backward compatibility reasons as 6.x indices made use of it. It can be removed in 8.x.
|
// This setting is only here for backward compatibility reasons as 6.x indices made use of it. It can be removed in 8.x.
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@ -221,6 +223,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
protected final boolean transportClient;
|
protected final boolean transportClient;
|
||||||
protected final boolean enabled;
|
protected final boolean enabled;
|
||||||
protected final Environment env;
|
protected final Environment env;
|
||||||
|
protected List<NotificationService> reloadableServices = new ArrayList<>();
|
||||||
|
|
||||||
public Watcher(final Settings settings) {
|
public Watcher(final Settings settings) {
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
|
@ -275,6 +278,12 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
SlackService slackService = new SlackService(settings, httpClient, clusterService.getClusterSettings());
|
SlackService slackService = new SlackService(settings, httpClient, clusterService.getClusterSettings());
|
||||||
PagerDutyService pagerDutyService = new PagerDutyService(settings, httpClient, clusterService.getClusterSettings());
|
PagerDutyService pagerDutyService = new PagerDutyService(settings, httpClient, clusterService.getClusterSettings());
|
||||||
|
|
||||||
|
reloadableServices.add(emailService);
|
||||||
|
reloadableServices.add(hipChatService);
|
||||||
|
reloadableServices.add(jiraService);
|
||||||
|
reloadableServices.add(slackService);
|
||||||
|
reloadableServices.add(pagerDutyService);
|
||||||
|
|
||||||
TextTemplateEngine templateEngine = new TextTemplateEngine(settings, scriptService);
|
TextTemplateEngine templateEngine = new TextTemplateEngine(settings, scriptService);
|
||||||
Map<String, EmailAttachmentParser> emailAttachmentParsers = new HashMap<>();
|
Map<String, EmailAttachmentParser> emailAttachmentParsers = new HashMap<>();
|
||||||
emailAttachmentParsers.put(HttpEmailAttachementParser.TYPE, new HttpEmailAttachementParser(httpClient, httpTemplateParser,
|
emailAttachmentParsers.put(HttpEmailAttachementParser.TYPE, new HttpEmailAttachementParser(httpClient, httpTemplateParser,
|
||||||
|
@ -613,4 +622,15 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
IOUtils.closeWhileHandlingException(httpClient);
|
IOUtils.closeWhileHandlingException(httpClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reloads all the reloadable services in watcher.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void reload(Settings settings) {
|
||||||
|
if (enabled == false || transportClient) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
reloadableServices.forEach(s -> s.reload(settings));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ public abstract class NotificationService<Account> extends AbstractComponent {
|
||||||
public NotificationService(Settings settings, String type,
|
public NotificationService(Settings settings, String type,
|
||||||
ClusterSettings clusterSettings, List<Setting<?>> pluginSettings) {
|
ClusterSettings clusterSettings, List<Setting<?>> pluginSettings) {
|
||||||
this(settings, type);
|
this(settings, type);
|
||||||
clusterSettings.addSettingsUpdateConsumer(this::setAccountSetting, pluginSettings);
|
clusterSettings.addSettingsUpdateConsumer(this::reload, pluginSettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used for testing only
|
// Used for testing only
|
||||||
|
@ -40,7 +40,7 @@ public abstract class NotificationService<Account> extends AbstractComponent {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void setAccountSetting(Settings settings) {
|
public synchronized void reload(Settings settings) {
|
||||||
Tuple<Map<String, Account>, Account> accounts = buildAccounts(settings, this::createAccount);
|
Tuple<Map<String, Account>, Account> accounts = buildAccounts(settings, this::createAccount);
|
||||||
this.accounts = Collections.unmodifiableMap(accounts.v1());
|
this.accounts = Collections.unmodifiableMap(accounts.v1());
|
||||||
this.defaultAccount = accounts.v2();
|
this.defaultAccount = accounts.v2();
|
||||||
|
|
|
@ -127,7 +127,7 @@ public class EmailService extends NotificationService<Account> {
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_SEND_PARTIAL, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_SEND_PARTIAL, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_WAIT_ON_QUIT, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_WAIT_ON_QUIT, (s, o) -> {}, (s, o) -> {});
|
||||||
// do an initial load
|
// do an initial load
|
||||||
setAccountSetting(settings);
|
reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -79,13 +79,13 @@ public class HipChatService extends NotificationService<HipChatAccount> {
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_PORT, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_PORT, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_MESSAGE_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_MESSAGE_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
||||||
|
|
||||||
setAccountSetting(settings);
|
reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void setAccountSetting(Settings settings) {
|
public synchronized void reload(Settings settings) {
|
||||||
defaultServer = new HipChatServer(settings.getByPrefix("xpack.notification.hipchat."));
|
defaultServer = new HipChatServer(settings.getByPrefix("xpack.notification.hipchat."));
|
||||||
super.setAccountSetting(settings);
|
super.reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class JiraService extends NotificationService<JiraAccount> {
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_PASSWORD, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_PASSWORD, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
||||||
// do an initial load
|
// do an initial load
|
||||||
setAccountSetting(settings);
|
reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class PagerDutyService extends NotificationService<PagerDutyAccount> {
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_SERVICE_API_KEY, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_SERVICE_API_KEY, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_SERVICE_API_KEY, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_SERVICE_API_KEY, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
||||||
setAccountSetting(settings);
|
reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class SlackService extends NotificationService<SlackAccount> {
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_URL, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_URL, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_URL_SECURE, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_URL_SECURE, (s, o) -> {}, (s, o) -> {});
|
||||||
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
|
||||||
setAccountSetting(settings);
|
reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.IndexSettingsModule;
|
import org.elasticsearch.test.IndexSettingsModule;
|
||||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||||
|
import org.elasticsearch.xpack.watcher.notification.NotificationService;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -22,6 +23,10 @@ import static java.util.Collections.emptyMap;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
|
||||||
public class WatcherPluginTests extends ESTestCase {
|
public class WatcherPluginTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -97,4 +102,36 @@ public class WatcherPluginTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
assertThat(Watcher.getWatcherThreadPoolSize(noDataNodeSettings), is(1));
|
assertThat(Watcher.getWatcherThreadPoolSize(noDataNodeSettings), is(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReload() {
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put("xpack.watcher.enabled", true)
|
||||||
|
.put("path.home", createTempDir())
|
||||||
|
.build();
|
||||||
|
NotificationService mockService = mock(NotificationService.class);
|
||||||
|
Watcher watcher = new TestWatcher(settings, mockService);
|
||||||
|
|
||||||
|
watcher.reload(settings);
|
||||||
|
verify(mockService, times(1)).reload(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testReloadDisabled() {
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put("xpack.watcher.enabled", false)
|
||||||
|
.put("path.home", createTempDir())
|
||||||
|
.build();
|
||||||
|
NotificationService mockService = mock(NotificationService.class);
|
||||||
|
Watcher watcher = new TestWatcher(settings, mockService);
|
||||||
|
|
||||||
|
watcher.reload(settings);
|
||||||
|
verifyNoMoreInteractions(mockService);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestWatcher extends Watcher {
|
||||||
|
|
||||||
|
TestWatcher(Settings settings, NotificationService service) {
|
||||||
|
super(settings);
|
||||||
|
reloadableServices.add(service);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class NotificationServiceTests extends ESTestCase {
|
||||||
|
|
||||||
TestNotificationService(Settings settings) {
|
TestNotificationService(Settings settings) {
|
||||||
super(settings, "test");
|
super(settings, "test");
|
||||||
setAccountSetting(settings);
|
reload(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue