[Logstash] Add new component to x-pack to handle LS features (elastic/x-pack-elasticsearch#1530)

This commit adds a new Logstash component to x-pack to support the config management work. Currently, the functionality in this component is really simple; all it does is upload a new index template for the `.logstash` index. This index stores the actual LS configurations.

Once this template is bootstrapped in ES, Kibana can write user-created LS configs that adhere to the mapping defined here. In the future, we're looking into adding more functionality on the ES side to handle config documents, but for now this is simple.
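For a concrete picture, a config document only has to follow the mapping in `logstash-index-template.json` below (type `doc`, `dynamic: strict`). A minimal sketch of such a write through the Java client; the class name, document id, and field values here are made up for illustration:

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class LogstashConfigWriteExample {

    /** Indexes one user-created pipeline config into .logstash, using only fields from the strict mapping. */
    static IndexResponse writePipeline(Client client) throws Exception {
        XContentBuilder source = jsonBuilder()
            .startObject()
                .field("description", "ship apache access logs")
                .field("last_modified", "2017-06-13T17:30:30.000Z")
                .startObject("pipeline_metadata")
                    .field("version", 1)
                    .field("type", "logstash_pipeline")
                .endObject()
                .field("pipeline", "input { stdin {} } output { stdout {} }")
                .field("username", "elastic")
            .endObject();

        return client.prepareIndex(".logstash", "doc", "apache-pipeline")
            .setSource(source)
            .get();
    }
}

Because the template sets dynamic to strict, documents with fields outside this mapping are rejected.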

relates elastic/x-pack-elasticsearch#1499, relates elastic/x-pack-elasticsearch#1471

Original commit: elastic/x-pack-elasticsearch@d7cc8675f7
Suyog Rao 2017-06-13 10:30:30 -07:00 committed by GitHub
parent f2e2ccae01
commit fe72991c70
14 changed files with 803 additions and 46 deletions

View File

@@ -53,6 +53,11 @@ Example response:
"available" : true,
"enabled" : true
},
"logstash" : {
"description" : "Logstash management component for X-Pack",
"available" : true,
"enabled" : true
},
"ml" : {
"description" : "Machine Learning for the Elastic Stack",
"available" : true,

View File

@@ -48,6 +48,9 @@ public class XPackLicenseState {
messages.put(XPackPlugin.MACHINE_LEARNING, new String[] {
"Machine learning APIs are disabled"
});
messages.put(XPackPlugin.LOGSTASH, new String[] {
"Logstash specific APIs are disabled. You can continue to manage and poll stored configurations"
});
EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages);
}
@@ -62,6 +65,7 @@ public class XPackLicenseState {
messages.put(XPackPlugin.WATCHER, XPackLicenseState::watcherAcknowledgementMessages);
messages.put(XPackPlugin.MONITORING, XPackLicenseState::monitoringAcknowledgementMessages);
messages.put(XPackPlugin.GRAPH, XPackLicenseState::graphAcknowledgementMessages);
messages.put(XPackPlugin.LOGSTASH, XPackLicenseState::logstashAcknowledgementMessages);
ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages);
}
@@ -167,6 +171,22 @@ public class XPackLicenseState {
return Strings.EMPTY_ARRAY;
}
private static String[] logstashAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
switch (newMode) {
case TRIAL:
switch (currentMode) {
case BASIC:
case STANDARD:
case GOLD:
case PLATINUM:
return new String[] { "Logstash specific APIs will be disabled, but you can continue to manage " +
"and poll stored configurations" };
}
break;
}
return Strings.EMPTY_ARRAY;
}
/** A wrapper for the license mode and state, to allow atomically swapping. */
private static class Status {
@@ -404,4 +424,12 @@ public class XPackLicenseState {
return licensed && localStatus.active;
}
/**
* Logstash is always allowed as long as there is an active license
* @return {@code true} as long as there is a valid license
*/
public boolean isLogstashAllowed() {
return status.active;
}
}

View File

@@ -70,6 +70,8 @@ import org.elasticsearch.xpack.extensions.XPackExtension;
import org.elasticsearch.xpack.extensions.XPackExtensionsService;
import org.elasticsearch.xpack.graph.Graph;
import org.elasticsearch.xpack.graph.GraphFeatureSet;
import org.elasticsearch.xpack.logstash.Logstash;
import org.elasticsearch.xpack.logstash.LogstashFeatureSet;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MachineLearningFeatureSet;
import org.elasticsearch.xpack.monitoring.Monitoring;
@@ -143,6 +145,9 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
/** Name constant for the machine learning feature. */
public static final String MACHINE_LEARNING = "ml";
/** Name constant for the Logstash feature. */
public static final String LOGSTASH = "logstash";
// inside of YAML settings we still use xpack so we do not have to handle issues with dashes
private static final String SETTINGS_NAME = "xpack";
@@ -192,6 +197,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
protected Watcher watcher;
protected Graph graph;
protected MachineLearning machineLearning;
protected Logstash logstash;
public XPackPlugin(Settings settings) throws IOException, DestroyFailedException, OperatorCreationException, GeneralSecurityException {
this.settings = settings;
@@ -206,6 +212,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
this.watcher = new Watcher(settings);
this.graph = new Graph(settings);
this.machineLearning = new MachineLearning(settings, env, licenseState);
this.logstash = new Logstash(settings);
// Check if the node is a transport client.
if (transportClientMode == false) {
this.extensionsService = new XPackExtensionsService(settings, resolveXPackExtensionsFile(env), getExtensions());
@@ -233,6 +240,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
modules.addAll(watcher.nodeModules());
modules.addAll(graph.createGuiceModules());
modules.addAll(machineLearning.nodeModules());
modules.addAll(logstash.nodeModules());
if (transportClientMode) {
modules.add(b -> b.bind(XPackLicenseState.class).toProvider(Providers.of(null)));
@@ -283,6 +291,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry));
components.addAll(logstash.createComponents(internalClient, clusterService));
// just create the reloader as it will pull all of the loaded ssl configurations and start watching them
new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService);
return components;
@@ -455,6 +465,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, MONITORING, MonitoringFeatureSet.Usage::new));
entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, GRAPH, GraphFeatureSet.Usage::new));
entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, MACHINE_LEARNING, MachineLearningFeatureSet.Usage::new));
entries.add(new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, LOGSTASH, LogstashFeatureSet.Usage::new));
entries.addAll(watcher.getNamedWriteables());
entries.addAll(machineLearning.getNamedWriteables());
entries.addAll(licensing.getNamedWriteables());

View File

@@ -50,6 +50,9 @@ public class XPackSettings {
/** Setting for enabling or disabling document/field level security. Defaults to true. */
public static final Setting<Boolean> DLS_FLS_ENABLED = enabledSetting(XPackPlugin.SECURITY + ".dls_fls", true);
/** Setting for enabling or disabling Logstash extensions. Defaults to true. */
public static final Setting<Boolean> LOGSTASH_ENABLED = enabledSetting(XPackPlugin.LOGSTASH, true);
/**
* Legacy setting for enabling or disabling transport ssl. Defaults to true. This is just here to make upgrading easier since the
* user needs to set this setting in 5.x to upgrade
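For the new LOGSTASH_ENABLED setting above, the resulting key is `xpack.logstash.enabled`, so the whole component can be switched off per node. A minimal sketch of reading it, mirroring how the feature-set tests later in this diff build their settings; the example class name is made up:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.XPackSettings;

public class LogstashEnabledExample {
    public static void main(String[] args) {
        // Build node settings with the Logstash component switched off
        // (equivalent to "xpack.logstash.enabled: false" in elasticsearch.yml).
        Settings settings = Settings.builder()
                .put(XPackSettings.LOGSTASH_ENABLED.getKey(), false)
                .build();

        // Defaults to true when unset; reads back as false here.
        boolean enabled = XPackSettings.LOGSTASH_ENABLED.get(settings);
        System.out.println("xpack.logstash.enabled = " + enabled);
    }
}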

View File

@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* This class activates/deactivates the logstash modules depending on whether we're running a node client or a transport client
*/
public class Logstash implements ActionPlugin {
public static final String NAME = "logstash";
private final Settings settings;
private final boolean enabled;
private final boolean transportClientMode;
public Logstash(Settings settings) {
this.settings = settings;
this.enabled = XPackSettings.LOGSTASH_ENABLED.get(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
}
boolean isEnabled() {
return enabled;
}
boolean isTransportClient() {
return transportClientMode;
}
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>();
modules.add(b -> {
XPackPlugin.bindFeatureSet(b, LogstashFeatureSet.class);
});
return modules;
}
public Collection<Object> createComponents(InternalClient client, ClusterService clusterService) {
if (this.transportClientMode || enabled == false) {
return Collections.emptyList();
}
return Collections.singletonList(new LogstashTemplateRegistry(settings, clusterService, client));
}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.XPackFeatureSet;
import org.elasticsearch.xpack.XPackSettings;
import java.io.IOException;
import java.util.Map;
public class LogstashFeatureSet implements XPackFeatureSet {
private final boolean enabled;
private final XPackLicenseState licenseState;
@Inject
public LogstashFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) {
this.enabled = XPackSettings.LOGSTASH_ENABLED.get(settings);
this.licenseState = licenseState;
}
@Override
public String name() {
return Logstash.NAME;
}
@Override
public String description() {
return "Logstash management component for X-Pack";
}
@Override
public boolean available() {
return licenseState != null && licenseState.isLogstashAllowed();
}
@Override
public boolean enabled() {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
listener.onResponse(new LogstashFeatureSet.Usage(available(), enabled()));
}
public static class Usage extends XPackFeatureSet.Usage {
public Usage(StreamInput in) throws IOException {
super(in);
}
public Usage(boolean available, boolean enabled) {
super(Logstash.NAME, available, enabled);
}
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
}
}
}

View File

@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.xpack.template.TemplateUtils;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/**
* Registry for the Logstash index template and settings
* This class is based on xpack.security.SecurityLifecycleService.
*/
public class LogstashTemplateRegistry extends AbstractComponent implements ClusterStateListener {
public static final String LOGSTASH_INDEX_NAME = ".logstash";
public static final String LOGSTASH_TEMPLATE_NAME = "logstash-index-template";
public static final String TEMPLATE_VERSION_PATTERN =
Pattern.quote("${logstash.template.version}");
private static final String LOGSTASH_VERSION_PROPERTY = "logstash-version";
private final Client client;
private final AtomicBoolean templateIsUpToDate = new AtomicBoolean(false);
// only put the template if this is not already in progress
private final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
public LogstashTemplateRegistry(Settings settings, ClusterService clusterService, Client client) {
super(settings);
this.client = client;
clusterService.addListener(this);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
// wait until the gateway has recovered from disk,
// otherwise we may think the index templates do not exist while they actually do
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
addTemplatesIfMissing(event.state());
}
}
}
public boolean isTemplateUpToDate() {
return templateIsUpToDate.get();
}
public boolean isTemplateCreationPending() {
return templateCreationPending.get();
}
private void addTemplatesIfMissing(ClusterState state) {
this.templateIsUpToDate.set(TemplateUtils.checkTemplateExistsAndIsUpToDate(LOGSTASH_TEMPLATE_NAME,
LOGSTASH_VERSION_PROPERTY, state, logger));
// only put the template if it is not up to date and not already in progress
if (isTemplateUpToDate() == false && templateCreationPending.compareAndSet(false, true)) {
putTemplate();
}
}
private void putTemplate() {
logger.debug("putting the template [{}]", LOGSTASH_TEMPLATE_NAME);
String template = TemplateUtils.loadTemplate("/" + LOGSTASH_TEMPLATE_NAME + ".json",
Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest putTemplateRequest = client.admin().indices()
.preparePutTemplate(LOGSTASH_TEMPLATE_NAME)
.setSource(
new BytesArray(template.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON)
.request();
client.admin().indices().putTemplate(putTemplateRequest, ActionListener.wrap(r -> {
templateCreationPending.set(false);
if (r.isAcknowledged()) {
templateIsUpToDate.set(true);
logger.debug("successfully updated [{}] index template", LOGSTASH_TEMPLATE_NAME);
} else {
logger.error("put template [{}] was not acknowledged", LOGSTASH_TEMPLATE_NAME);
}
}, e -> {
templateCreationPending.set(false);
logger.warn(new ParameterizedMessage(
"failed to put template [{}]", LOGSTASH_TEMPLATE_NAME), e);
}));
}
}

View File

@@ -141,7 +141,8 @@ public class IndexLifecycleManager extends AbstractComponent {
final ClusterState state = event.state();
this.indexExists = resolveConcreteIndex(indexName, event.state().metaData()) != null;
this.indexAvailable = checkIndexAvailable(state);
this.templateIsUpToDate = checkTemplateExistsAndIsUpToDate(state);
this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName,
SECURITY_VERSION_STRING, state, logger);
this.mappingIsUpToDate = checkIndexMappingUpToDate(state);
this.canWriteToIndex = templateIsUpToDate && mappingIsUpToDate;
this.mappingVersion = oldestIndexMappingVersion(event.state());
@@ -178,54 +179,11 @@ }
}
}
private boolean checkTemplateExistsAndIsUpToDate(ClusterState state) {
return checkTemplateExistsAndVersionMatches(templateName, state, logger,
Version.CURRENT::equals);
}
public static boolean checkTemplateExistsAndVersionMatches(
String templateName, ClusterState state, Logger logger, Predicate<Version> predicate) {
IndexTemplateMetaData templateMeta = state.metaData().templates().get(templateName);
if (templateMeta == null) {
return false;
}
ImmutableOpenMap<String, CompressedXContent> mappings = templateMeta.getMappings();
// check all mappings contain correct version in _meta
// we have to parse the source here which is annoying
for (Object typeMapping : mappings.values().toArray()) {
CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping;
try {
Map<String, Object> typeMappingMap = convertToMap(
new BytesArray(typeMappingXContent.uncompressed()), false,
XContentType.JSON).v2();
// should always contain one entry with key = typename
assert (typeMappingMap.size() == 1);
String key = typeMappingMap.keySet().iterator().next();
// get the actual mapping entries
@SuppressWarnings("unchecked")
Map<String, Object> mappingMap = (Map<String, Object>) typeMappingMap.get(key);
if (containsCorrectVersion(mappingMap, predicate) == false) {
return false;
}
} catch (ElasticsearchParseException e) {
logger.error(new ParameterizedMessage(
"Cannot parse the template [{}]", templateName), e);
throw new IllegalStateException("Cannot parse the template " + templateName, e);
}
}
return true;
}
private static boolean containsCorrectVersion(Map<String, Object> typeMappingMap,
Predicate<Version> predicate) {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) typeMappingMap.get("_meta");
if (meta == null) {
// pre 5.0, cannot be up to date
return false;
}
return predicate.test(Version.fromString((String) meta.get(SECURITY_VERSION_STRING)));
return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING,
state, logger, predicate);
}
private boolean checkIndexMappingUpToDate(ClusterState clusterState) {

View File

@@ -5,9 +5,16 @@
*/
package org.elasticsearch.xpack.template;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -17,8 +24,12 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
/**
* Handling versioned templates for time-based indices in x-pack
*/
@@ -78,4 +89,71 @@ public class TemplateUtils {
.replaceAll(version);
}
/**
* Checks if a versioned template exists and, if it does, whether it is up to date with the current version.
* @param versionKey The property in the mapping's _meta field which stores the version info
* @param templateName Name of the index template
* @param state Cluster state
* @param logger Logger
*/
public static boolean checkTemplateExistsAndIsUpToDate(
String templateName, String versionKey, ClusterState state, Logger logger) {
return checkTemplateExistsAndVersionMatches(templateName, versionKey, state, logger,
Version.CURRENT::equals);
}
/**
* Checks if a template with the given name exists and whether it matches the given version predicate
* @param versionKey The property in the mapping's _meta field which stores the version info
* @param templateName Name of the index template
* @param state Cluster state
* @param logger Logger
* @param predicate Predicate to execute on version check
*/
public static boolean checkTemplateExistsAndVersionMatches(
String templateName, String versionKey, ClusterState state, Logger logger, Predicate<Version> predicate) {
IndexTemplateMetaData templateMeta = state.metaData().templates().get(templateName);
if (templateMeta == null) {
return false;
}
ImmutableOpenMap<String, CompressedXContent> mappings = templateMeta.getMappings();
// check all mappings contain correct version in _meta
// we have to parse the source here which is annoying
for (Object typeMapping : mappings.values().toArray()) {
CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping;
try {
Map<String, Object> typeMappingMap = convertToMap(
new BytesArray(typeMappingXContent.uncompressed()), false,
XContentType.JSON).v2();
// should always contain one entry with key = typename
assert (typeMappingMap.size() == 1);
String key = typeMappingMap.keySet().iterator().next();
// get the actual mapping entries
@SuppressWarnings("unchecked")
Map<String, Object> mappingMap = (Map<String, Object>) typeMappingMap.get(key);
if (containsCorrectVersion(versionKey, mappingMap, predicate) == false) {
return false;
}
} catch (ElasticsearchParseException e) {
logger.error(new ParameterizedMessage(
"Cannot parse the template [{}]", templateName), e);
throw new IllegalStateException("Cannot parse the template " + templateName, e);
}
}
return true;
}
private static boolean containsCorrectVersion(String versionKey, Map<String, Object> typeMappingMap,
Predicate<Version> predicate) {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) typeMappingMap.get("_meta");
if (meta == null) {
// pre 5.0, cannot be up to date
return false;
}
return predicate.test(Version.fromString((String) meta.get(versionKey)));
}
}

View File

@@ -0,0 +1,50 @@
{
"index_patterns" : ".logstash",
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"codec": "best_compression",
"mapping.single_type": false
}
},
"mappings" : {
"doc" : {
"_meta": {
"logstash-version": "${logstash.template.version}"
},
"dynamic": "strict",
"properties":{
"description":{
"type":"text"
},
"last_modified":{
"type":"date"
},
"pipeline_metadata":{
"properties":{
"version":{
"type":"short"
},
"type":{
"type":"keyword"
}
}
},
"version":{
"type":"keyword"
},
"pipeline":{
"type":"text"
},
"username":{
"type":"keyword"
},
"metadata":{
"type":"object",
"dynamic":false
}
}
}
}
}

View File

@@ -8,6 +8,7 @@ package org.elasticsearch.license;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.logstash.Logstash;
import org.elasticsearch.xpack.monitoring.Monitoring;
import org.hamcrest.Matchers;
@@ -290,4 +291,14 @@ public class XPackLicenseStateTests extends ESTestCase {
assertAllowed(TRIAL, false, XPackLicenseState::isMachineLearningAllowed, false);
assertAllowed(PLATINUM, false, XPackLicenseState::isMachineLearningAllowed, false);
}
public void testLogstashAllowed() {
assertAllowed(randomMode(), true, XPackLicenseState::isLogstashAllowed, true);
assertAllowed(randomMode(), false, XPackLicenseState::isLogstashAllowed, false);
}
public void testLogstashAckNotBasicToTrial() {
OperationMode from = randomFrom(STANDARD, BASIC, GOLD, PLATINUM);
assertAckMesssages(Logstash.NAME, from, TRIAL, 1);
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.XPackFeatureSet;
import static org.mockito.Mockito.mock;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
public class LogstashFeatureSetTests extends ESTestCase {
public void testEnabledSetting() throws Exception {
boolean enabled = randomBoolean();
Settings settings = Settings.builder()
.put("path.home", createTempDir())
.put("xpack.logstash.enabled", enabled)
.build();
LogstashFeatureSet featureSet = new LogstashFeatureSet(settings, null);
assertThat(featureSet.enabled(), is(enabled));
PlainActionFuture<XPackFeatureSet.Usage> future = new PlainActionFuture<>();
featureSet.usage(future);
XPackFeatureSet.Usage usage = future.get();
BytesStreamOutput out = new BytesStreamOutput();
usage.writeTo(out);
XPackFeatureSet.Usage serializedUsage = new LogstashFeatureSet.Usage(out.bytes().streamInput());
assertThat(serializedUsage.enabled(), is(enabled));
}
public void testEnabledDefault() throws Exception {
Settings settings = Settings.builder().put("path.home", createTempDir()).build();
LogstashFeatureSet featureSet = new LogstashFeatureSet(settings, null);
assertThat(featureSet.enabled(), is(true));
}
public void testAvailable() throws Exception {
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
LogstashFeatureSet featureSet = new LogstashFeatureSet(Settings.EMPTY, licenseState);
boolean available = randomBoolean();
when(licenseState.isLogstashAllowed()).thenReturn(available);
assertThat(featureSet.available(), is(available));
PlainActionFuture<XPackFeatureSet.Usage> future = new PlainActionFuture<>();
featureSet.usage(future);
XPackFeatureSet.Usage usage = future.get();
assertThat(usage.available(), is(available));
BytesStreamOutput out = new BytesStreamOutput();
usage.writeTo(out);
XPackFeatureSet.Usage serializedUsage = new LogstashFeatureSet.Usage(out.bytes().streamInput());
assertThat(serializedUsage.available(), is(available));
}
}

View File

@@ -0,0 +1,245 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.template.TemplateUtils;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.xpack.logstash.LogstashTemplateRegistry.LOGSTASH_INDEX_NAME;
import static org.elasticsearch.xpack.logstash.LogstashTemplateRegistry.LOGSTASH_TEMPLATE_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class LogstashTemplateRegistryTests extends ESTestCase {
private static final int NUM_LOGSTASH_INDEXES = 1; // .logstash
private InternalClient client;
private ExecutorService executorService;
private TransportClient transportClient;
private ThreadPool threadPool;
private ClusterService clusterService;
private LogstashTemplateRegistry logstashTemplateRegistry;
private static final ClusterState EMPTY_CLUSTER_STATE =
new ClusterState.Builder(new ClusterName("test-cluster")).build();
CopyOnWriteArrayList<ActionListener> listeners;
@Before
public void setup() {
executorService = mock(ExecutorService.class);
threadPool = mock(ThreadPool.class);
clusterService = mock(ClusterService.class);
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
transportClient = new MockTransportClient(Settings.EMPTY);
class TestInternalClient extends InternalClient {
TestInternalClient(Client transportClient) {
super(Settings.EMPTY, null, transportClient);
}
@Override
protected <Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
listeners.add(listener);
}
}
client = new TestInternalClient(transportClient);
listeners = new CopyOnWriteArrayList<>();
logstashTemplateRegistry = new LogstashTemplateRegistry(Settings.EMPTY, clusterService, client);
}
@After
public void stop() throws InterruptedException {
if (transportClient != null) {
transportClient.close();
}
}
public void testAddsListener() throws Exception {
LogstashTemplateRegistry templateRegistry = new LogstashTemplateRegistry(Settings.EMPTY, clusterService, client);
verify(clusterService, times(1)).addListener(templateRegistry);
}
public void testAddTemplatesIfMissing() throws IOException {
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(
"/" + LOGSTASH_TEMPLATE_NAME + ".json"
);
logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(true));
assertThat(listeners, hasSize(0));
}
public void testWrongVersionIndexTemplate_isIdentifiedAsNotUpToDate() throws IOException {
String templateString = "/wrong-version-" + LOGSTASH_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertThat(listeners, hasSize(NUM_LOGSTASH_INDEXES));
}
public void testWrongVersionIndexTemplate_isUpdated() throws IOException {
String templateString = "/wrong-version-" + LOGSTASH_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
final ClusterState clusterState = clusterStateBuilder.build();
logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event",
clusterState, EMPTY_CLUSTER_STATE));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertThat(listeners, hasSize(NUM_LOGSTASH_INDEXES));
assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true));
// if we do it again this should not send an update
ActionListener listener = listeners.get(0);
listeners.clear();
logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event",
clusterState, EMPTY_CLUSTER_STATE));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertThat(listeners, hasSize(0));
assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true));
// if we now simulate an error...
listener.onFailure(new Exception());
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertFalse(logstashTemplateRegistry.isTemplateCreationPending());
// ... we should be able to send a new update
logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event",
clusterState, EMPTY_CLUSTER_STATE));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertThat(listeners, hasSize(1));
assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true));
// now check what happens if we get back an unacknowledged response
listeners.get(0).onResponse(new TestPutIndexTemplateResponse());
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertThat("Didn't expect pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(false));
// and now let's see what happens if we get back an acknowledged response
listeners.clear();
logstashTemplateRegistry.clusterChanged(new ClusterChangedEvent("test-event",
clusterState, EMPTY_CLUSTER_STATE));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(false));
assertThat("Expected pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(true));
assertThat(listeners, hasSize(1));
listeners.get(0).onResponse(new TestPutIndexTemplateResponse(true));
assertThat(logstashTemplateRegistry.isTemplateUpToDate(), equalTo(true));
assertThat("Didn't expect pending template creation", logstashTemplateRegistry.isTemplateCreationPending(), is(false));
}
private static ClusterState.Builder createClusterStateWithTemplate(String logstashTemplateString) throws IOException {
MetaData.Builder metaDataBuilder = new MetaData.Builder();
IndexTemplateMetaData.Builder logstashTemplateBuilder =
getIndexTemplateMetaData(LOGSTASH_TEMPLATE_NAME, logstashTemplateString);
metaDataBuilder.put(logstashTemplateBuilder);
// add the correct mapping no matter what the template contains
String logstashMappingString = "/" + LOGSTASH_TEMPLATE_NAME + ".json";
IndexMetaData.Builder logstashIndexMeta =
createIndexMetadata(LOGSTASH_INDEX_NAME, logstashMappingString);
metaDataBuilder.put(logstashIndexMeta);
return ClusterState.builder(state()).metaData(metaDataBuilder.build());
}
private static IndexTemplateMetaData.Builder getIndexTemplateMetaData(
String templateName, String templateString) throws IOException {
String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(),
LogstashTemplateRegistry.TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest request = new PutIndexTemplateRequest();
request.source(template, XContentType.JSON);
IndexTemplateMetaData.Builder templateBuilder =
IndexTemplateMetaData.builder(templateName);
for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
templateBuilder.putMapping(entry.getKey(), entry.getValue());
}
return templateBuilder;
}
private static IndexMetaData.Builder createIndexMetadata(
String indexName, String templateString) throws IOException {
String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(),
LogstashTemplateRegistry.TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest request = new PutIndexTemplateRequest();
request.source(template, XContentType.JSON);
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
indexMetaData.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build());
for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
indexMetaData.putMapping(entry.getKey(), entry.getValue());
}
return indexMetaData;
}
// cluster state where local node is master
private static ClusterState state() {
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
discoBuilder.masterNodeId("1");
discoBuilder.localNodeId("1");
ClusterState.Builder state = ClusterState.builder(new ClusterName("test-cluster"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
return state.build();
}
private static class TestPutIndexTemplateResponse extends PutIndexTemplateResponse {
TestPutIndexTemplateResponse(boolean acknowledged) {
super(acknowledged);
}
TestPutIndexTemplateResponse() {
super();
}
}
}

View File

@@ -0,0 +1,50 @@
{
"index_patterns" : ".logstash",
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"codec": "best_compression",
"mapping.single_type": false
}
},
"mappings" : {
"doc" : {
"_meta": {
"logstash-version": "4.0.0"
},
"dynamic": "strict",
"properties":{
"description":{
"type":"text"
},
"last_modified":{
"type":"date"
},
"pipeline_metadata":{
"properties":{
"version":{
"type":"short"
},
"type":{
"type":"keyword"
}
}
},
"version":{
"type":"keyword"
},
"pipeline":{
"type":"text"
},
"username":{
"type":"keyword"
},
"metadata":{
"type":"object",
"dynamic":false
}
}
}
}
}