Extract template management from Watcher (#41169)

This commit extracts the template management from Watcher into an
abstract class, so that templates and lifecycle policies can be managed
in the same way across multiple plugins. This will be useful for SLM, as
well as potentially ILM and any other plugins which need to manage index
templates.
This commit is contained in:
Gordon Brown 2019-04-17 13:42:36 -06:00 committed by GitHub
parent 7e62ff2823
commit 66366d0307
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 396 additions and 209 deletions

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.template;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
/**
* Describes an index template to be loaded from a resource file for use with an {@link IndexTemplateRegistry}.
*/
public class IndexTemplateConfig {
private final String templateName;
private final String fileName;
private final String version;
private final String versionProperty;
/**
* Describes a template to be loaded from a resource file. Includes handling for substituting a version property into the template.
*
* The {@code versionProperty} parameter will be used to substitute the value of {@code version} into the template. For example,
* this template:
* {@code {"myTemplateVersion": "${my.version.property}"}}
* With {@code version = "42"; versionProperty = "my.version.property"} will result in {@code {"myTemplateVersion": "42"}}.
*
* @param templateName The name that will be used for the index template. Literal, include the version in this string if
* it should be used.
* @param fileName The filename the template should be loaded from. Literal, should include leading {@literal /} and
* extension if necessary.
* @param version The version of the template. Substituted for {@code versionProperty} as described above.
* @param versionProperty The property that will be replaced with the {@code version} string as described above.
*/
public IndexTemplateConfig(String templateName, String fileName, String version, String versionProperty) {
this.templateName = templateName;
this.fileName = fileName;
this.version = version;
this.versionProperty = versionProperty;
}
public String getFileName() {
return fileName;
}
public String getTemplateName() {
return templateName;
}
/**
* Loads the template from disk as a UTF-8 byte array.
* @return The template as a UTF-8 byte array.
*/
public byte[] loadBytes() {
String template = TemplateUtils.loadTemplate(fileName, version,
Pattern.quote("${" + versionProperty + "}"));
assert template != null && template.length() > 0;
return template.getBytes(StandardCharsets.UTF_8);
}
}

View File

@ -0,0 +1,229 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.template;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* Abstracts the logic of managing versioned index templates and lifecycle policies for plugins that require such things.
*/
public abstract class IndexTemplateRegistry implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(IndexTemplateRegistry.class);
protected final Settings settings;
protected final Client client;
protected final ThreadPool threadPool;
protected final NamedXContentRegistry xContentRegistry;
protected final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
protected final ConcurrentMap<String, AtomicBoolean> policyCreationsInProgress = new ConcurrentHashMap<>();
public IndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
this.settings = nodeSettings;
this.client = client;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
clusterService.addListener(this);
}
/**
* Retrieves return a list of {@link IndexTemplateConfig} that represents
* the index templates that should be installed and managed.
* @return The configurations for the templates that should be installed.
*/
protected abstract List<IndexTemplateConfig> getTemplateConfigs();
/**
* Retrieves a list of {@link LifecyclePolicyConfig} that represents the ILM
* policies that should be installed and managed. Only called if ILM is enabled.
* @return The configurations for the lifecycle policies that should be installed.
*/
protected abstract List<LifecyclePolicyConfig> getPolicyConfigs();
/**
* Retrieves an identifier that is used to identify which plugin is asking for this.
* @return A string ID for the plugin managing these templates.
*/
protected abstract String getOrigin();
/**
* Called when creation of an index template fails.
* @param config The template config that failed to be created.
* @param e The exception that caused the failure.
*/
protected void onPutTemplateFailure(IndexTemplateConfig config, Exception e) {
logger.error(new ParameterizedMessage("error adding index template [{}] from [{}] for [{}]",
config.getTemplateName(), config.getFileName(), getOrigin()), e);
}
/**
* Called when creation of a lifecycle policy fails.
* @param policy The lifecycle policy that failed to be created.
* @param e The exception that caused the failure.
*/
protected void onPutPolicyFailure(LifecyclePolicy policy, Exception e) {
logger.error(new ParameterizedMessage("error adding lifecycle policy [{}] for [{}]",
policy.getName(), getOrigin()), e);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
return;
}
// no master node, exit immediately
DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
if (masterNode == null) {
return;
}
// if this node is newer than the master node, we probably need to add the template, which might be newer than the
// template the master node has, so we need potentially add new templates despite being not the master node
DiscoveryNode localNode = event.state().getNodes().getLocalNode();
boolean localNodeVersionAfterMaster = localNode.getVersion().after(masterNode.getVersion());
if (event.localNodeMaster() || localNodeVersionAfterMaster) {
addTemplatesIfMissing(state);
addIndexLifecyclePoliciesIfMissing(state);
}
}
private void addTemplatesIfMissing(ClusterState state) {
final List<IndexTemplateConfig> indexTemplates = getTemplateConfigs();
for (IndexTemplateConfig template : indexTemplates) {
final String templateName = template.getTemplateName();
final AtomicBoolean creationCheck = templateCreationsInProgress.computeIfAbsent(templateName, key -> new AtomicBoolean(false));
if (creationCheck.compareAndSet(false, true)) {
if (!state.metaData().getTemplates().containsKey(templateName)) {
logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
putTemplate(template, creationCheck);
} else {
creationCheck.set(false);
logger.trace("not adding index template [{}] for [{}], because it already exists", templateName, getOrigin());
}
}
}
}
private void putTemplate(final IndexTemplateConfig config, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
final String templateName = config.getTemplateName();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(config.loadBytes(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), getOrigin(), request,
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("error adding index template [{}] for [{}], request was not acknowledged",
templateName, getOrigin());
}
}
@Override
public void onFailure(Exception e) {
creationCheck.set(false);
onPutTemplateFailure(config, e);
}
}, client.admin().indices()::putTemplate);
});
}
private void addIndexLifecyclePoliciesIfMissing(ClusterState state) {
boolean ilmSupported = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(settings);
if (ilmSupported) {
Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE));
List<LifecyclePolicy> policies = getPolicyConfigs().stream()
.map(policyConfig -> policyConfig.load(xContentRegistry))
.collect(Collectors.toList());
for (LifecyclePolicy policy : policies) {
final AtomicBoolean creationCheck = policyCreationsInProgress.computeIfAbsent(policy.getName(),
key -> new AtomicBoolean(false));
if (creationCheck.compareAndSet(false, true)) {
final boolean policyNeedsToBeCreated = maybeMeta
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policy.getName())))
.isPresent() == false;
if (policyNeedsToBeCreated) {
logger.debug("adding lifecycle policy [{}] for [{}], because it doesn't exist", policy.getName(), getOrigin());
putPolicy(policy, creationCheck);
} else {
logger.trace("not adding lifecycle policy [{}] for [{}], because it already exists",
policy.getName(), getOrigin());
creationCheck.set(false);
}
}
}
}
}
private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), getOrigin(), request,
new ActionListener<PutLifecycleAction.Response>() {
@Override
public void onResponse(PutLifecycleAction.Response response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("error adding lifecycle policy [{}] for [{}], request was not acknowledged",
policy.getName(), getOrigin());
}
}
@Override
public void onFailure(Exception e) {
creationCheck.set(false);
onPutPolicyFailure(policy, e);
}
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener));
});
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.template;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils;
/**
* Describes an index lifecycle policy to be loaded from a resource file for use with an {@link IndexTemplateRegistry}.
*/
public class LifecyclePolicyConfig {
private final String policyName;
private final String fileName;
/**
* Describes a lifecycle policy definition to be loaded from a resource file.
*
* @param policyName The name that will be used for the policy.
* @param fileName The filename the policy definition should be loaded from. Literal, should include leading {@literal /} and
* extension if necessary.
*/
public LifecyclePolicyConfig(String policyName, String fileName) {
this.policyName = policyName;
this.fileName = fileName;
}
public String getPolicyName() {
return policyName;
}
public String getFileName() {
return fileName;
}
public LifecyclePolicy load(NamedXContentRegistry xContentRegistry) {
return LifecyclePolicyUtils.loadPolicy(policyName, fileName, xContentRegistry);
}
}

View File

@ -5,240 +5,83 @@
*/
package org.elasticsearch.xpack.watcher.support;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class WatcherIndexTemplateRegistry implements ClusterStateListener {
public class WatcherIndexTemplateRegistry extends IndexTemplateRegistry {
public static final TemplateConfig TEMPLATE_CONFIG_TRIGGERED_WATCHES = new TemplateConfig(
WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME, "triggered-watches");
public static final TemplateConfig TEMPLATE_CONFIG_WATCH_HISTORY = new TemplateConfig(
WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME, "watch-history");
public static final TemplateConfig TEMPLATE_CONFIG_WATCH_HISTORY_NO_ILM = new TemplateConfig(
WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME_NO_ILM, "watch-history-no-ilm");
public static final TemplateConfig TEMPLATE_CONFIG_WATCHES = new TemplateConfig(
WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME, "watches");
public static final TemplateConfig[] TEMPLATE_CONFIGS = new TemplateConfig[]{
TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES
};
public static final TemplateConfig[] TEMPLATE_CONFIGS_NO_ILM = new TemplateConfig[]{
TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY_NO_ILM, TEMPLATE_CONFIG_WATCHES
};
public static final String WATCHER_TEMPLATE_VERSION_VARIABLE = "xpack.watcher.template.version";
public static final IndexTemplateConfig TEMPLATE_CONFIG_TRIGGERED_WATCHES = new IndexTemplateConfig(
WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME,
"/triggered-watches.json",
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
WATCHER_TEMPLATE_VERSION_VARIABLE);
public static final IndexTemplateConfig TEMPLATE_CONFIG_WATCH_HISTORY = new IndexTemplateConfig(
WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME,
"/watch-history.json",
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
WATCHER_TEMPLATE_VERSION_VARIABLE);
public static final IndexTemplateConfig TEMPLATE_CONFIG_WATCH_HISTORY_NO_ILM = new IndexTemplateConfig(
WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME_NO_ILM,
"/watch-history-no-ilm.json",
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
WATCHER_TEMPLATE_VERSION_VARIABLE);
public static final IndexTemplateConfig TEMPLATE_CONFIG_WATCHES = new IndexTemplateConfig(
WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME,
"/watches.json",
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
WATCHER_TEMPLATE_VERSION_VARIABLE);
public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-ilm-policy", "/watch-history-ilm-policy.json");
public static final LifecyclePolicyConfig POLICY_WATCH_HISTORY = new LifecyclePolicyConfig("watch-history-ilm-policy",
"/watch-history-ilm-policy.json");
private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class);
private final List<IndexTemplateConfig> templatesToUse;
private final Settings nodeSettings;
private final Client client;
private final ThreadPool threadPool;
private final NamedXContentRegistry xContentRegistry;
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean();
public WatcherIndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService,
ThreadPool threadPool, Client client,
public WatcherIndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
this.nodeSettings = nodeSettings;
this.client = client;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
clusterService.addListener(this);
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
boolean ilmEnabled = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(settings);
templatesToUse = Arrays.asList(
ilmEnabled ? TEMPLATE_CONFIG_WATCH_HISTORY : TEMPLATE_CONFIG_WATCH_HISTORY_NO_ILM,
TEMPLATE_CONFIG_TRIGGERED_WATCHES,
TEMPLATE_CONFIG_WATCHES
);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
return;
}
// no master node, exit immediately
DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
if (masterNode == null) {
return;
}
// if this node is newer than the master node, we probably need to add the history template, which might be newer than the
// history template the master node has, so we need potentially add new templates despite being not the master node
DiscoveryNode localNode = event.state().getNodes().getLocalNode();
boolean localNodeVersionAfterMaster = localNode.getVersion().after(masterNode.getVersion());
if (event.localNodeMaster() || localNodeVersionAfterMaster) {
addTemplatesIfMissing(state);
addIndexLifecyclePolicyIfMissing(state);
}
protected List<IndexTemplateConfig> getTemplateConfigs() {
return templatesToUse;
}
private void addTemplatesIfMissing(ClusterState state) {
boolean ilmSupported = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(this.nodeSettings);
final TemplateConfig[] indexTemplates = ilmSupported ? TEMPLATE_CONFIGS : TEMPLATE_CONFIGS_NO_ILM;
for (TemplateConfig template : indexTemplates) {
final String templateName = template.getTemplateName();
final AtomicBoolean creationCheck = templateCreationsInProgress.computeIfAbsent(templateName, key -> new AtomicBoolean(false));
if (creationCheck.compareAndSet(false, true)) {
if (!state.metaData().getTemplates().containsKey(templateName)) {
logger.debug("adding index template [{}], because it doesn't exist", templateName);
putTemplate(template, creationCheck);
} else {
creationCheck.set(false);
logger.trace("not adding index template [{}], because it already exists", templateName);
}
}
}
@Override
protected List<LifecyclePolicyConfig> getPolicyConfigs() {
return Collections.singletonList(POLICY_WATCH_HISTORY);
}
private void putTemplate(final TemplateConfig config, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
final String templateName = config.getTemplateName();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(config.load(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request,
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("Error adding watcher template [{}], request was not acknowledged", templateName);
}
}
@Override
public void onFailure(Exception e) {
creationCheck.set(false);
logger.error(new ParameterizedMessage("Error adding watcher template [{}]", templateName), e);
}
}, client.admin().indices()::putTemplate);
});
}
// Package visible for testing
LifecyclePolicy loadWatcherHistoryPolicy() {
return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry);
}
private void addIndexLifecyclePolicyIfMissing(ClusterState state) {
boolean ilmSupported = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(this.nodeSettings);
if (ilmSupported && historyPolicyCreationInProgress.compareAndSet(false, true)) {
final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy();
Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE));
final boolean needsUpdating = maybeMeta
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName())))
.isPresent() == false; // If there is no policy then one needs to be put;
if (needsUpdating) {
putPolicy(policyOnDisk, historyPolicyCreationInProgress);
} else {
historyPolicyCreationInProgress.set(false);
}
}
}
private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request,
new ActionListener<PutLifecycleAction.Response>() {
@Override
public void onResponse(PutLifecycleAction.Response response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged",
policy.getName());
}
}
@Override
public void onFailure(Exception e) {
creationCheck.set(false);
logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]",
policy.getName()), e);
}
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener));
});
@Override
protected String getOrigin() {
return WATCHER_ORIGIN;
}
public static boolean validate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) &&
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) &&
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME);
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) &&
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME);
}
public static class TemplateConfig {
private final String templateName;
private String fileName;
TemplateConfig(String templateName, String fileName) {
this.templateName = templateName;
this.fileName = fileName;
}
public String getFileName() {
return fileName;
}
public String getTemplateName() {
return templateName;
}
public byte[] load() {
String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
Pattern.quote("${xpack.watcher.template.version}"));
assert template != null && template.length() > 0;
return template.getBytes(StandardCharsets.UTF_8);
}
}
public static class PolicyConfig {
private final String policyName;
private String fileName;
PolicyConfig(String templateName, String fileName) {
this.policyName = templateName;
this.fileName = fileName;
}
}
}

View File

@ -59,6 +59,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
@ -164,7 +165,11 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
Map<String, LifecyclePolicy> policyMap = new HashMap<>();
LifecyclePolicy policy = registry.loadWatcherHistoryPolicy();
List<LifecyclePolicy> policies = registry.getPolicyConfigs().stream()
.map(policyConfig -> policyConfig.load(xContentRegistry))
.collect(Collectors.toList());
assertThat(policies, hasSize(1));
LifecyclePolicy policy = policies.get(0);
policyMap.put(policy.getName(), policy);
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes);
registry.clusterChanged(event);
@ -183,13 +188,17 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject());
}
public void testPolicyAlreadyExistsButDiffers() throws IOException {
public void testPolicyAlreadyExistsButDiffers() throws IOException {
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
Map<String, LifecyclePolicy> policyMap = new HashMap<>();
String policyStr = "{\"phases\":{\"delete\":{\"min_age\":\"1m\",\"actions\":{\"delete\":{}}}}}";
LifecyclePolicy policy = registry.loadWatcherHistoryPolicy();
List<LifecyclePolicy> policies = registry.getPolicyConfigs().stream()
.map(policyConfig -> policyConfig.load(xContentRegistry))
.collect(Collectors.toList());
assertThat(policies, hasSize(1));
LifecyclePolicy policy = policies.get(0);
try (XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, policyStr)) {
LifecyclePolicy different = LifecyclePolicy.parse(parser, policy.getName());