From fe72991c7019a76c451b2557be98202dff5e9795 Mon Sep 17 00:00:00 2001 From: Suyog Rao Date: Tue, 13 Jun 2017 10:30:30 -0700 Subject: [PATCH] [Logstash] Add new component to x-pack to handle LS features (elastic/x-pack-elasticsearch#1530) This commit adds a new Logstash component to x-pack to support the config management work. Currently, the functionality in this component is really simple; all it does is upload a new index template for `.logstash` index. This index stores the actual LS configuration. On this template is bootstrapped in ES, Kibana can write user-created LS configs which adhere to the mapping defined here. In the future, we're looking into adding more functionality on the ES side to handle config documents, but for now, this is simple. relates elastic/x-pack-elasticsearch#1499, relates elastic/x-pack-elasticsearch#1471 Original commit: elastic/x-pack-elasticsearch@d7cc8675f73b88b01cba3f9dcb52f6b504eeb07c --- docs/en/rest-api/index.asciidoc | 5 + .../license/XPackLicenseState.java | 28 ++ .../org/elasticsearch/xpack/XPackPlugin.java | 11 + .../elasticsearch/xpack/XPackSettings.java | 3 + .../xpack/logstash/Logstash.java | 61 +++++ .../xpack/logstash/LogstashFeatureSet.java | 82 ++++++ .../logstash/LogstashTemplateRegistry.java | 112 ++++++++ .../support/IndexLifecycleManager.java | 50 +--- .../xpack/template/TemplateUtils.java | 78 ++++++ .../resources/logstash-index-template.json | 50 ++++ .../license/XPackLicenseStateTests.java | 11 + .../logstash/LogstashFeatureSetTests.java | 63 +++++ .../LogstashTemplateRegistryTests.java | 245 ++++++++++++++++++ ...wrong-version-logstash-index-template.json | 50 ++++ 14 files changed, 803 insertions(+), 46 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashFeatureSet.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistry.java create mode 100644 plugin/src/main/resources/logstash-index-template.json create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashFeatureSetTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistryTests.java create mode 100644 plugin/src/test/resources/wrong-version-logstash-index-template.json diff --git a/docs/en/rest-api/index.asciidoc b/docs/en/rest-api/index.asciidoc index 6cc57e86bd3..3b8dc57e0ba 100644 --- a/docs/en/rest-api/index.asciidoc +++ b/docs/en/rest-api/index.asciidoc @@ -53,6 +53,11 @@ Example response: "available" : true, "enabled" : true }, + "logstash" : { + "description" : "Logstash management component for X-Pack", + "available" : true, + "enabled" : true + }, "ml" : { "description" : "Machine Learning for the Elastic Stack", "available" : true, diff --git a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java index 275ba38b7d5..1ccbe610b59 100644 --- a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -48,6 +48,9 @@ public class XPackLicenseState { messages.put(XPackPlugin.MACHINE_LEARNING, new String[] { "Machine learning APIs are disabled" }); + messages.put(XPackPlugin.LOGSTASH, new String[] { + "Logstash specific APIs are disabled. You can continue to manage and poll stored configurations" + }); EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages); } @@ -62,6 +65,7 @@ public class XPackLicenseState { messages.put(XPackPlugin.WATCHER, XPackLicenseState::watcherAcknowledgementMessages); messages.put(XPackPlugin.MONITORING, XPackLicenseState::monitoringAcknowledgementMessages); messages.put(XPackPlugin.GRAPH, XPackLicenseState::graphAcknowledgementMessages); + messages.put(XPackPlugin.LOGSTASH, XPackLicenseState::logstashAcknowledgementMessages); ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages); } @@ -167,6 +171,22 @@ public class XPackLicenseState { return Strings.EMPTY_ARRAY; } + private static String[] logstashAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) { + switch (newMode) { + case TRIAL: + switch (currentMode) { + case BASIC: + case STANDARD: + case GOLD: + case PLATINUM: + return new String[] { "Logstash specific APIs will be disabled, but you can continue to manage " + + "and poll stored configurations" }; + } + break; + } + return Strings.EMPTY_ARRAY; + } + /** A wrapper for the license mode and state, to allow atomically swapping. */ private static class Status { @@ -404,4 +424,12 @@ public class XPackLicenseState { return licensed && localStatus.active; } + + /** + * Logstash is always allowed as long as there is an active license + * @return {@code true} as long as there is a valid license + */ + public boolean isLogstashAllowed() { + return status.active; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index 4a5136dd2ae..2c73cb582b9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -70,6 +70,8 @@ import org.elasticsearch.xpack.extensions.XPackExtension; import org.elasticsearch.xpack.extensions.XPackExtensionsService; import org.elasticsearch.xpack.graph.Graph; import org.elasticsearch.xpack.graph.GraphFeatureSet; +import org.elasticsearch.xpack.logstash.Logstash; +import org.elasticsearch.xpack.logstash.LogstashFeatureSet; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearningFeatureSet; import org.elasticsearch.xpack.monitoring.Monitoring; @@ -143,6 +145,9 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I /** Name constant for the machine learning feature. */ public static final String MACHINE_LEARNING = "ml"; + /** Name constant for the Logstash feature. */ + public static final String LOGSTASH = "logstash"; + // inside of YAML settings we still use xpack do not having handle issues with dashes private static final String SETTINGS_NAME = "xpack"; @@ -192,6 +197,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I protected Watcher watcher; protected Graph graph; protected MachineLearning machineLearning; + protected Logstash logstash; public XPackPlugin(Settings settings) throws IOException, DestroyFailedException, OperatorCreationException, GeneralSecurityException { this.settings = settings; @@ -206,6 +212,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I this.watcher = new Watcher(settings); this.graph = new Graph(settings); this.machineLearning = new MachineLearning(settings, env, licenseState); + this.logstash = new Logstash(settings); // Check if the node is a transport client. if (transportClientMode == false) { this.extensionsService = new XPackExtensionsService(settings, resolveXPackExtensionsFile(env), getExtensions()); @@ -233,6 +240,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I modules.addAll(watcher.nodeModules()); modules.addAll(graph.createGuiceModules()); modules.addAll(machineLearning.nodeModules()); + modules.addAll(logstash.nodeModules()); if (transportClientMode) { modules.add(b -> b.bind(XPackLicenseState.class).toProvider(Providers.of(null))); @@ -283,6 +291,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry)); + components.addAll(logstash.createComponents(internalClient, clusterService)); + // just create the reloader as it will pull all of the loaded ssl configurations and start watching them new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService); return components; @@ -455,6 +465,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, MONITORING, MonitoringFeatureSet.Usage::new)); entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, GRAPH, GraphFeatureSet.Usage::new)); entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, MACHINE_LEARNING, MachineLearningFeatureSet.Usage::new)); + entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, LOGSTASH, LogstashFeatureSet.Usage::new)); entries.addAll(watcher.getNamedWriteables()); entries.addAll(machineLearning.getNamedWriteables()); entries.addAll(licensing.getNamedWriteables()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackSettings.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackSettings.java index f53b6f2445f..b55a3fb59a0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackSettings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackSettings.java @@ -50,6 +50,9 @@ public class XPackSettings { /** Setting for enabling or disabling document/field level security. Defaults to true. */ public static final Setting DLS_FLS_ENABLED = enabledSetting(XPackPlugin.SECURITY + ".dls_fls", true); + /** Setting for enabling or disabling Logstash extensions. Defaults to true. */ + public static final Setting LOGSTASH_ENABLED = enabledSetting(XPackPlugin.LOGSTASH, true); + /** * Legacy setting for enabling or disabling transport ssl. Defaults to true. This is just here to make upgrading easier since the * user needs to set this setting in 5.x to upgrade diff --git a/plugin/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java b/plugin/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java new file mode 100644 index 00000000000..9a232052830 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.logstash; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.security.InternalClient; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * This class activates/deactivates the logstash modules depending if we're running a node client or transport client + */ +public class Logstash implements ActionPlugin { + + public static final String NAME = "logstash"; + + private final Settings settings; + private final boolean enabled; + private final boolean transportClientMode; + + public Logstash(Settings settings) { + this.settings = settings; + this.enabled = XPackSettings.LOGSTASH_ENABLED.get(settings); + this.transportClientMode = XPackPlugin.transportClientMode(settings); + } + + boolean isEnabled() { + return enabled; + } + + boolean isTransportClient() { + return transportClientMode; + } + + public Collection nodeModules() { + List modules = new ArrayList<>(); + modules.add(b -> { + XPackPlugin.bindFeatureSet(b, LogstashFeatureSet.class); + }); + return modules; + } + + public Collection createComponents(InternalClient client, ClusterService clusterService) { + if (this.transportClientMode || enabled == false) { + return Collections.emptyList(); + } + + return Collections.singletonList(new LogstashTemplateRegistry(settings, clusterService, client)); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashFeatureSet.java new file mode 100644 index 00000000000..260a10edf0f --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashFeatureSet.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.logstash; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.XPackFeatureSet; +import org.elasticsearch.xpack.XPackSettings; + + +import java.io.IOException; +import java.util.Map; + +public class LogstashFeatureSet implements XPackFeatureSet { + + private final boolean enabled; + private final XPackLicenseState licenseState; + + @Inject + public LogstashFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) { + this.enabled = XPackSettings.LOGSTASH_ENABLED.get(settings); + this.licenseState = licenseState; + } + + @Override + public String name() { + return Logstash.NAME; + } + + @Override + public String description() { + return "Logstash management component for X-Pack"; + } + + @Override + public boolean available() { + return licenseState != null && licenseState.isLogstashAllowed(); + } + + @Override + public boolean enabled() { + return enabled; + } + + @Override + public Map nativeCodeInfo() { + return null; + } + + @Override + public void usage(ActionListener listener) { + listener.onResponse(new LogstashFeatureSet.Usage(available(), enabled())); + } + + public static class Usage extends XPackFeatureSet.Usage { + + public Usage(StreamInput in) throws IOException { + super(in); + } + + public Usage(boolean available, boolean enabled) { + super(Logstash.NAME, available, enabled); + } + + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + protected void innerXContent(XContentBuilder builder, Params params) throws IOException { + super.innerXContent(builder, params); + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistry.java b/plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistry.java new file mode 100644 index 00000000000..cb37640cd86 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistry.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.logstash; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.xpack.template.TemplateUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +/** + * Registry for the Logstash index template and settings + * This class is based on xpack.security.SecurityLifecycleService. + */ +public class LogstashTemplateRegistry extends AbstractComponent implements ClusterStateListener { + + public static final String LOGSTASH_INDEX_NAME = ".logstash"; + public static final String LOGSTASH_TEMPLATE_NAME = "logstash-index-template"; + public static final String TEMPLATE_VERSION_PATTERN = + Pattern.quote("${logstash.template.version}"); + + private static final String LOGSTASH_VERSION_PROPERTY = "logstash-version"; + + private final Client client; + + private final AtomicBoolean templateIsUpToDate = new AtomicBoolean(false); + + // only put the template if this is not already in progress + private final AtomicBoolean templateCreationPending = new AtomicBoolean(false); + + public LogstashTemplateRegistry(Settings settings, ClusterService clusterService, Client client) { + super(settings); + this.client = client; + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + + // wait until the gateway has recovered from disk, + // otherwise we think may not have the index templates while they actually do exist + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) { + addTemplatesIfMissing(event.state()); + } + } + } + + public boolean isTemplateUpToDate() { + return templateIsUpToDate.get(); + } + + public boolean isTemplateCreationPending() { + return templateCreationPending.get(); + } + + private void addTemplatesIfMissing(ClusterState state) { + this.templateIsUpToDate.set(TemplateUtils.checkTemplateExistsAndIsUpToDate(LOGSTASH_TEMPLATE_NAME, + LOGSTASH_VERSION_PROPERTY, state, logger)); + + // only put the template if its not up to date and if its not already in progress + if (isTemplateUpToDate() == false && templateCreationPending.compareAndSet(false, true)) { + putTemplate(); + } + } + + private void putTemplate() { + logger.debug("putting the template [{}]", LOGSTASH_TEMPLATE_NAME); + String template = TemplateUtils.loadTemplate("/" + LOGSTASH_TEMPLATE_NAME + ".json", + Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN); + + PutIndexTemplateRequest putTemplateRequest = client.admin().indices() + .preparePutTemplate(LOGSTASH_TEMPLATE_NAME) + .setSource( + new BytesArray(template.getBytes(StandardCharsets.UTF_8)), + XContentType.JSON) + .request(); + + client.admin().indices().putTemplate(putTemplateRequest, ActionListener.wrap(r -> { + templateCreationPending.set(false); + if (r.isAcknowledged()) { + templateIsUpToDate.set(true); + logger.debug("successfully updated [{}] index template", LOGSTASH_TEMPLATE_NAME); + } else { + logger.error("put template [{}] was not acknowledged", LOGSTASH_TEMPLATE_NAME); + } + }, e -> { + templateCreationPending.set(false); + logger.warn(new ParameterizedMessage( + "failed to put template [{}]", LOGSTASH_TEMPLATE_NAME), e); + })); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java index f9edc3d9cf4..dfe3151ccf3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java @@ -141,7 +141,8 @@ public class IndexLifecycleManager extends AbstractComponent { final ClusterState state = event.state(); this.indexExists = resolveConcreteIndex(indexName, event.state().metaData()) != null; this.indexAvailable = checkIndexAvailable(state); - this.templateIsUpToDate = checkTemplateExistsAndIsUpToDate(state); + this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName, + SECURITY_VERSION_STRING, state, logger); this.mappingIsUpToDate = checkIndexMappingUpToDate(state); this.canWriteToIndex = templateIsUpToDate && mappingIsUpToDate; this.mappingVersion = oldestIndexMappingVersion(event.state()); @@ -178,54 +179,11 @@ public class IndexLifecycleManager extends AbstractComponent { } } - private boolean checkTemplateExistsAndIsUpToDate(ClusterState state) { - return checkTemplateExistsAndVersionMatches(templateName, state, logger, - Version.CURRENT::equals); - } - public static boolean checkTemplateExistsAndVersionMatches( String templateName, ClusterState state, Logger logger, Predicate predicate) { - IndexTemplateMetaData templateMeta = state.metaData().templates().get(templateName); - if (templateMeta == null) { - return false; - } - ImmutableOpenMap mappings = templateMeta.getMappings(); - // check all mappings contain correct version in _meta - // we have to parse the source here which is annoying - for (Object typeMapping : mappings.values().toArray()) { - CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping; - try { - Map typeMappingMap = convertToMap( - new BytesArray(typeMappingXContent.uncompressed()), false, - XContentType.JSON).v2(); - // should always contain one entry with key = typename - assert (typeMappingMap.size() == 1); - String key = typeMappingMap.keySet().iterator().next(); - // get the actual mapping entries - @SuppressWarnings("unchecked") - Map mappingMap = (Map) typeMappingMap.get(key); - if (containsCorrectVersion(mappingMap, predicate) == false) { - return false; - } - } catch (ElasticsearchParseException e) { - logger.error(new ParameterizedMessage( - "Cannot parse the template [{}]", templateName), e); - throw new IllegalStateException("Cannot parse the template " + templateName, e); - } - } - return true; - } - - private static boolean containsCorrectVersion(Map typeMappingMap, - Predicate predicate) { - @SuppressWarnings("unchecked") - Map meta = (Map) typeMappingMap.get("_meta"); - if (meta == null) { - // pre 5.0, cannot be up to date - return false; - } - return predicate.test(Version.fromString((String) meta.get(SECURITY_VERSION_STRING))); + return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING, + state, logger, predicate); } private boolean checkIndexMappingUpToDate(ClusterState clusterState) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/template/TemplateUtils.java b/plugin/src/main/java/org/elasticsearch/xpack/template/TemplateUtils.java index dfb8bd2d2a2..b2d5fa660a3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/template/TemplateUtils.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/template/TemplateUtils.java @@ -5,9 +5,16 @@ */ package org.elasticsearch.xpack.template; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.xcontent.XContentHelper; @@ -17,8 +24,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.function.Predicate; import java.util.regex.Pattern; +import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; + /** * Handling versioned templates for time-based indices in x-pack */ @@ -78,4 +89,71 @@ public class TemplateUtils { .replaceAll(version); } + /** + * Checks if a versioned template exists, and if it exists checks if it is up-to-date with current version. + * @param versionKey The property in the mapping's _meta field which stores the version info + * @param templateName Name of the index template + * @param state Cluster state + * @param logger Logger + */ + public static boolean checkTemplateExistsAndIsUpToDate( + String templateName, String versionKey, ClusterState state, Logger logger) { + + return checkTemplateExistsAndVersionMatches(templateName, versionKey, state, logger, + Version.CURRENT::equals); + } + + /** + * Checks if template with given name exists and if it matches the version predicate given + * @param versionKey The property in the mapping's _meta field which stores the version info + * @param templateName Name of the index template + * @param state Cluster state + * @param logger Logger + * @param predicate Predicate to execute on version check + */ + public static boolean checkTemplateExistsAndVersionMatches( + String templateName, String versionKey, ClusterState state, Logger logger, Predicate predicate) { + + IndexTemplateMetaData templateMeta = state.metaData().templates().get(templateName); + if (templateMeta == null) { + return false; + } + ImmutableOpenMap mappings = templateMeta.getMappings(); + // check all mappings contain correct version in _meta + // we have to parse the source here which is annoying + for (Object typeMapping : mappings.values().toArray()) { + CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping; + try { + Map typeMappingMap = convertToMap( + new BytesArray(typeMappingXContent.uncompressed()), false, + XContentType.JSON).v2(); + // should always contain one entry with key = typename + assert (typeMappingMap.size() == 1); + String key = typeMappingMap.keySet().iterator().next(); + // get the actual mapping entries + @SuppressWarnings("unchecked") + Map mappingMap = (Map) typeMappingMap.get(key); + if (containsCorrectVersion(versionKey, mappingMap, predicate) == false) { + return false; + } + } catch (ElasticsearchParseException e) { + logger.error(new ParameterizedMessage( + "Cannot parse the template [{}]", templateName), e); + throw new IllegalStateException("Cannot parse the template " + templateName, e); + } + } + return true; + } + + private static boolean containsCorrectVersion(String versionKey, Map typeMappingMap, + Predicate predicate) { + @SuppressWarnings("unchecked") + Map meta = (Map) typeMappingMap.get("_meta"); + if (meta == null) { + // pre 5.0, cannot be up to date + return false; + } + return predicate.test(Version.fromString((String) meta.get(versionKey))); + } + } diff --git a/plugin/src/main/resources/logstash-index-template.json b/plugin/src/main/resources/logstash-index-template.json new file mode 100644 index 00000000000..56a57dbd164 --- /dev/null +++ b/plugin/src/main/resources/logstash-index-template.json @@ -0,0 +1,50 @@ +{ + "index_patterns" : ".logstash", + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "codec": "best_compression", + "mapping.single_type": false + } + }, + "mappings" : { + "doc" : { + "_meta": { + "logstash-version": "${logstash.template.version}" + }, + "dynamic": "strict", + "properties":{ + "description":{ + "type":"text" + }, + "last_modified":{ + "type":"date" + }, + "pipeline_metadata":{ + "properties":{ + "version":{ + "type":"short" + }, + "type":{ + "type":"keyword" + } + } + }, + "version":{ + "type":"keyword" + }, + "pipeline":{ + "type":"text" + }, + "username":{ + "type":"keyword" + }, + "metadata":{ + "type":"object", + "dynamic":false + } + } + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java b/plugin/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java index 1fd56876859..a2cb37d6acd 100644 --- a/plugin/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.license; import org.elasticsearch.license.License.OperationMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.logstash.Logstash; import org.elasticsearch.xpack.monitoring.Monitoring; import org.hamcrest.Matchers; @@ -290,4 +291,14 @@ public class XPackLicenseStateTests extends ESTestCase { assertAllowed(TRIAL, false, XPackLicenseState::isMachineLearningAllowed, false); assertAllowed(PLATINUM, false, XPackLicenseState::isMachineLearningAllowed, false); } + + public void testLogstashAllowed() { + assertAllowed(randomMode(), true, XPackLicenseState::isLogstashAllowed, true); + assertAllowed(randomMode(), false, XPackLicenseState::isLogstashAllowed, false); + } + + public void testLogstashAckNotBasicToTrial() { + OperationMode from = randomFrom(STANDARD, BASIC, GOLD, PLATINUM); + assertAckMesssages(Logstash.NAME, from, TRIAL, 1); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashFeatureSetTests.java b/plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashFeatureSetTests.java new file mode 100644 index 00000000000..76085505adc --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashFeatureSetTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.logstash; + +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.XPackFeatureSet; + +import static org.mockito.Mockito.mock; +import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.when; + +public class LogstashFeatureSetTests extends ESTestCase { + + public void testEnabledSetting() throws Exception { + boolean enabled = randomBoolean(); + Settings settings = Settings.builder() + .put("path.home", createTempDir()) + .put("xpack.logstash.enabled", enabled) + .build(); + LogstashFeatureSet featureSet = new LogstashFeatureSet(settings, null); + assertThat(featureSet.enabled(), is(enabled)); + + PlainActionFuture future = new PlainActionFuture<>(); + featureSet.usage(future); + XPackFeatureSet.Usage usage = future.get(); + + BytesStreamOutput out = new BytesStreamOutput(); + usage.writeTo(out); + XPackFeatureSet.Usage serializedUsage = new LogstashFeatureSet.Usage(out.bytes().streamInput()); + assertThat(serializedUsage.enabled(), is(enabled)); + } + + public void testEnabledDefault() throws Exception { + Settings settings = Settings.builder().put("path.home", createTempDir()).build(); + LogstashFeatureSet featureSet = new LogstashFeatureSet(settings, null); + assertThat(featureSet.enabled(), is(true)); + } + + public void testAvailable() throws Exception { + final XPackLicenseState licenseState = mock(XPackLicenseState.class); + LogstashFeatureSet featureSet = new LogstashFeatureSet(Settings.EMPTY, licenseState); + boolean available = randomBoolean(); + when(licenseState.isLogstashAllowed()).thenReturn(available); + assertThat(featureSet.available(), is(available)); + + PlainActionFuture future = new PlainActionFuture<>(); + featureSet.usage(future); + XPackFeatureSet.Usage usage = future.get(); + assertThat(usage.available(), is(available)); + + BytesStreamOutput out = new BytesStreamOutput(); + usage.writeTo(out); + XPackFeatureSet.Usage serializedUsage = new LogstashFeatureSet.Usage(out.bytes().streamInput()); + assertThat(serializedUsage.available(), is(available)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistryTests.java new file mode 100644 index 00000000000..44b2a79fa5e --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/logstash/LogstashTemplateRegistryTests.java @@ -0,0 +1,245 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.logstash; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.MockTransportClient; +import org.elasticsearch.xpack.security.InternalClient; +import org.elasticsearch.xpack.template.TemplateUtils; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; + +import static org.elasticsearch.mock.orig.Mockito.times; +import static org.elasticsearch.xpack.logstash.LogstashTemplateRegistry.LOGSTASH_INDEX_NAME; +import static org.elasticsearch.xpack.logstash.LogstashTemplateRegistry.LOGSTASH_TEMPLATE_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class LogstashTemplateRegistryTests extends ESTestCase { + private static final int NUM_LOGSTASH_INDEXES = 1; // .logstash + + private InternalClient client; + private ExecutorService executorService; + private TransportClient transportClient; + private ThreadPool threadPool; + private ClusterService clusterService; + private LogstashTemplateRegistry logstashTemplateRegistry; + private static final ClusterState EMPTY_CLUSTER_STATE = + new ClusterState.Builder(new ClusterName("test-cluster")).build(); + CopyOnWriteArrayList listeners; + + @Before + public void setup() { + executorService = mock(ExecutorService.class); + threadPool = mock(ThreadPool.class); + clusterService = mock(ClusterService.class); + + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + + transportClient = new MockTransportClient(Settings.EMPTY); + class TestInternalClient extends InternalClient { + TestInternalClient(Client transportClient) { + super(Settings.EMPTY, null, transportClient); + } + + @Override + protected > + void doExecute(Action action, Request request, + ActionListener listener) { + listeners.add(listener); + } + } + client = new TestInternalClient(transportClient); + listeners = new CopyOnWriteArrayList<>(); + logstashTemplateRegistry = new LogstashTemplateRegistry(Settings.EMPTY, clusterService, client); + } + + @After + public void stop() throws InterruptedException { + if (transportClient != null) { + transportClient.close(); + } + } + + public void testAddsListener() throws Exception { + LogstashTemplateRegistry templateRegistry = new LogstashTemplateRegistry(Settings.EMPTY, clusterService, client); + verify(clusterService, times(1)).addListener(templateRegistry); + } + + public void testAddTemplatesIfMissing() throws IOException { + ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate( + "/" + LOGSTASH_TEMPLATE_NAME + ".json" + ); + logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event", + clusterStateBuilder.build(), EMPTY_CLUSTER_STATE)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(true)); + assertThat(listeners, hasSize(0)); + } + + public void testWrongVersionIndexTemplate_isIdentifiedAsNotUpToDate() throws IOException { + String templateString = "/wrong-version-" + LOGSTASH_TEMPLATE_NAME + ".json"; + ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString); + + logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event", + clusterStateBuilder.build(), EMPTY_CLUSTER_STATE)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertThat(listeners, hasSize(NUM_LOGSTASH_INDEXES)); + } + + public void testWrongVersionIndexTemplate_isUpdated() throws IOException { + String templateString = "/wrong-version-" + LOGSTASH_TEMPLATE_NAME + ".json"; + ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString); + + final ClusterState clusterState = clusterStateBuilder.build(); + logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event", + clusterState, EMPTY_CLUSTER_STATE)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertThat(listeners, hasSize(NUM_LOGSTASH_INDEXES)); + assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true)); + + // if we do it again this should not send an update + ActionListener listener = listeners.get(0); + listeners.clear(); + logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event", + clusterState, EMPTY_CLUSTER_STATE)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertThat(listeners, hasSize(0)); + assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true)); + + // if we now simulate an error... + listener.onFailure(new Exception()); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertFalse(logstashTemplateRegistry.isTemplateCreationPending()); + + // ... we should be able to send a new update + logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event", + clusterState, EMPTY_CLUSTER_STATE)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertThat(listeners, hasSize(1)); + assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true)); + + // now check what happens if we get back an unacknowledged response + listeners.get(0).onResponse(new TestPutIndexTemplateResponse()); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertThat("Didn't expect pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(false)); + + // and now let's see what happens if we get back a response + listeners.clear(); + logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event", + clusterState, EMPTY_CLUSTER_STATE)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false)); + assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true)); + assertThat(listeners, hasSize(1)); + listeners.get(0).onResponse(new TestPutIndexTemplateResponse(true)); + assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(true)); + assertThat("Didn't expect pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(false)); + } + + private static ClusterState.Builder createClusterStateWithTemplate(String logstashTemplateString) throws IOException { + MetaData.Builder metaDataBuilder = new MetaData.Builder(); + + IndexTemplateMetaData.Builder logstashTemplateBuilder = + getIndexTemplateMetaData(LOGSTASH_TEMPLATE_NAME, logstashTemplateString); + metaDataBuilder.put(logstashTemplateBuilder); + // add the correct mapping no matter what the template + String logstashMappingString = "/" + LOGSTASH_TEMPLATE_NAME + ".json"; + IndexMetaData.Builder logstashIndexMeta = + createIndexMetadata(LOGSTASH_INDEX_NAME, logstashMappingString); + metaDataBuilder.put(logstashIndexMeta); + + return ClusterState.builder(state()).metaData(metaDataBuilder.build()); + } + + private static IndexTemplateMetaData.Builder getIndexTemplateMetaData( + String templateName, String templateString) throws IOException { + + String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(), + LogstashTemplateRegistry.TEMPLATE_VERSION_PATTERN); + PutIndexTemplateRequest request = new PutIndexTemplateRequest(); + request.source(template, XContentType.JSON); + IndexTemplateMetaData.Builder templateBuilder = + IndexTemplateMetaData.builder(templateName); + for (Map.Entry entry : request.mappings().entrySet()) { + templateBuilder.putMapping(entry.getKey(), entry.getValue()); + } + return templateBuilder; + } + + private static IndexMetaData.Builder createIndexMetadata( + String indexName, String templateString) throws IOException { + String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(), + LogstashTemplateRegistry.TEMPLATE_VERSION_PATTERN); + PutIndexTemplateRequest request = new PutIndexTemplateRequest(); + request.source(template, XContentType.JSON); + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build()); + + for (Map.Entry entry : request.mappings().entrySet()) { + indexMetaData.putMapping(entry.getKey(), entry.getValue()); + } + return indexMetaData; + } + + // cluster state where local node is master + private static ClusterState state() { + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + discoBuilder.masterNodeId("1"); + discoBuilder.localNodeId("1"); + ClusterState.Builder state = ClusterState.builder(new ClusterName("test-cluster")); + state.nodes(discoBuilder); + state.metaData(MetaData.builder().generateClusterUuidIfNeeded()); + return state.build(); + } + + private static class TestPutIndexTemplateResponse extends PutIndexTemplateResponse { + TestPutIndexTemplateResponse(boolean acknowledged) { + super(acknowledged); + } + + TestPutIndexTemplateResponse() { + super(); + } + } +} diff --git a/plugin/src/test/resources/wrong-version-logstash-index-template.json b/plugin/src/test/resources/wrong-version-logstash-index-template.json new file mode 100644 index 00000000000..6a6b616a8a5 --- /dev/null +++ b/plugin/src/test/resources/wrong-version-logstash-index-template.json @@ -0,0 +1,50 @@ +{ + "index_patterns" : ".logstash", + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "codec": "best_compression", + "mapping.single_type": false + } + }, + "mappings" : { + "doc" : { + "_meta": { + "logstash-version": "4.0.0" + }, + "dynamic": "strict", + "properties":{ + "description":{ + "type":"text" + }, + "last_modified":{ + "type":"date" + }, + "pipeline_metadata":{ + "properties":{ + "version":{ + "type":"short" + }, + "type":{ + "type":"keyword" + } + } + }, + "version":{ + "type":"keyword" + }, + "pipeline":{ + "type":"text" + }, + "username":{ + "type":"keyword" + }, + "metadata":{ + "type":"object", + "dynamic":false + } + } + } + } +}