Use ILM for Watcher history deletion (#37443)

* Use ILM for Watcher history deletion

This commit adds an index lifecycle policy for the `.watch-history-*` indices.
This policy is automatically used for all new watch history indices.

This does not yet remove the automatic cleanup that the monitoring plugin does
for the .watch-history indices, and it does not touch the
`xpack.watcher.history.cleaner_service.enabled` setting.

Relates to #32041
This commit is contained in:
Lee Hinman 2019-01-23 10:18:08 -07:00 committed by GitHub
parent f0fc6e8003
commit 427bc7f940
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 254 additions and 12 deletions

View File

@ -69,7 +69,7 @@ PUT _ilm/policy/my_policy
[source,js]
--------------------------------------------------
GET _ilm/policy
GET _ilm/policy/my_policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

View File

@ -87,7 +87,7 @@ PUT _ilm/policy/my_policy
//////////
[source,js]
--------------------------------------------------
GET _ilm/policy
GET _ilm/policy/my_policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* A utility class used for loading index lifecycle policies from the resource classpath
*/
public class LifecyclePolicyUtils {
private LifecyclePolicyUtils() {};
/**
* Loads a built-in index lifecycle policy and returns its source.
*/
public static LifecyclePolicy loadPolicy(String name, String resource, NamedXContentRegistry xContentRegistry) {
try {
BytesReference source = load(resource);
validate(source);
try (XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, source.utf8ToString())) {
return LifecyclePolicy.parse(parser, name);
}
} catch (Exception e) {
throw new IllegalArgumentException("unable to load policy [" + name + "] from [" + resource + "]", e);
}
}
/**
* Loads a resource from the classpath and returns it as a {@link BytesReference}
*/
private static BytesReference load(String name) throws IOException {
try (InputStream is = LifecyclePolicyUtils.class.getResourceAsStream(name)) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
return new BytesArray(out.toByteArray());
}
}
}
/**
* Parses and validates that the source is not empty.
*/
private static void validate(BytesReference source) {
if (source == null) {
throw new ElasticsearchParseException("policy must not be null");
}
try {
XContentHelper.convertToMap(source, false, XContentType.JSON).v2();
} catch (NotXContentException e) {
throw new ElasticsearchParseException("policy must not be empty");
} catch (Exception e) {
throw new ElasticsearchParseException("invalid policy", e);
}
}
}

View File

@ -0,0 +1,10 @@
{
"phases": {
"delete": {
"min_age": "7d",
"actions": {
"delete": {}
}
}
}
}

View File

@ -5,6 +5,7 @@
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.lifecycle.name": "watch-history-ilm-policy",
"index.format": 6
},
"mappings": {

View File

@ -32,11 +32,12 @@ public class RestPutLifecycleAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String lifecycleName = restRequest.param("name");
XContentParser parser = restRequest.contentParser();
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser);
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
try (XContentParser parser = restRequest.contentParser()) {
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser);
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
}
}
}

View File

@ -87,7 +87,12 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<Reque
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders,
nextVersion, Instant.now().toEpochMilli());
newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, OperationMode.RUNNING);
newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());

View File

@ -36,6 +36,7 @@ integTestCluster {
dependsOn copyKeyCerts
setting 'xpack.security.enabled', 'true'
setting 'xpack.ml.enabled', 'true'
setting 'xpack.watcher.enabled', 'false'
setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.security.authc.token.enabled', 'true'

View File

@ -31,6 +31,7 @@ dependencies {
compileOnly project(path: ':plugins:transport-nio', configuration: 'runtime')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile "org.elasticsearch.plugin:x-pack-ilm:${version}"
// watcher deps
compile 'com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:r239'

View File

@ -270,7 +270,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
throw new UncheckedIOException(e);
}
new WatcherIndexTemplateRegistry(clusterService, threadPool, client);
new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry);
// http client
httpClient = new HttpClient(settings, getSslService(), cryptoService, clusterService);

View File

@ -18,13 +18,20 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@ -46,17 +53,23 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener {
TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES
};
public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-ilm-policy", "/watch-history-ilm-policy.json");
private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class);
private final Client client;
private final ThreadPool threadPool;
private final TemplateConfig[] indexTemplates;
private final NamedXContentRegistry xContentRegistry;
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean();
public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) {
public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
this.client = client;
this.threadPool = threadPool;
this.indexTemplates = TEMPLATE_CONFIGS;
this.xContentRegistry = xContentRegistry;
clusterService.addListener(this);
}
@ -82,6 +95,7 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener {
if (event.localNodeMaster() || localNodeVersionAfterMaster) {
addTemplatesIfMissing(state);
addIndexLifecyclePolicyIfMissing(state);
}
}
@ -127,6 +141,54 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener {
});
}
// Package visible for testing
LifecyclePolicy loadWatcherHistoryPolicy() {
return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry);
}
private void addIndexLifecyclePolicyIfMissing(ClusterState state) {
if (historyPolicyCreationInProgress.compareAndSet(false, true)) {
final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy();
Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE));
final boolean needsUpdating = maybeMeta
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName())))
.isPresent() == false; // If there is no policy then one needs to be put;
if (needsUpdating) {
putPolicy(policyOnDisk, historyPolicyCreationInProgress);
} else {
historyPolicyCreationInProgress.set(false);
}
}
}
private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request,
new ActionListener<PutLifecycleAction.Response>() {
@Override
public void onResponse(PutLifecycleAction.Response response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged",
policy.getName());
}
}
@Override
public void onFailure(Exception e) {
creationCheck.set(false);
logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]",
policy.getName()), e);
}
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener));
});
}
public static boolean validate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) &&
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) &&
@ -153,9 +215,19 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener {
public byte[] load() {
String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
Pattern.quote("${xpack.watcher.template.version}"));
Pattern.quote("${xpack.watcher.template.version}"));
assert template != null && template.length() > 0;
return template.getBytes(StandardCharsets.UTF_8);
}
}
public static class PolicyConfig {
private final String policyName;
private String fileName;
PolicyConfig(String templateName, String fileName) {
this.policyName = templateName;
this.fileName = fileName;
}
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -21,20 +22,36 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.when;
@ -50,6 +67,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
public class WatcherIndexTemplateRegistryTests extends ESTestCase {
private WatcherIndexTemplateRegistry registry;
private NamedXContentRegistry xContentRegistry;
private Client client;
@Before
@ -72,7 +90,13 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
}).when(indicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class));
ClusterService clusterService = mock(ClusterService.class);
registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client);
List<NamedXContentRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedXWriteables());
entries.addAll(Arrays.asList(
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
(p) -> TimeseriesLifecycleType.INSTANCE),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)));
xContentRegistry = new NamedXContentRegistry(entries);
registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry);
}
public void testThatNonExistingTemplatesAreAddedImmediately() {
@ -91,6 +115,44 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
verify(client.admin().indices(), times(4)).putTemplate(argumentCaptor.capture(), anyObject());
}
public void testThatNonExistingPoliciesAreAddedImmediately() {
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes);
registry.clusterChanged(event);
verify(client, times(1)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject());
}
public void testPolicyAlreadyExists() {
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
Map<String, LifecyclePolicy> policyMap = new HashMap<>();
LifecyclePolicy policy = registry.loadWatcherHistoryPolicy();
policyMap.put(policy.getName(), policy);
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes);
registry.clusterChanged(event);
verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject());
}
public void testPolicyAlreadyExistsButDiffers() throws IOException {
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
Map<String, LifecyclePolicy> policyMap = new HashMap<>();
String policyStr = "{\"phases\":{\"delete\":{\"min_age\":\"1m\",\"actions\":{\"delete\":{}}}}}";
LifecyclePolicy policy = registry.loadWatcherHistoryPolicy();
try (XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, policyStr)) {
LifecyclePolicy different = LifecyclePolicy.parse(parser, policy.getName());
policyMap.put(policy.getName(), different);
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes);
registry.clusterChanged(event);
verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject());
}
}
public void testThatTemplatesExist() {
assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history")), is(false));
assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history", ".triggered_watches", ".watches")),
@ -141,6 +203,12 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
}
private ClusterChangedEvent createClusterChangedEvent(List<String> existingTemplateNames, DiscoveryNodes nodes) {
return createClusterChangedEvent(existingTemplateNames, Collections.emptyMap(), nodes);
}
private ClusterChangedEvent createClusterChangedEvent(List<String> existingTemplateNames,
Map<String, LifecyclePolicy> existingPolicies,
DiscoveryNodes nodes) {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster());
ClusterState cs = mock(ClusterState.class);
@ -158,6 +226,10 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
}
when(metaData.getTemplates()).thenReturn(indexTemplates.build());
IndexLifecycleMetadata ilmMeta = mock(IndexLifecycleMetadata.class);
when(ilmMeta.getPolicies()).thenReturn(existingPolicies);
when(metaData.custom(anyObject())).thenReturn(ilmMeta);
when(cs.metaData()).thenReturn(metaData);
return event;

View File

@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
import org.elasticsearch.xpack.watcher.notification.email.Email;
@ -161,6 +162,8 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
types.add(CommonAnalysisPlugin.class);
// ILM is required for watcher template index settings
types.add(IndexLifecycle.class);
return types;
}