From 427bc7f9401946b706e731dbb6e455fd99388abc Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 23 Jan 2019 10:18:08 -0700 Subject: [PATCH] Use ILM for Watcher history deletion (#37443) * Use ILM for Watcher history deletion This commit adds an index lifecycle policy for the `.watch-history-*` indices. This policy is automatically used for all new watch history indices. This does not yet remove the automatic cleanup that the monitoring plugin does for the .watch-history indices, and it does not touch the `xpack.watcher.history.cleaner_service.enabled` setting. Relates to #32041 --- .../reference/ilm/apis/get-lifecycle.asciidoc | 2 +- .../ilm/update-lifecycle-policy.asciidoc | 2 +- .../indexlifecycle/LifecyclePolicyUtils.java | 76 +++++++++++++++++++ .../resources/watch-history-ilm-policy.json | 10 +++ .../src/main/resources/watch-history.json | 1 + .../action/RestPutLifecycleAction.java | 11 +-- .../action/TransportPutLifecycleAction.java | 7 +- .../qa/native-multi-node-tests/build.gradle | 1 + x-pack/plugin/watcher/build.gradle | 1 + .../elasticsearch/xpack/watcher/Watcher.java | 2 +- .../support/WatcherIndexTemplateRegistry.java | 76 ++++++++++++++++++- .../WatcherIndexTemplateRegistryTests.java | 74 +++++++++++++++++- .../AbstractWatcherIntegrationTestCase.java | 3 + 13 files changed, 254 insertions(+), 12 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java create mode 100644 x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json diff --git a/docs/reference/ilm/apis/get-lifecycle.asciidoc b/docs/reference/ilm/apis/get-lifecycle.asciidoc index 9bdf14d970c..a36703967af 100644 --- a/docs/reference/ilm/apis/get-lifecycle.asciidoc +++ b/docs/reference/ilm/apis/get-lifecycle.asciidoc @@ -69,7 +69,7 @@ PUT _ilm/policy/my_policy [source,js] -------------------------------------------------- -GET _ilm/policy +GET _ilm/policy/my_policy -------------------------------------------------- // CONSOLE // TEST[continued] diff --git a/docs/reference/ilm/update-lifecycle-policy.asciidoc b/docs/reference/ilm/update-lifecycle-policy.asciidoc index 3e6627fdd3a..bc41da6bdff 100644 --- a/docs/reference/ilm/update-lifecycle-policy.asciidoc +++ b/docs/reference/ilm/update-lifecycle-policy.asciidoc @@ -87,7 +87,7 @@ PUT _ilm/policy/my_policy ////////// [source,js] -------------------------------------------------- -GET _ilm/policy +GET _ilm/policy/my_policy -------------------------------------------------- // CONSOLE // TEST[continued] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java new file mode 100644 index 00000000000..c3752e3927b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.NotXContentException; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * A utility class used for loading index lifecycle policies from the resource classpath + */ +public class LifecyclePolicyUtils { + + private LifecyclePolicyUtils() {}; + + /** + * Loads a built-in index lifecycle policy and returns its source. + */ + public static LifecyclePolicy loadPolicy(String name, String resource, NamedXContentRegistry xContentRegistry) { + try { + BytesReference source = load(resource); + validate(source); + + try (XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, source.utf8ToString())) { + return LifecyclePolicy.parse(parser, name); + } + } catch (Exception e) { + throw new IllegalArgumentException("unable to load policy [" + name + "] from [" + resource + "]", e); + } + } + + /** + * Loads a resource from the classpath and returns it as a {@link BytesReference} + */ + private static BytesReference load(String name) throws IOException { + try (InputStream is = LifecyclePolicyUtils.class.getResourceAsStream(name)) { + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + Streams.copy(is, out); + return new BytesArray(out.toByteArray()); + } + } + } + + /** + * Parses and validates that the source is not empty. + */ + private static void validate(BytesReference source) { + if (source == null) { + throw new ElasticsearchParseException("policy must not be null"); + } + + try { + XContentHelper.convertToMap(source, false, XContentType.JSON).v2(); + } catch (NotXContentException e) { + throw new ElasticsearchParseException("policy must not be empty"); + } catch (Exception e) { + throw new ElasticsearchParseException("invalid policy", e); + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json b/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json new file mode 100644 index 00000000000..e45e6b25e8f --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json @@ -0,0 +1,10 @@ +{ + "phases": { + "delete": { + "min_age": "7d", + "actions": { + "delete": {} + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index 9a4a96409b0..db6fd4aff95 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -5,6 +5,7 @@ "index.number_of_shards": 1, "index.number_of_replicas": 0, "index.auto_expand_replicas": "0-1", + "index.lifecycle.name": "watch-history-ilm-policy", "index.format": 6 }, "mappings": { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java index 586c3c68326..aad85426fc3 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java @@ -32,11 +32,12 @@ public class RestPutLifecycleAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String lifecycleName = restRequest.param("name"); - XContentParser parser = restRequest.contentParser(); - PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser); - putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout())); - putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout())); + try (XContentParser parser = restRequest.contentParser()) { + PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser); + putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout())); + putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout())); - return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel)); + return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel)); + } } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java index da51bb68962..61f9be3558f 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java @@ -87,7 +87,12 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas()); LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders, nextVersion, Instant.now().toEpochMilli()); - newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); + LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); + if (oldPolicy == null) { + logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName()); + } else { + logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); + } IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, OperationMode.RUNNING); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index 188086afcf1..95a2f8db2c4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -36,6 +36,7 @@ integTestCluster { dependsOn copyKeyCerts setting 'xpack.security.enabled', 'true' setting 'xpack.ml.enabled', 'true' + setting 'xpack.watcher.enabled', 'false' setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE' setting 'xpack.monitoring.enabled', 'false' setting 'xpack.security.authc.token.enabled', 'true' diff --git a/x-pack/plugin/watcher/build.gradle b/x-pack/plugin/watcher/build.gradle index fe6885b1c9d..dd26db984cd 100644 --- a/x-pack/plugin/watcher/build.gradle +++ b/x-pack/plugin/watcher/build.gradle @@ -31,6 +31,7 @@ dependencies { compileOnly project(path: ':plugins:transport-nio', configuration: 'runtime') testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile "org.elasticsearch.plugin:x-pack-ilm:${version}" // watcher deps compile 'com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:r239' diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 7b17d7f9973..d5eea54a80c 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -270,7 +270,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa throw new UncheckedIOException(e); } - new WatcherIndexTemplateRegistry(clusterService, threadPool, client); + new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry); // http client httpClient = new HttpClient(settings, getSslService(), cryptoService, clusterService); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java index 3258a7a481b..0fdb2b3a17d 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java @@ -18,13 +18,20 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils; +import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; import org.elasticsearch.xpack.core.template.TemplateUtils; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -46,17 +53,23 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES }; + public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-ilm-policy", "/watch-history-ilm-policy.json"); + private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class); private final Client client; private final ThreadPool threadPool; private final TemplateConfig[] indexTemplates; + private final NamedXContentRegistry xContentRegistry; private final ConcurrentMap templateCreationsInProgress = new ConcurrentHashMap<>(); + private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean(); - public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) { + public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client, + NamedXContentRegistry xContentRegistry) { this.client = client; this.threadPool = threadPool; this.indexTemplates = TEMPLATE_CONFIGS; + this.xContentRegistry = xContentRegistry; clusterService.addListener(this); } @@ -82,6 +95,7 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { if (event.localNodeMaster() || localNodeVersionAfterMaster) { addTemplatesIfMissing(state); + addIndexLifecyclePolicyIfMissing(state); } } @@ -127,6 +141,54 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { }); } + // Package visible for testing + LifecyclePolicy loadWatcherHistoryPolicy() { + return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry); + } + + private void addIndexLifecyclePolicyIfMissing(ClusterState state) { + if (historyPolicyCreationInProgress.compareAndSet(false, true)) { + final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy(); + + Optional maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE)); + final boolean needsUpdating = maybeMeta + .flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName()))) + .isPresent() == false; // If there is no policy then one needs to be put; + + if (needsUpdating) { + putPolicy(policyOnDisk, historyPolicyCreationInProgress); + } else { + historyPolicyCreationInProgress.set(false); + } + } + } + + private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) { + final Executor executor = threadPool.generic(); + executor.execute(() -> { + PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request, + new ActionListener() { + @Override + public void onResponse(PutLifecycleAction.Response response) { + creationCheck.set(false); + if (response.isAcknowledged() == false) { + logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged", + policy.getName()); + } + } + + @Override + public void onFailure(Exception e) { + creationCheck.set(false); + logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]", + policy.getName()), e); + } + }, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener)); + }); + } + public static boolean validate(ClusterState state) { return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) && state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) && @@ -153,9 +215,19 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { public byte[] load() { String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION, - Pattern.quote("${xpack.watcher.template.version}")); + Pattern.quote("${xpack.watcher.template.version}")); assert template != null && template.length() > 0; return template.getBytes(StandardCharsets.UTF_8); } } + public static class PolicyConfig { + + private final String policyName; + private String fileName; + + PolicyConfig(String templateName, String fileName) { + this.policyName = templateName; + this.fileName = fileName; + } + } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java index e93a86b93eb..60ca2b83b2f 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -21,20 +22,36 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction; +import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; import org.junit.Before; import org.mockito.ArgumentCaptor; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.elasticsearch.mock.orig.Mockito.verify; import static org.elasticsearch.mock.orig.Mockito.when; @@ -50,6 +67,7 @@ import static org.mockito.Mockito.verifyZeroInteractions; public class WatcherIndexTemplateRegistryTests extends ESTestCase { private WatcherIndexTemplateRegistry registry; + private NamedXContentRegistry xContentRegistry; private Client client; @Before @@ -72,7 +90,13 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase { }).when(indicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class)); ClusterService clusterService = mock(ClusterService.class); - registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client); + List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); + entries.addAll(Arrays.asList( + new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE), + (p) -> TimeseriesLifecycleType.INSTANCE), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse))); + xContentRegistry = new NamedXContentRegistry(entries); + registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry); } public void testThatNonExistingTemplatesAreAddedImmediately() { @@ -91,6 +115,44 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase { verify(client.admin().indices(), times(4)).putTemplate(argumentCaptor.capture(), anyObject()); } + public void testThatNonExistingPoliciesAreAddedImmediately() { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes); + registry.clusterChanged(event); + verify(client, times(1)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + } + + public void testPolicyAlreadyExists() { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + Map policyMap = new HashMap<>(); + LifecyclePolicy policy = registry.loadWatcherHistoryPolicy(); + policyMap.put(policy.getName(), policy); + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); + registry.clusterChanged(event); + verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + } + + public void testPolicyAlreadyExistsButDiffers() throws IOException { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + Map policyMap = new HashMap<>(); + String policyStr = "{\"phases\":{\"delete\":{\"min_age\":\"1m\",\"actions\":{\"delete\":{}}}}}"; + LifecyclePolicy policy = registry.loadWatcherHistoryPolicy(); + try (XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, policyStr)) { + LifecyclePolicy different = LifecyclePolicy.parse(parser, policy.getName()); + policyMap.put(policy.getName(), different); + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); + registry.clusterChanged(event); + verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + } + } + public void testThatTemplatesExist() { assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history")), is(false)); assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history", ".triggered_watches", ".watches")), @@ -141,6 +203,12 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase { } private ClusterChangedEvent createClusterChangedEvent(List existingTemplateNames, DiscoveryNodes nodes) { + return createClusterChangedEvent(existingTemplateNames, Collections.emptyMap(), nodes); + } + + private ClusterChangedEvent createClusterChangedEvent(List existingTemplateNames, + Map existingPolicies, + DiscoveryNodes nodes) { ClusterChangedEvent event = mock(ClusterChangedEvent.class); when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster()); ClusterState cs = mock(ClusterState.class); @@ -158,6 +226,10 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase { } when(metaData.getTemplates()).thenReturn(indexTemplates.build()); + + IndexLifecycleMetadata ilmMeta = mock(IndexLifecycleMetadata.class); + when(ilmMeta.getPolicies()).thenReturn(existingPolicies); + when(metaData.custom(anyObject())).thenReturn(ilmMeta); when(cs.metaData()).thenReturn(metaData); return event; diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 615fc8fbd08..6737de19baa 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle; import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.notification.email.Authentication; import org.elasticsearch.xpack.watcher.notification.email.Email; @@ -161,6 +162,8 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase } types.add(CommonAnalysisPlugin.class); + // ILM is required for watcher template index settings + types.add(IndexLifecycle.class); return types; }