Marvel: Use versioned index templates

This commit changes the templates so that they are now versionned using a number (starting from 1). This number is used in index templates names (ex: .marvel-es-data-1, .marvel-es-1) as well as in indices names (ex: .marvel-es-1-2015-12-30, .marvel-es-data-1).

If the template does not exist, it is created. Otherwise nothing (no update) is done.

Original commit: elastic/x-pack-elasticsearch@66c1a8bed0
This commit is contained in:
Tanguy Leroux 2015-12-17 23:01:32 +01:00
parent a931e4e0b8
commit eeb5842730
24 changed files with 567 additions and 946 deletions

View File

@ -70,6 +70,9 @@ compileTestJava.options.compilerArgs << "-Xlint:-deprecation,-rawtypes,-serial,-
ext.expansions = [ ext.expansions = [
'project.version': version, 'project.version': version,
// Used in marvel index templates
'marvel.plugin.version': version,
'marvel.template.version': '1',
] ]
processResources { processResources {
@ -82,8 +85,8 @@ processResources {
String licenseKeyPath = "license-plugin/keys/${licenseKeyName}/public.key" String licenseKeyPath = "license-plugin/keys/${licenseKeyName}/public.key"
if (file(licenseKeyPath).exists() == false) { if (file(licenseKeyPath).exists() == false) {
throw new GradleException("no public key found for '${licenseKeyName}'") throw new GradleException("no public key found for '${licenseKeyName}'")
} }
from licenseKeyPath from licenseKeyPath
} }
processTestResources { processTestResources {

View File

@ -11,11 +11,14 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.exporter.IndexNameResolver;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.license.MarvelLicensee; import org.elasticsearch.marvel.license.MarvelLicensee;
import java.util.Collection; import java.util.Collection;
import java.util.Objects;
public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T> implements Collector<T> { public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T> implements Collector<T> {
@ -24,6 +27,7 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
protected final ClusterService clusterService; protected final ClusterService clusterService;
protected final MarvelSettings marvelSettings; protected final MarvelSettings marvelSettings;
protected final MarvelLicensee licensee; protected final MarvelLicensee licensee;
protected final IndexNameResolver dataIndexNameResolver;
@Inject @Inject
public AbstractCollector(Settings settings, String name, ClusterService clusterService, MarvelSettings marvelSettings, MarvelLicensee licensee) { public AbstractCollector(Settings settings, String name, ClusterService clusterService, MarvelSettings marvelSettings, MarvelLicensee licensee) {
@ -32,6 +36,7 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
this.clusterService = clusterService; this.clusterService = clusterService;
this.marvelSettings = marvelSettings; this.marvelSettings = marvelSettings;
this.licensee = licensee; this.licensee = licensee;
this.dataIndexNameResolver = new DataIndexNameResolver(MarvelTemplateUtils.TEMPLATE_VERSION);
} }
@Override @Override
@ -109,4 +114,27 @@ public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T>
protected String clusterUUID() { protected String clusterUUID() {
return clusterService.state().metaData().clusterUUID(); return clusterService.state().metaData().clusterUUID();
} }
/**
* Resolves marvel's data index name
*/
public class DataIndexNameResolver implements IndexNameResolver {
private final String index;
DataIndexNameResolver(Integer version) {
Objects.requireNonNull(version, "index version cannot be null");
this.index = MarvelSettings.MARVEL_DATA_INDEX_PREFIX + String.valueOf(version);
}
@Override
public String resolve(MarvelDoc doc) {
return index;
}
@Override
public String resolve(long timestamp) {
return index;
}
}
} }

View File

@ -77,7 +77,8 @@ public class ClusterInfoCollector extends AbstractCollector<ClusterInfoMarvelDoc
} }
String clusterUUID = clusterUUID(); String clusterUUID = clusterUUID();
results.add(new ClusterInfoMarvelDoc(MarvelSettings.MARVEL_DATA_INDEX_NAME, TYPE, clusterUUID, clusterUUID, System.currentTimeMillis(), long timestamp = System.currentTimeMillis();
results.add(new ClusterInfoMarvelDoc(dataIndexNameResolver.resolve(timestamp), TYPE, clusterUUID, clusterUUID, timestamp,
clusterName.value(), Version.CURRENT.toString(), license, clusterStats)); clusterName.value(), Version.CURRENT.toString(), license, clusterStats));
return Collections.unmodifiableCollection(results); return Collections.unmodifiableCollection(results);
} }

View File

@ -71,7 +71,7 @@ public class ClusterStateCollector extends AbstractCollector<ClusterStateCollect
results.add(new ClusterStateNodeMarvelDoc(clusterUUID, NODES_TYPE, timestamp, stateUUID, node.getId())); results.add(new ClusterStateNodeMarvelDoc(clusterUUID, NODES_TYPE, timestamp, stateUUID, node.getId()));
// Adds a document for every node in the marvel data index (type "node") // Adds a document for every node in the marvel data index (type "node")
results.add(new DiscoveryNodeMarvelDoc(MarvelSettings.MARVEL_DATA_INDEX_NAME, NODE_TYPE, node.getId(), clusterUUID, timestamp, node)); results.add(new DiscoveryNodeMarvelDoc(dataIndexNameResolver.resolve(timestamp), NODE_TYPE, node.getId(), clusterUUID, timestamp, node));
} }
} }

View File

@ -155,7 +155,7 @@ public abstract class Exporter {
@Override @Override
public String resolve(long timestamp) { public String resolve(long timestamp) {
return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(timestamp); return MarvelSettings.MARVEL_INDICES_PREFIX + String.valueOf(MarvelTemplateUtils.TEMPLATE_VERSION) + "-" + indexTimeFormatter.print(timestamp);
} }
@Override @Override

View File

@ -5,113 +5,103 @@
*/ */
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.marvel.support.VersionUtils;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties;
/**
*
*/
public final class MarvelTemplateUtils { public final class MarvelTemplateUtils {
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json"; static final String INDEX_TEMPLATE_FILE = "/marvel-es.json";
public static final String INDEX_TEMPLATE_NAME = ".marvel-es"; static final String INDEX_TEMPLATE_NAME_PREFIX = ".marvel-es-";
public static final String MARVEL_VERSION_FIELD = "marvel_version";
public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2; static final String DATA_TEMPLATE_FILE = "/marvel-es-data.json";
static final String DATA_TEMPLATE_NAME_PREFIX = ".marvel-es-data-";
static final String PROPERTIES_FILE = "/marvel.properties";
static final String VERSION_FIELD = "marvel.template.version";
public static final Integer TEMPLATE_VERSION = loadTemplateVersion();
private MarvelTemplateUtils() { private MarvelTemplateUtils() {
} }
/** /**
* Loads the default Marvel template * Loads the default template for the timestamped indices
*/ */
public static byte[] loadDefaultTemplate() { public static byte[] loadTimestampedIndexTemplate() {
try (InputStream is = MarvelTemplateUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) { try {
ByteArrayOutputStream out = new ByteArrayOutputStream(); return load(INDEX_TEMPLATE_FILE);
Streams.copy(is, out);
return out.toByteArray();
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException("unable to load marvel template", e); throw new IllegalStateException("unable to load marvel template", e);
} }
} }
public static Version loadDefaultTemplateVersion() { /**
return parseTemplateVersion(loadDefaultTemplate()); * Loads the default template for the data index
} */
public static byte[] loadDataIndexTemplate() {
public static Version templateVersion(IndexTemplateMetaData templateMetaData) {
String version = templateMetaData.settings().get("index." + MARVEL_VERSION_FIELD);
if (Strings.hasLength(version)) {
try {
return Version.fromString(version);
} catch (IllegalArgumentException e) {
return null;
}
}
return null;
}
public static IndexTemplateMetaData findMarvelTemplate(ClusterState state) {
MetaData metaData = state.getMetaData();
return metaData != null ? metaData.getTemplates().get(INDEX_TEMPLATE_NAME) : null;
}
public static Version parseTemplateVersion(byte[] template) {
try { try {
return VersionUtils.parseVersion(MARVEL_VERSION_FIELD, template); return load(DATA_TEMPLATE_FILE);
} catch (IllegalArgumentException e) { } catch (IOException e) {
throw new IllegalStateException("unable to load marvel data template", e);
}
}
/**
* Loads a resource with a given name and returns it as a byte array.
*/
static byte[] load(String name) throws IOException {
try (InputStream is = MarvelTemplateUtils.class.getResourceAsStream(name)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
return out.toByteArray();
}
}
/**
* Loads the current version of templates
*
* When executing tests in Intellij, the properties file might not be
* resolved: try running 'gradle processResources' first.
*/
static Integer loadTemplateVersion() {
try (InputStream is = MarvelTemplateUtils.class.getResourceAsStream(PROPERTIES_FILE)) {
Properties properties = new Properties();
properties.load(is);
String version = properties.getProperty(VERSION_FIELD);
if (Strings.hasLength(version)) {
return Integer.parseInt(version);
}
return null; return null;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("failed to parse marvel template version");
} catch (IOException e) {
throw new IllegalArgumentException("failed to load marvel template version");
} }
} }
public static boolean installedTemplateVersionIsSufficient(Version installed) { public static String indexTemplateName() {
// null indicates couldn't parse the version from the installed template, this means it is probably too old or invalid... return indexTemplateName(TEMPLATE_VERSION);
if (installed == null) {
return false;
}
// ensure the template is not too old
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
return false;
}
// We do not enforce that versions are equivalent to the current version as we may be in a rolling upgrade scenario
// and until a master is elected with the new version, data nodes that have been upgraded will not be able to ship
// data. This means that there is an implication that the new shippers will ship data correctly even with an old template.
// There is also no upper bound and we rely on elasticsearch nodes not being able to connect to each other across major
// versions
return true;
} }
public static boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed, ESLogger logger, String exporterName) { public static String indexTemplateName(Integer version) {
if (installed == null) { return templateName(INDEX_TEMPLATE_NAME_PREFIX, version);
logger.debug("exporter [{}] - currently installed marvel template is missing a version - installing a new one [{}]", exporterName, current); }
return true;
} public static String dataTemplateName() {
// Never update a very old template return dataTemplateName(TEMPLATE_VERSION);
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { }
return false;
} public static String dataTemplateName(Integer version) {
// Always update a template to the last up-to-date version return templateName(DATA_TEMPLATE_NAME_PREFIX, version);
if (current.after(installed)) { }
logger.debug("exporter [{}] - currently installed marvel template version [{}] will be updated to a newer version [{}]", exporterName, installed, current);
return true; static String templateName(String prefix, Integer version) {
// When the template is up-to-date, do not update assert version != null && version >= 0 : "version must be not null and greater or equal to zero";
} else if (current.equals(installed)) { return prefix + String.valueOf(version);
logger.debug("exporter [{}] - currently installed marvel template version [{}] is up-to-date", exporterName, installed);
return false;
// Never update a template that is newer than the expected one
} else {
logger.debug("exporter [{}] - currently installed marvel template version [{}] is newer than the one required [{}]... keeping it.", exporterName, installed, current);
return false;
}
} }
} }

View File

@ -8,11 +8,9 @@ package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.common.Base64; import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -20,7 +18,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
@ -55,12 +52,7 @@ import java.nio.file.Path;
import java.security.KeyStore; import java.security.KeyStore;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionIsSufficient;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate;
/** /**
* *
@ -106,8 +98,8 @@ public class HttpExporter extends Exporter {
volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean supportedClusterVersion = false; volatile boolean supportedClusterVersion = false;
/** Version of the built-in template **/ /** Version number of built-in templates **/
final Version templateVersion; private final Integer templateVersion;
boolean keepAlive; boolean keepAlive;
final ConnectionKeepAliveWorker keepAliveWorker; final ConnectionKeepAliveWorker keepAliveWorker;
@ -142,8 +134,8 @@ public class HttpExporter extends Exporter {
sslSocketFactory = createSSLSocketFactory(config.settings().getAsSettings(SSL_SETTING)); sslSocketFactory = createSSLSocketFactory(config.settings().getAsSettings(SSL_SETTING));
hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true); hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true);
// Checks that the built-in template is versioned // Loads the current version number of built-in templates
templateVersion = MarvelTemplateUtils.loadDefaultTemplateVersion(); templateVersion = MarvelTemplateUtils.TEMPLATE_VERSION;
if (templateVersion == null) { if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version"); throw new IllegalStateException("unable to find built-in template version");
} }
@ -202,7 +194,7 @@ public class HttpExporter extends Exporter {
builder.startObject(); builder.startObject();
builder.startObject("index"); builder.startObject("index");
// we need the index to be based on the document timestamp // we need the index to be based on the document timestamp and/or template version
builder.field("_index", indexNameResolver.resolve(marvelDoc)); builder.field("_index", indexNameResolver.resolve(marvelDoc));
if (marvelDoc.type() != null) { if (marvelDoc.type() != null) {
@ -395,101 +387,51 @@ public class HttpExporter extends Exporter {
* @return true if template exists or was uploaded successfully. * @return true if template exists or was uploaded successfully.
*/ */
private boolean checkAndUploadIndexTemplate(final String host) { private boolean checkAndUploadIndexTemplate(final String host) {
byte[] installedTemplate; String templateName = MarvelTemplateUtils.indexTemplateName(templateVersion);
try { boolean templateInstalled = hasTemplate(templateName, host);
installedTemplate = findMarvelTemplate(host);
} catch (Exception e) { // Works like LocalExporter on master:
logger.debug("http exporter [{}] - exception when loading the existing marvel template on host[{}]", e, name(), host); // Install the index template for timestamped indices first, so that other nodes can ship data
return false; if (!templateInstalled) {
logger.debug("http exporter [{}] - could not find existing marvel template, installing a new one", name());
if (!putTemplate(host, templateName, MarvelTemplateUtils.loadTimestampedIndexTemplate())) {
return false;
}
} }
// if we cannot find a template or a compatible template, we'll install one in / update it. // Install the index template for data index
if (installedTemplate == null) { templateName = MarvelTemplateUtils.dataTemplateName(templateVersion);
logger.debug("http exporter [{}] - could not find existing marvel template, installing a new one", name()); if (!hasTemplate(templateName, host)) {
return putTemplate(host); logger.debug("http exporter [{}] - could not find existing marvel template for data index, installing a new one", name());
} if (!putTemplate(host, templateName, MarvelTemplateUtils.loadDataIndexTemplate())) {
Version installedTemplateVersion = MarvelTemplateUtils.parseTemplateVersion(installedTemplate); return false;
if (installedTemplateVersionMandatesAnUpdate(templateVersion, installedTemplateVersion, logger, name())) { }
logger.debug("http exporter [{}] - installing new marvel template [{}], replacing [{}]", name(), templateVersion, installedTemplateVersion);
return putTemplate(host);
} else if (!installedTemplateVersionIsSufficient(installedTemplateVersion)) {
logger.error("http exporter [{}] - marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
name(), installedTemplateVersion, MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
// we're not going to do anything with the template.. it's too old, and the schema might
// be too different than what this version of marvel/es can work with. For this reason we're
// not going to export any data, to avoid mapping conflicts.
return false;
} }
return true; return true;
} }
private byte[] findMarvelTemplate(String host) throws IOException { private boolean hasTemplate(String templateName, String host) {
String url = "_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME; String url = "_template/" + templateName;
if (templateCheckTimeout != null) { if (templateCheckTimeout != null) {
url += "?timeout=" + templateCheckTimeout; url += "?timeout=" + templateCheckTimeout;
} }
HttpURLConnection connection = null; HttpURLConnection connection = null;
try { try {
logger.debug("http exporter [{}] - checking if marvel template exists on the marvel cluster", name()); logger.debug("http exporter [{}] - checking if marvel template [{}] exists on the marvel cluster", name(), templateName);
connection = openConnection(host, "GET", url, null); connection = openConnection(host, "GET", url, null);
if (connection == null) { if (connection == null) {
throw new IOException("no available connection to check marvel template existence"); throw new IOException("no available connection to check for marvel template [" + templateName + "] existence");
} }
byte[] remoteTemplate = null;
// 200 means that the template has been found, 404 otherwise // 200 means that the template has been found, 404 otherwise
if (connection.getResponseCode() == 200) { if (connection.getResponseCode() == 200) {
logger.debug("marvel template found"); logger.debug("marvel template [{}] found",templateName);
return true;
try (InputStream is = connection.getInputStream()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
remoteTemplate = out.toByteArray();
}
} }
return remoteTemplate;
} catch (Exception e) { } catch (Exception e) {
logger.error("http exporter [{}] - failed to verify the marvel template to [{}]:\n{}", name(), host, e.getMessage()); logger.error("http exporter [{}] - failed to verify the marvel template [{}] on [{}]:\n{}", name(), templateName, host, e.getMessage());
throw e;
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
}
boolean putTemplate(String host) {
HttpURLConnection connection = null;
try {
connection = openConnection(host, "PUT", "_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME, XContentType.JSON.restContentType());
if (connection == null) {
logger.debug("http exporter [{}] - no available connection to update marvel template", name());
return false;
}
logger.debug("http exporter [{}] - loading marvel pre-configured template", name());
byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
// Uploads the template and closes the outputstream
Streams.copy(template, connection.getOutputStream());
if (connection.getResponseCode() != 200 && connection.getResponseCode() != 201) {
logConnectionError("error adding the marvel template to [" + host + "]", connection);
return false;
}
logger.info("http exporter [{}] - marvel template updated to version [{}]", name(), templateVersion);
} catch (IOException e) {
logger.error("http exporter [{}] - failed to update the marvel template to [{}]:\n{}", name(), host, e.getMessage());
return false; return false;
} finally { } finally {
if (connection != null) { if (connection != null) {
try { try {
@ -499,101 +441,31 @@ public class HttpExporter extends Exporter {
} }
} }
} }
return false;
if (config.settings().getAsBoolean("update_mappings", true)) {
updateMappings(host, MarvelSettings.MARVEL_DATA_INDEX_NAME);
updateMappings(host, indexNameResolver().resolve(System.currentTimeMillis()));
}
return true;
} }
// TODO: Remove this method once marvel indices are versioned (v 2.2.0) boolean putTemplate(String host, String template, byte[] source) {
void updateMappings(String host, String index) { logger.debug("http exporter [{}] - installing template [{}]", name(), template);
logger.trace("http exporter [{}] - updating mappings for index [{}]", name(), index);
// Parse the default template to get its mappings
PutIndexTemplateRequest template = new PutIndexTemplateRequest().source(MarvelTemplateUtils.loadDefaultTemplate());
if ((template == null) || (template.mappings() == null) || (template.mappings().isEmpty())) {
return;
}
Set<String> indexMappings = new HashSet<>();
HttpURLConnection connection = null; HttpURLConnection connection = null;
try { try {
connection = openConnection(host, "GET", "/" + index + "/_mapping", XContentType.JSON.restContentType()); connection = openConnection(host, "PUT", "_template/" + template, XContentType.JSON.restContentType());
if (connection == null) { if (connection == null) {
logger.debug("http exporter [{}] - no available connection to get index mappings", name()); logger.debug("http exporter [{}] - no available connection to update marvel template [{}]", name(), template);
return; return false;
}
if (connection.getResponseCode() == 404) {
logger.trace("http exporter [{}] - index [{}] does not exist", name(), index);
return;
} else if (connection.getResponseCode() == 200) {
try (InputStream is = connection.getInputStream()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
Map<String, Object> mappings = XContentHelper.convertToMap(new BytesArray(out.toByteArray()), false).v2();
if ((mappings.get(index) != null) && (mappings.get(index) instanceof Map)) {
Map m = (Map) ((Map) mappings.get(index)).get("mappings");
if (m != null) {
indexMappings = m.keySet();
}
}
}
} else {
logConnectionError("http exporter [" + name() +"] - failed to get mappings for index [" + index + "] on host [" + host + "]", connection);
return;
}
} catch (Exception e) {
logger.error("http exporter [{}] - failed to update the marvel template to [{}]:\n{}", name(), host, e.getMessage());
return;
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
// Iterates over document types defined in the default template
for (String type : template.mappings().keySet()) {
if (indexMappings.contains(type)) {
logger.trace("http exporter [{}] - type [{} already exists in mapping of index [{}]", name(), type, index);
continue;
}
logger.trace("http exporter [{}] - adding type [{}] to index [{}] mappings", name(), type, index);
updateMappingForType(host, index, type, template.mappings().get(type));
}
}
void updateMappingForType(String host, String index, String type, String mappingSource) {
logger.trace("http exporter [{}] - updating index [{}] mappings for type [{}] on host [{}]", name(), index, type, host);
HttpURLConnection connection = null;
try {
connection = openConnection(host, "PUT", "/" + index + "/_mapping/" + type, XContentType.JSON.restContentType());
if (connection == null) {
logger.debug("http exporter [{}] - no available connection to update index mapping", name());
return;
} }
// Uploads the template and closes the outputstream // Uploads the template and closes the outputstream
Streams.copy(Strings.toUTF8Bytes(mappingSource), connection.getOutputStream()); Streams.copy(source, connection.getOutputStream());
if (connection.getResponseCode() != 200 && connection.getResponseCode() != 201) { if (connection.getResponseCode() != 200 && connection.getResponseCode() != 201) {
logConnectionError("http exporter [" + name() +"] - mapping of index [" + index + "] failed to be updated for type [" + type + "] on host [" + host + "]", connection); logConnectionError("error adding the marvel template [" + template + "] to [" + host + "]", connection);
return; return false;
} }
logger.trace("http exporter [{}] - mapping of index [{}] updated for type [{}]", name(), index, type); logger.info("http exporter [{}] - marvel template [{}] updated to version [{}]", name(), template, templateVersion);
} catch (Exception e) { return true;
logger.error("http exporter [{}] - failed to update mapping of index [{}] for type [{}]", name(), index, type); } catch (IOException e) {
logger.error("http exporter [{}] - failed to update marvel template [{}] on host [{}]:\n{}", name(), template, host, e.getMessage());
return false;
} finally { } finally {
if (connection != null) { if (connection != null) {
try { try {

View File

@ -60,11 +60,10 @@ public class LocalBulk extends ExportBulk {
} }
IndexRequestBuilder request = client.prepareIndex(); IndexRequestBuilder request = client.prepareIndex();
if (marvelDoc.index() != null) {
request.setIndex(marvelDoc.index()); // we need the index to be based on the document timestamp and/or template version
} else { request.setIndex(indexNameResolver.resolve(marvelDoc));
request.setIndex(indexNameResolver.resolve(marvelDoc));
}
if (marvelDoc.type() != null) { if (marvelDoc.type() != null) {
request.setType(marvelDoc.type()); request.setType(marvelDoc.type());
} }

View File

@ -5,12 +5,8 @@
*/ */
package org.elasticsearch.marvel.agent.exporter.local; package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -18,22 +14,17 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.SecuredClient; import org.elasticsearch.marvel.shield.SecuredClient;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionIsSufficient;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate;
/** /**
* *
*/ */
@ -48,8 +39,8 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
private volatile LocalBulk bulk; private volatile LocalBulk bulk;
private volatile boolean active = true; private volatile boolean active = true;
/** Version of the built-in template **/ /** Version number of built-in templates **/
private final Version templateVersion; private final Integer templateVersion;
public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) { public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) {
super(TYPE, config); super(TYPE, config);
@ -57,8 +48,8 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
this.clusterService = clusterService; this.clusterService = clusterService;
this.renderers = renderers; this.renderers = renderers;
// Checks that the built-in template is versioned // Loads the current version number of built-in templates
templateVersion = MarvelTemplateUtils.loadDefaultTemplateVersion(); templateVersion = MarvelTemplateUtils.TEMPLATE_VERSION;
if (templateVersion == null) { if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version"); throw new IllegalStateException("unable to find built-in template version");
} }
@ -126,21 +117,17 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
return null; return null;
} }
IndexTemplateMetaData installedTemplate = MarvelTemplateUtils.findMarvelTemplate(clusterState); String templateName = MarvelTemplateUtils.indexTemplateName(templateVersion);
boolean templateInstalled = hasTemplate(templateName, clusterState);
// if this is not the master, we'll just look to see if the marvel template is already // if this is not the master, we'll just look to see if the marvel timestamped template is already
// installed and if so, if it has a compatible version. If it is (installed and compatible) // installed and if so, if it has a compatible version. If it is (installed and compatible)
// we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state. // we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state.
if (!clusterService.localNode().masterNode()) { if (!clusterService.localNode().masterNode()) {
if (installedTemplate == null) { // We only need to check the index template for timestamped indices
// the marvel template is not yet installed in the given cluster state, we'll wait. if (!templateInstalled) {
logger.debug("local exporter [{}] - marvel index template [{}] does not exist, so service cannot start", name(), MarvelTemplateUtils.INDEX_TEMPLATE_NAME); // the template for timestamped indices is not yet installed in the given cluster state, we'll wait.
return null; logger.debug("local exporter [{}] - marvel index template does not exist, so service cannot start", name());
}
Version installedTemplateVersion = MarvelTemplateUtils.templateVersion(installedTemplate);
if (!installedTemplateVersionIsSufficient(installedTemplateVersion)) {
logger.debug("local exporter [{}] - cannot start. the currently installed marvel template (version [{}]) is incompatible with the " +
"current elasticsearch version [{}]. waiting until the template is updated", name(), installedTemplateVersion, Version.CURRENT);
return null; return null;
} }
@ -151,36 +138,65 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
// we are on master // we are on master
// //
// if we cannot find a template or a compatible template, we'll install one in / update it. // Check that there is nothing that could block metadata updates
if (installedTemplate == null) { if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) {
logger.debug("local exporter [{}] - could not find existing marvel template, installing a new one", name()); logger.debug("local exporter [{}] - waiting until metadata writes are unblocked", name());
putTemplate();
// we'll get that template on the next cluster state update
return null;
}
Version installedTemplateVersion = MarvelTemplateUtils.templateVersion(installedTemplate);
if (installedTemplateVersionMandatesAnUpdate(templateVersion, installedTemplateVersion, logger, name())) {
logger.debug("local exporter [{}] - installing new marvel template [{}], replacing [{}]", name(), templateVersion, installedTemplateVersion);
putTemplate();
// we'll get that template on the next cluster state update
return null;
} else if (!installedTemplateVersionIsSufficient(installedTemplateVersion)) {
logger.error("local exporter [{}] - marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
name(), installedTemplateVersion, MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
// we're not going to do anything with the template.. it's too old, and the schema might
// be too different than what this version of marvel/es can work with. For this reason we're
// not going to export any data, to avoid mapping conflicts.
return null; return null;
} }
// ok.. we have a compatible template... we can start // Install the index template for timestamped indices first, so that other nodes can ship data
if (!templateInstalled) {
logger.debug("local exporter [{}] - could not find existing marvel template for timestamped indices, installing a new one", name());
putTemplate(templateName, MarvelTemplateUtils.loadTimestampedIndexTemplate());
// we'll get that template on the next cluster state update
return null;
}
// Install the index template for data index
templateName = MarvelTemplateUtils.dataTemplateName(templateVersion);
if (!hasTemplate(templateName, clusterState)) {
logger.debug("local exporter [{}] - could not find existing marvel template for data index, installing a new one", name());
putTemplate(templateName, MarvelTemplateUtils.loadDataIndexTemplate());
// we'll get that template on the next cluster state update
return null;
}
// ok.. we have a compatible templates... we can start
return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, indexNameResolver, renderers); return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, indexNameResolver, renderers);
} }
void putTemplate() { /**
PutIndexTemplateRequest request = new PutIndexTemplateRequest(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).source(MarvelTemplateUtils.loadDefaultTemplate()); * List templates that exists in cluster state metadata and that match a given template name pattern.
*/
private ImmutableOpenMap<String, Integer> findTemplates(String templatePattern, ClusterState state) {
if (state == null || state.getMetaData() == null || state.getMetaData().getTemplates().isEmpty()) {
return ImmutableOpenMap.of();
}
ImmutableOpenMap.Builder<String, Integer> templates = ImmutableOpenMap.builder();
for (ObjectCursor<String> template : state.metaData().templates().keys()) {
if (Regex.simpleMatch(templatePattern, template.value)) {
try {
Integer version = Integer.parseInt(template.value.substring(templatePattern.length() - 1));
templates.put(template.value, version);
logger.debug("found index template [{}] in version [{}]", template, version);
} catch (NumberFormatException e) {
logger.warn("cannot extract version number for template [{}]", template.value);
}
}
}
return templates.build();
}
private boolean hasTemplate(String templateName, ClusterState state) {
ImmutableOpenMap<String, Integer> templates = findTemplates(templateName, state);
return templates.size() > 0;
}
void putTemplate(String template, byte[] source) {
logger.debug("local exporter [{}] - installing template [{}]", name(), template);
PutIndexTemplateRequest request = new PutIndexTemplateRequest(template).source(source);
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!"; assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
// async call, so we won't block cluster event thread // async call, so we won't block cluster event thread
@ -188,80 +204,15 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
@Override @Override
public void onResponse(PutIndexTemplateResponse response) { public void onResponse(PutIndexTemplateResponse response) {
if (response.isAcknowledged()) { if (response.isAcknowledged()) {
logger.trace("local exporter [{}] - successfully installed marvel template", name()); logger.trace("local exporter [{}] - successfully installed marvel template [{}]", name(), template);
if (config.settings().getAsBoolean("update_mappings", true)) {
updateMappings(MarvelSettings.MARVEL_DATA_INDEX_NAME);
updateMappings(indexNameResolver().resolve(System.currentTimeMillis()));
}
} else { } else {
logger.error("local exporter [{}] - failed to update marvel index template", name()); logger.error("local exporter [{}] - failed to update marvel index template [{}]", name(), template);
} }
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
logger.error("local exporter [{}] - failed to update marvel index template", throwable, name()); logger.error("local exporter [{}] - failed to update marvel index template [{}]", throwable, name(), template);
}
});
}
// TODO: Remove this method once marvel indices are versioned (v 2.2.0)
void updateMappings(String index) {
logger.trace("local exporter [{}] - updating mappings for index [{}]", name(), index);
// Parse the default template to get its mappings
PutIndexTemplateRequest template = new PutIndexTemplateRequest().source(MarvelTemplateUtils.loadDefaultTemplate());
if ((template == null) || (template.mappings() == null) || (template.mappings().isEmpty())) {
return;
}
// async call, so we won't block cluster event thread
client.admin().indices().getMappings(new GetMappingsRequest().indices(index), new ActionListener<GetMappingsResponse>() {
@Override
public void onResponse(GetMappingsResponse response) {
ImmutableOpenMap<String, MappingMetaData> indexMappings = response.getMappings().get(index);
if (indexMappings != null) {
// Iterates over document types defined in the default template
for (String type : template.mappings().keySet()) {
if (indexMappings.get(type) != null) {
logger.trace("local exporter [{}] - type [{} already exists in mapping of index [{}]", name(), type, index);
continue;
}
logger.trace("local exporter [{}] - adding type [{}] to index [{}] mappings", name(), type, index);
updateMappingForType(index, type, template.mappings().get(type));
}
}
}
@Override
public void onFailure(Throwable e) {
if (e instanceof IndexNotFoundException) {
logger.trace("local exporter [{}] - index [{}] not found, unable to update mappings", name(), index);
} else {
logger.error("local exporter [{}] - failed to get mappings for index [{}]", name(), index);
}
}
});
}
void updateMappingForType(String index, String type, String mappingSource) {
logger.trace("local exporter [{}] - updating index [{}] mappings for type [{}]", name(), index, type);
client.admin().indices().putMapping(new PutMappingRequest(index).type(type).source(mappingSource), new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
if (response.isAcknowledged()) {
logger.trace("local exporter [{}] - mapping of index [{}] updated for type [{}]", name(), index, type);
} else {
logger.trace("local exporter [{}] - mapping of index [{}] failed to be updated for type [{}]", name(), index, type);
}
}
@Override
public void onFailure(Throwable e) {
logger.error("local exporter [{}] - failed to update mapping of index [{}] for type [{}]", name(), index, type);
} }
}); });
} }

View File

@ -22,7 +22,7 @@ public class MarvelSettings extends AbstractComponent {
private static final String PREFIX = MarvelPlugin.NAME + ".agent."; private static final String PREFIX = MarvelPlugin.NAME + ".agent.";
public static final String MARVEL_INDICES_PREFIX = ".marvel-es-"; public static final String MARVEL_INDICES_PREFIX = ".marvel-es-";
public static final String MARVEL_DATA_INDEX_NAME = MARVEL_INDICES_PREFIX + "data"; public static final String MARVEL_DATA_INDEX_PREFIX = MARVEL_INDICES_PREFIX + "data-";
public static final TimeValue MAX_LICENSE_GRACE_PERIOD = TimeValue.timeValueHours(7 * 24); public static final TimeValue MAX_LICENSE_GRACE_PERIOD = TimeValue.timeValueHours(7 * 24);
/** Sampling interval between two collections (default to 10s) */ /** Sampling interval between two collections (default to 10s) */

View File

@ -0,0 +1,25 @@
{
"template": ".marvel-es-data-${marvel.template.version}",
"settings": {
"marvel.plugin.version": "${marvel.plugin.version}",
"marvel.template.version": "${marvel.template.version}",
"index.number_of_shards": 1,
"index.number_of_replicas": 1,
"index.codec": "best_compression",
"index.mapper.dynamic": false
},
"mappings": {
"_default_": {
"_all": {
"enabled": false
},
"date_detection": false
},
"cluster_info": {
"enabled": false
},
"node": {
"enabled": false
}
}
}

View File

@ -1,7 +1,8 @@
{ {
"template": ".marvel-es-*", "template": ".marvel-es-${marvel.template.version}-*",
"settings": { "settings": {
"marvel_version": "${project.version}", "marvel.plugin.version": "${marvel.plugin.version}",
"marvel.template.version": "${marvel.template.version}",
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.codec": "best_compression", "index.codec": "best_compression",
@ -196,12 +197,6 @@
} }
} }
}, },
"cluster_info": {
"enabled": false
},
"node": {
"enabled": false
},
"nodes": { "nodes": {
"properties": { "properties": {
"state_uuid": { "state_uuid": {

View File

@ -0,0 +1 @@
marvel.template.version=${marvel.template.version}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.AbstractCollectorTestCase; import org.elasticsearch.marvel.agent.collector.AbstractCollectorTestCase;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.license.MarvelLicensee; import org.elasticsearch.marvel.license.MarvelLicensee;
@ -182,7 +183,7 @@ public class ClusterStateCollectorTests extends AbstractCollectorTestCase {
case ClusterStateCollector.NODE_TYPE: case ClusterStateCollector.NODE_TYPE:
DiscoveryNodeMarvelDoc discoveryNodeMarvelDoc = (DiscoveryNodeMarvelDoc) marvelDoc; DiscoveryNodeMarvelDoc discoveryNodeMarvelDoc = (DiscoveryNodeMarvelDoc) marvelDoc;
assertThat(discoveryNodeMarvelDoc.index(), equalTo(MarvelSettings.MARVEL_DATA_INDEX_NAME)); assertThat(discoveryNodeMarvelDoc.index(), equalTo(MarvelSettings.MARVEL_DATA_INDEX_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION));
assertThat(discoveryNodeMarvelDoc.id(), not(isEmptyOrNullString())); assertThat(discoveryNodeMarvelDoc.id(), not(isEmptyOrNullString()));
assertNotNull(discoveryNodeMarvelDoc.getNode()); assertNotNull(discoveryNodeMarvelDoc.getNode());
discoveryNodes.add(discoveryNodeMarvelDoc); discoveryNodes.add(discoveryNodeMarvelDoc);

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.Collector;
@ -14,9 +13,6 @@ import org.elasticsearch.marvel.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.VersionUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -25,11 +21,15 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
@ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) @ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCase { public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCase {
private final Integer currentVersion = MarvelTemplateUtils.TEMPLATE_VERSION;
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder() Settings.Builder settings = Settings.builder()
@ -44,149 +44,116 @@ public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCa
protected abstract Settings exporterSettings(); protected abstract Settings exporterSettings();
protected abstract void deleteTemplate() throws Exception; protected abstract void deleteTemplates() throws Exception;
protected abstract void putTemplate(String version) throws Exception; protected abstract void putTemplate(String name, int version) throws Exception;
protected abstract void createMarvelIndex(String index) throws Exception; protected abstract void assertTemplateExist(String name) throws Exception;
protected abstract void assertTemplateUpdated(Version version) throws Exception; protected abstract void assertTemplateNotUpdated(String name) throws Exception;
protected abstract void assertTemplateNotUpdated(Version version) throws Exception; public void testCreateWhenNoExistingTemplates() throws Exception {
protected abstract void assertMappingsUpdated(String... indices) throws Exception;
protected abstract void assertMappingsNotUpdated(String... indices) throws Exception;
protected abstract void assertIndicesNotCreated() throws Exception;
public void testCreateWhenNoExistingTemplate() throws Exception {
internalCluster().startNode(); internalCluster().startNode();
deleteTemplate(); deleteTemplates();
doExporting(); doExporting();
logger.debug("--> template does not exist: it should have been created in the current version"); logger.debug("--> templates does not exist: it should have been created in the current version");
assertTemplateUpdated(currentVersion()); assertTemplateExist(indexTemplateName());
assertTemplateExist(dataTemplateName());
doExporting(); doExporting();
logger.debug("--> mappings should be up-to-date"); logger.debug("--> indices should have been created");
assertMappingsUpdated(currentIndices()); awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
} }
public void testUpdateWhenExistingTemplateHasNoVersion() throws Exception { public void testCreateWhenExistingTemplatesAreOld() throws Exception {
internalCluster().startNode(); internalCluster().startNode();
putTemplate(""); final Integer version = randomIntBetween(0, currentVersion - 1);
doExporting(); putTemplate(indexTemplateName(version), version);
putTemplate(dataTemplateName(version), version);
logger.debug("--> existing template does not have a version: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
doExporting(); doExporting();
logger.debug("--> mappings should be up-to-date"); logger.debug("--> existing templates are old");
assertMappingsUpdated(currentIndices()); assertTemplateExist(dataTemplateName(version));
assertTemplateExist(indexTemplateName(version));
logger.debug("--> existing templates are old: new templates should be created");
assertTemplateExist(indexTemplateName());
assertTemplateExist(dataTemplateName());
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
} }
public void testUpdateWhenExistingTemplateHasWrongVersion() throws Exception { public void testCreateWhenExistingTemplateAreUpToDate() throws Exception {
internalCluster().startNode(); internalCluster().startNode();
putTemplate(randomAsciiOfLength(5)); putTemplate(indexTemplateName(currentVersion), currentVersion);
doExporting(); putTemplate(dataTemplateName(currentVersion), currentVersion);
logger.debug("--> existing template has a wrong version: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
doExporting(); doExporting();
logger.debug("--> mappings should be up-to-date"); logger.debug("--> existing templates are up to date");
assertMappingsUpdated(currentIndices()); assertTemplateExist(indexTemplateName());
assertTemplateExist(dataTemplateName());
logger.debug("--> existing templates has the same version: they should not be changed");
assertTemplateNotUpdated(indexTemplateName());
assertTemplateNotUpdated(dataTemplateName());
doExporting();
logger.debug("--> indices should have been created");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
} }
public void testNoUpdateWhenExistingTemplateIsTooOld() throws Exception { public void testRandomTemplates() throws Exception {
internalCluster().startNode(); internalCluster().startNode();
putTemplate(VersionUtils.getFirstVersion().number()); int previousIndexTemplateVersion = rarely() ? currentVersion : randomIntBetween(0, currentVersion - 1);
doExporting(); boolean previousIndexTemplateExist = randomBoolean();
if (previousIndexTemplateExist) {
logger.debug("--> existing template is too old: it should not be updated"); logger.debug("--> creating index template in version [{}]", previousIndexTemplateVersion);
assertTemplateNotUpdated(VersionUtils.getFirstVersion()); putTemplate(indexTemplateName(previousIndexTemplateVersion), previousIndexTemplateVersion);
doExporting();
logger.debug("--> existing template is too old: no data is exported");
assertIndicesNotCreated();
}
public void testUpdateWhenExistingTemplateIsOld() throws Exception {
internalCluster().startNode();
putTemplate(VersionUtils.getPreviousVersion(currentVersion()).number());
doExporting();
logger.debug("--> existing template is old but supported: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should be up-to-date");
assertMappingsUpdated(currentIndices());
}
public void testUpdateWhenExistingTemplateIsUpToDate() throws Exception {
internalCluster().startNode();
putTemplate(currentVersion().toString());
doExporting();
logger.debug("--> existing template has the same version: it should not be updated");
assertTemplateNotUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should not have been updated");
assertMappingsNotUpdated(currentIndices());
}
public void testMappingsUpdate() throws Exception {
boolean updateMappings = randomBoolean();
logger.debug("--> update_mappings is {}", updateMappings);
internalCluster().startNode(Settings.builder().put("marvel.agent.exporters._exporter.update_mappings", updateMappings));
logger.debug("--> putting a template with a very old version so that it will not be updated");
putTemplate(VersionUtils.getFirstVersion().toString());
logger.debug("--> creating marvel data index");
createMarvelIndex(MarvelSettings.MARVEL_DATA_INDEX_NAME);
logger.debug("--> creating a cold marvel index");
createMarvelIndex(coldIndex());
logger.debug("--> creating an active marvel index");
createMarvelIndex(hotIndex());
logger.debug("--> all indices have a old mapping now");
assertMappingsNotUpdated(coldIndex(), hotIndex(), MarvelSettings.MARVEL_DATA_INDEX_NAME);
logger.debug("--> updating the template with a previous version, so that it will be updated when exporting documents");
putTemplate(VersionUtils.getPreviousVersion(currentVersion()).number());
doExporting();
logger.debug("--> existing template is old: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
logger.debug("--> cold marvel index: mappings should not have been updated");
assertMappingsNotUpdated(coldIndex());
if (updateMappings) {
logger.debug("--> marvel indices: mappings should be up-to-date");
assertMappingsUpdated(MarvelSettings.MARVEL_DATA_INDEX_NAME, hotIndex());
} else {
logger.debug("--> marvel indices: mappings should bnot have been updated");
assertMappingsNotUpdated(MarvelSettings.MARVEL_DATA_INDEX_NAME, hotIndex());
} }
int previousDataTemplateVersion = rarely() ? currentVersion : randomIntBetween(0, currentVersion - 1);
boolean previousDataTemplateExist = randomBoolean();
if (previousDataTemplateExist) {
logger.debug("--> creating data template in version [{}]", previousDataTemplateVersion);
putTemplate(dataTemplateName(previousDataTemplateVersion), previousDataTemplateVersion);
}
doExporting();
logger.debug("--> templates should exist in current version");
assertTemplateExist(indexTemplateName());
assertTemplateExist(dataTemplateName());
if (previousIndexTemplateExist) {
logger.debug("--> index template should exist in version [{}]", previousIndexTemplateVersion);
assertTemplateExist(indexTemplateName(previousIndexTemplateVersion));
}
if (previousDataTemplateExist) {
logger.debug("--> data template should exist in version [{}]", previousDataTemplateVersion);
assertTemplateExist(dataTemplateName(previousDataTemplateVersion));
}
doExporting();
logger.debug("--> indices should exist in current version");
awaitIndexExists(currentDataIndexName());
awaitIndexExists(currentTimestampedIndexName());
} }
protected void doExporting() throws Exception { protected void doExporting() throws Exception {
@ -203,59 +170,23 @@ public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCa
return exporters.iterator().next(); return exporters.iterator().next();
} }
private Version currentVersion() { private String currentDataIndexName() {
return MarvelTemplateUtils.loadDefaultTemplateVersion(); return ".marvel-es-data-" + String.valueOf(currentVersion);
} }
private String[] currentIndices() { private String currentTimestampedIndexName() {
return new String[]{hotIndex(), MarvelSettings.MARVEL_DATA_INDEX_NAME};
}
private String coldIndex() {
return exporter().indexNameResolver().resolve(new DateTime(2012, 3, 10, 0, 0, DateTimeZone.UTC).getMillis());
}
private String hotIndex() {
return exporter().indexNameResolver().resolve(System.currentTimeMillis()); return exporter().indexNameResolver().resolve(System.currentTimeMillis());
} }
/** Generates a template that looks like an old one **/ /** Generates a basic template **/
protected static BytesReference generateTemplateSource(String version) throws IOException { protected static BytesReference generateTemplateSource(String name, Integer version) throws IOException {
return jsonBuilder().startObject() return jsonBuilder().startObject()
.field("template", ".marvel-es-*") .field("template", name)
.startObject("settings") .startObject("settings")
.field("index.number_of_shards", 1) .field("index.number_of_shards", 1)
.field("index.number_of_replicas", 1) .field("index.number_of_replicas", 1)
.field("index.mapper.dynamic", false) .field(MarvelTemplateUtils.VERSION_FIELD, String.valueOf(version))
.field(MarvelTemplateUtils.MARVEL_VERSION_FIELD, version)
.endObject() .endObject()
.startObject("mappings") .endObject().bytes();
.startObject("_default_")
.startObject("_all")
.field("enabled", false)
.endObject()
.field("date_detection", false)
.startObject("properties")
.startObject("cluster_uuid")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("timestamp")
.field("type", "date")
.field("format", "date_time")
.endObject()
.endObject()
.endObject()
.startObject("cluster_info")
.field("enabled", false)
.endObject()
.startObject("node_stats")
.startObject("properties")
.startObject("node_stats")
.field("type", "object")
.endObject()
.endObject()
.endObject()
.endObject().bytes();
} }
} }

View File

@ -5,89 +5,54 @@
*/ */
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.marvel.support.VersionUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.MARVEL_VERSION_FIELD; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
//import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.MARVEL_VERSION_FIELD;
public class MarvelTemplateUtilsTests extends ESTestCase { public class MarvelTemplateUtilsTests extends ESTestCase {
public void testLoadTemplate() { public void testLoadTimestampedIndexTemplate() {
byte[] template = MarvelTemplateUtils.loadDefaultTemplate(); byte[] template = MarvelTemplateUtils.loadTimestampedIndexTemplate();
assertNotNull(template); assertNotNull(template);
assertThat(template.length, Matchers.greaterThan(0)); assertThat(template.length, greaterThan(0));
} }
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException { public void testLoadDataIndexTemplate() {
byte[] template = MarvelTemplateUtils.loadDefaultTemplate(); byte[] template = MarvelTemplateUtils.loadDataIndexTemplate();
assertNotNull(template); assertNotNull(template);
assertThat(template.length, greaterThan(0));
}
Version version = MarvelTemplateUtils.parseTemplateVersion(template); public void testLoad() throws IOException {
String resource = randomFrom(MarvelTemplateUtils.INDEX_TEMPLATE_FILE, MarvelTemplateUtils.DATA_TEMPLATE_FILE);
byte[] template = MarvelTemplateUtils.load(resource);
assertNotNull(template);
assertThat(template.length, greaterThan(0));
}
public void testLoadTemplateVersion() {
Integer version = MarvelTemplateUtils.loadTemplateVersion();
assertNotNull(version); assertNotNull(version);
assertThat(version, greaterThan(0));
assertThat(version, equalTo(MarvelTemplateUtils.TEMPLATE_VERSION));
} }
public void testParseTemplateVersionFromStringTemplate() throws IOException { public void testIndexTemplateName() {
List<String> templates = new ArrayList<>(); assertThat(MarvelTemplateUtils.indexTemplateName(),
templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}"); equalTo(MarvelTemplateUtils.INDEX_TEMPLATE_NAME_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION));
templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}"); int version = randomIntBetween(1, 100);
templates.add("{\"marvel_version\": \"1.7.1\"}"); assertThat(MarvelTemplateUtils.indexTemplateName(version), equalTo(".marvel-es-" + version));
templates.add("{\"marvel_version\": \"2.0.0-beta1\"}");
templates.add("{\"marvel_version\": \"2.0.0\"}");
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
for (String template : templates) {
Version version = MarvelTemplateUtils.parseTemplateVersion(Strings.toUTF8Bytes(template));
assertNotNull(version);
}
Version version = MarvelTemplateUtils.parseTemplateVersion(Strings.toUTF8Bytes("{\"marvel.index_format\": \"7\"}"));
assertNull(version);
} }
public void testParseVersion() throws IOException { public void testDataTemplateName() {
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}")); assertThat(MarvelTemplateUtils.dataTemplateName(),
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}")); equalTo(MarvelTemplateUtils.DATA_TEMPLATE_NAME_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}")); int version = randomIntBetween(1, 100);
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }")); assertThat(MarvelTemplateUtils.dataTemplateName(version), equalTo(".marvel-es-data-" + version));
assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
}
public void testTemplateVersionMandatesAnUpdate() {
// Version is unknown
assertTrue(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, null, logger, "unit-test"));
// Version is too old
Version unsupported = org.elasticsearch.test.VersionUtils.getPreviousVersion(MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
assertFalse(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupported, logger, "unit-test"));
// Version is old but supported
assertTrue(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION, logger, "unit-test"));
// Version is up to date
assertFalse(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, Version.CURRENT, logger, "unit-test"));
// Version is up to date
Version previous = org.elasticsearch.test.VersionUtils.getPreviousVersion(Version.CURRENT);
assertFalse(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(previous, Version.CURRENT, logger, "unit-test"));
}
public void testTemplateVersionIsSufficient() {
// Version is unknown
assertFalse(MarvelTemplateUtils.installedTemplateVersionIsSufficient(null));
// Version is too old
Version unsupported = org.elasticsearch.test.VersionUtils.getPreviousVersion(MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
assertFalse(MarvelTemplateUtils.installedTemplateVersionIsSufficient(unsupported));
// Version is OK
assertTrue(MarvelTemplateUtils.installedTemplateVersionIsSufficient(Version.CURRENT));
} }
} }

View File

@ -11,26 +11,25 @@ import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest; import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.marvel.agent.exporter.AbstractExporterTemplateTestCase; import org.elasticsearch.marvel.agent.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase { public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase {
@ -70,68 +69,39 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
} }
@Override @Override
protected void deleteTemplate() { protected void deleteTemplates() throws Exception {
dispatcher.setTemplate(null); dispatcher.templates.clear();
} }
@Override @Override
protected void putTemplate(String version) throws Exception { protected void putTemplate(String name, int version) throws Exception {
dispatcher.setTemplate(generateTemplateSource(version).toBytes()); dispatcher.templates.put(name, generateTemplateSource(name, version));
} }
@Override @Override
protected void createMarvelIndex(String index) throws Exception { protected void assertTemplateExist(String name) throws Exception {
dispatcher.addIndex(index); assertThat("failed to find a template matching [" + name + "]", dispatcher.templates.containsKey(name), is(true));
} }
@Override @Override
protected void assertTemplateUpdated(Version version) { protected void assertTemplateNotUpdated(String name) throws Exception {
// Checks that a PUT Template request has been made
assertThat(dispatcher.hasRequest("PUT", "/_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME), is(true));
// Checks that the current template has the expected version
assertThat(MarvelTemplateUtils.parseTemplateVersion(dispatcher.getTemplate()), equalTo(version));
}
@Override
protected void assertTemplateNotUpdated(Version version) throws Exception {
// Checks that no PUT Template request has been made // Checks that no PUT Template request has been made
assertThat(dispatcher.hasRequest("PUT", "/_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME), is(false)); assertThat(dispatcher.hasRequest("PUT", "/_template/" + name), is(false));
// Checks that the current template has the expected version // Checks that the current template exists
assertThat(MarvelTemplateUtils.parseTemplateVersion(dispatcher.getTemplate()), equalTo(version)); assertThat(dispatcher.templates.containsKey(name), is(true));
} }
@Override @Override
protected void assertIndicesNotCreated() throws Exception { protected void awaitIndexExists(String... indices) throws Exception {
// Checks that no Bulk request has been made assertBusy(new Runnable() {
assertThat(dispatcher.hasRequest("POST", "/_bulk"), is(false)); @Override
assertThat(dispatcher.mappings.size(), equalTo(0)); public void run() {
} for (String index : indices) {
assertThat("could not find index " + index, dispatcher.hasIndex(index), is(true));
@Override }
protected void assertMappingsUpdated(String... indices) throws Exception {
// Load the mappings of the old template
Set<String> oldMappings = new PutIndexTemplateRequest().source(generateTemplateSource(null)).mappings().keySet();
// Load the mappings of the latest template
Set<String> newMappings = new PutIndexTemplateRequest().source(generateTemplateSource(null)).mappings().keySet();
newMappings.removeAll(oldMappings);
for (String index : indices) {
for (String mapping : newMappings) {
// Checks that a PUT Mapping request has been made for every type that was not in the old template
assertThat(dispatcher.hasRequest("PUT", "/" + index + "/_mapping/" + mapping), equalTo(true));
} }
} }, 10, TimeUnit.SECONDS);
}
@Override
protected void assertMappingsNotUpdated(String... indices) throws Exception {
for (String index : indices) {
// Checks that no PUT Template request has been made
assertThat(dispatcher.hasRequest("PUT", "/" + index + "/_mapping/"), is(false));
}
} }
class MockServerDispatcher extends Dispatcher { class MockServerDispatcher extends Dispatcher {
@ -140,100 +110,59 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
private final MockResponse NOT_FOUND = newResponse(404, ""); private final MockResponse NOT_FOUND = newResponse(404, "");
private final Set<String> requests = new HashSet<>(); private final Set<String> requests = new HashSet<>();
private final Map<String, BytesReference> templates = ConcurrentCollections.newConcurrentMap();
private final Map<String, Set<String>> mappings = new HashMap<>(); private final Set<String> indices = ConcurrentCollections.newConcurrentSet();
private byte[] template;
@Override @Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException { public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
synchronized (this) { final String requestLine = request.getRequestLine();
final String requestLine = request.getRequestLine(); requests.add(requestLine);
requests.add(requestLine);
switch (requestLine) { switch (requestLine) {
// Cluster version // Cluster version
case "GET / HTTP/1.1": case "GET / HTTP/1.1":
return newResponse(200, "{\"version\": {\"number\": \"" + Version.CURRENT.number() + "\"}}"); return newResponse(200, "{\"version\": {\"number\": \"" + Version.CURRENT.number() + "\"}}");
// Bulk
// Template case "POST /_bulk HTTP/1.1":
case "GET /_template/.marvel-es HTTP/1.1": // Parse the bulk request and extract all index names
return (template == null) ? NOT_FOUND : newResponse(200, new BytesArray(template).toUtf8()); try {
BulkRequest bulk = new BulkRequest();
case "PUT /_template/.marvel-es HTTP/1.1": byte[] source = request.getBody().readByteArray();
this.template = request.getBody().readByteArray(); bulk.add(source, 0, source.length);
return OK; for (ActionRequest docRequest : bulk.requests()) {
if (docRequest instanceof IndexRequest) {
// Bulk indices.add(((IndexRequest) docRequest).index());
case "POST /_bulk HTTP/1.1":
return OK;
default:
String[] paths = Strings.splitStringToArray(request.getPath(), '/');
// Index Mappings
if ((paths != null) && (paths.length > 0) && ("_mapping".equals(paths[1]))) {
if (!mappings.containsKey(paths[0])) {
// Index does not exist
return NOT_FOUND;
}
// Get index mappings
if ("GET".equals(request.getMethod())) {
try {
// Builds a fake mapping response
XContentBuilder builder = jsonBuilder().startObject().startObject(paths[0]).startObject("mappings");
for (String type : mappings.get(paths[0])) {
builder.startObject(type).endObject();
}
builder.endObject().endObject().endObject();
return newResponse(200, builder.bytes().toUtf8());
} catch (IOException e) {
return newResponse(500, e.getMessage());
}
// Put index mapping
} else if ("PUT".equals(request.getMethod()) && paths.length > 2) {
Set<String> types = mappings.get(paths[0]);
if (types == null) {
types = new HashSet<>();
}
types.add(paths[2]);
return OK;
} }
} }
break; } catch (Exception e) {
} return newResponse(500, e.getMessage());
}
return OK;
default:
String[] paths = Strings.splitStringToArray(request.getPath(), '/');
return newResponse(500, "MockServerDispatcher does not support: " + request.getRequestLine()); // Templates
if ((paths != null) && (paths.length > 0) && ("_template".equals(paths[0]))) {
String templateName = paths[1];
boolean templateExist = templates.containsKey(templateName);
if ("GET".equals(request.getMethod())) {
return templateExist ? newResponse(200, templates.get(templateName).toUtf8()) : NOT_FOUND;
}
if ("PUT".equals(request.getMethod())) {
templates.put(templateName, new BytesArray(request.getBody().readByteArray()));
return templateExist ? newResponse(200, "updated") : newResponse(201, "created");
}
}
break;
} }
return newResponse(500, "MockServerDispatcher does not support: " + request.getRequestLine());
} }
MockResponse newResponse(int code, String body) { MockResponse newResponse(int code, String body) {
return new MockResponse().setResponseCode(code).setBody(body); return new MockResponse().setResponseCode(code).setBody(body);
} }
void setTemplate(byte[] template) {
synchronized (this) {
this.template = template;
}
}
byte[] getTemplate() {
return template;
}
void addIndex(String index) {
synchronized (this) {
if (template != null) {
// Simulate the use of the index template when creating an index
mappings.put(index, new HashSet<>(new PutIndexTemplateRequest().source(template).mappings().keySet()));
} else {
mappings.put(index, null);
}
}
}
int countRequests(String method, String path) { int countRequests(String method, String path) {
int count = 0; int count = 0;
for (String request : requests) { for (String request : requests) {
@ -247,5 +176,9 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
boolean hasRequest(String method, String path) { boolean hasRequest(String method, String path) {
return countRequests(method, path) > 0; return countRequests(method, path) > 0;
} }
boolean hasIndex(String index) {
return indices.contains(index);
}
} }
} }

View File

@ -46,6 +46,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
@ -53,9 +55,13 @@ import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) @ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class HttpExporterTests extends MarvelIntegTestCase { public class HttpExporterTests extends MarvelIntegTestCase {
private int webPort; private int webPort;
private MockWebServer webServer; private MockWebServer webServer;
private static final byte[] TIMESTAMPED_TEMPLATE = MarvelTemplateUtils.loadTimestampedIndexTemplate();
private static final byte[] DATA_TEMPLATE = MarvelTemplateUtils.loadDataIndexTemplate();
@Before @Before
public void startWebservice() throws Exception { public void startWebservice() throws Exception {
for (webPort = 9250; webPort < 9300; webPort++) { for (webPort = 9250; webPort < 9300; webPort++) {
@ -81,8 +87,10 @@ public class HttpExporterTests extends MarvelIntegTestCase {
public void testExport() throws Exception { public void testExport() throws Exception {
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueResponse(404, "marvel template does not exist"); enqueueResponse(404, "template for timestamped indices does not exist");
enqueueResponse(201, "marvel template created"); enqueueResponse(201, "template for timestamped indices created");
enqueueResponse(404, "template for data index does not exist");
enqueueResponse(201, "template for data index created");
enqueueResponse(200, "successful bulk request "); enqueueResponse(200, "successful bulk request ");
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
@ -98,7 +106,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
final int nbDocs = randomIntBetween(1, 25); final int nbDocs = randomIntBetween(1, 25);
exporter.export(newRandomMarvelDocs(nbDocs)); exporter.export(newRandomMarvelDocs(nbDocs));
assertThat(webServer.getRequestCount(), equalTo(4)); assertThat(webServer.getRequestCount(), equalTo(6));
RecordedRequest recordedRequest = webServer.takeRequest(); RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
@ -106,12 +114,21 @@ public class HttpExporterTests extends MarvelIntegTestCase {
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT")); assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate())); assertThat(recordedRequest.getBody().readByteArray(), equalTo(TIMESTAMPED_TEMPLATE));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(DATA_TEMPLATE));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST")); assertThat(recordedRequest.getMethod(), equalTo("POST"));
@ -156,8 +173,10 @@ public class HttpExporterTests extends MarvelIntegTestCase {
logger.info("--> starting node"); logger.info("--> starting node");
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueResponse(404, "marvel template does not exist"); enqueueResponse(404, "template for timestamped indices does not exist");
enqueueResponse(201, "marvel template created"); enqueueResponse(201, "template for timestamped indices created");
enqueueResponse(404, "template for data index does not exist");
enqueueResponse(201, "template for data index created");
enqueueResponse(200, "successful bulk request "); enqueueResponse(200, "successful bulk request ");
String agentNode = internalCluster().startNode(builder); String agentNode = internalCluster().startNode(builder);
@ -168,7 +187,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertThat(exporter.supportedClusterVersion, is(true)); assertThat(exporter.supportedClusterVersion, is(true));
assertThat(webServer.getRequestCount(), equalTo(4)); assertThat(webServer.getRequestCount(), equalTo(6));
RecordedRequest recordedRequest = webServer.takeRequest(); RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
@ -176,12 +195,21 @@ public class HttpExporterTests extends MarvelIntegTestCase {
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT")); assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate())); assertThat(recordedRequest.getBody().readByteArray(), equalTo(TIMESTAMPED_TEMPLATE));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(DATA_TEMPLATE));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST")); assertThat(recordedRequest.getMethod(), equalTo("POST"));
@ -214,14 +242,15 @@ public class HttpExporterTests extends MarvelIntegTestCase {
exporter = getExporter(agentNode); exporter = getExporter(agentNode);
enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT); enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT);
enqueueResponse(secondWebServer, 404, "marvel template does not exist"); enqueueResponse(secondWebServer, 404, "template for timestamped indices does not exist");
enqueueResponse(secondWebServer, 201, "marvel template created"); enqueueResponse(secondWebServer, 201, "template for timestamped indices created");
enqueueResponse(secondWebServer, 200, "template for data index exist");
enqueueResponse(secondWebServer, 200, "successful bulk request "); enqueueResponse(secondWebServer, 200, "successful bulk request ");
logger.info("--> exporting a second event"); logger.info("--> exporting a second event");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertThat(secondWebServer.getRequestCount(), equalTo(4)); assertThat(secondWebServer.getRequestCount(), equalTo(5));
recordedRequest = secondWebServer.takeRequest(); recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
@ -229,12 +258,16 @@ public class HttpExporterTests extends MarvelIntegTestCase {
recordedRequest = secondWebServer.takeRequest(); recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
recordedRequest = secondWebServer.takeRequest(); recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT")); assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate())); assertThat(recordedRequest.getBody().readByteArray(), equalTo(TIMESTAMPED_TEMPLATE));
recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));;
recordedRequest = secondWebServer.takeRequest(); recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST")); assertThat(recordedRequest.getMethod(), equalTo("POST"));
@ -287,8 +320,10 @@ public class HttpExporterTests extends MarvelIntegTestCase {
logger.info("--> exporting a first event"); logger.info("--> exporting a first event");
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueResponse(404, "marvel template does not exist"); enqueueResponse(404, "template for timestamped indices does not exist");
enqueueResponse(201, "marvel template created"); enqueueResponse(201, "template for timestamped indices created");
enqueueResponse(404, "template for data index does not exist");
enqueueResponse(201, "template for data index created");
enqueueResponse(200, "successful bulk request "); enqueueResponse(200, "successful bulk request ");
HttpExporter exporter = getExporter(agentNode); HttpExporter exporter = getExporter(agentNode);
@ -296,7 +331,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
MarvelDoc doc = newRandomMarvelDoc(); MarvelDoc doc = newRandomMarvelDoc();
exporter.export(Collections.singletonList(doc)); exporter.export(Collections.singletonList(doc));
assertThat(webServer.getRequestCount(), equalTo(4)); assertThat(webServer.getRequestCount(), equalTo(6));
RecordedRequest recordedRequest = webServer.takeRequest(); RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
@ -304,12 +339,21 @@ public class HttpExporterTests extends MarvelIntegTestCase {
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT")); assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate())); assertThat(recordedRequest.getBody().readByteArray(), equalTo(TIMESTAMPED_TEMPLATE));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(DATA_TEMPLATE));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST")); assertThat(recordedRequest.getMethod(), equalTo("POST"));
@ -332,18 +376,18 @@ public class HttpExporterTests extends MarvelIntegTestCase {
logger.info("--> exporting a second event"); logger.info("--> exporting a second event");
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueResponse(404, "marvel template does not exist"); enqueueResponse(200, "template for timestamped indices exist");
enqueueResponse(201, "marvel template created"); enqueueResponse(200, "template for data index exist");
enqueueResponse(200, "successful bulk request "); enqueueResponse(200, "successful bulk request ");
doc = newRandomMarvelDoc(); doc = newRandomMarvelDoc();
exporter = getExporter(agentNode); exporter = getExporter(agentNode);
exporter.export(Collections.singletonList(doc)); exporter.export(Collections.singletonList(doc));
String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION + "-"
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.timestamp()); + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.timestamp());
assertThat(webServer.getRequestCount(), equalTo(4 + 4)); assertThat(webServer.getRequestCount(), equalTo(6 + 4));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
@ -351,12 +395,11 @@ public class HttpExporterTests extends MarvelIntegTestCase {
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + indexTemplateName()));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es")); assertThat(recordedRequest.getPath(), equalTo("/_template/" + dataTemplateName()));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));
recordedRequest = webServer.takeRequest(); recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST")); assertThat(recordedRequest.getMethod(), equalTo("POST"));

View File

@ -5,26 +5,12 @@
*/ */
package org.elasticsearch.marvel.agent.exporter.local; package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.marvel.agent.exporter.AbstractExporterTemplateTestCase; import org.elasticsearch.marvel.agent.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase { public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase {
@ -34,83 +20,26 @@ public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase
} }
@Override @Override
protected Set<String> excludeTemplates() { protected void deleteTemplates() throws Exception {
// Always delete the template between tests
return Collections.emptySet();
}
@Override
protected void deleteTemplate() throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
assertAcked(client().admin().indices().prepareDeleteTemplate(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get()); cluster().wipeAllTemplates(Collections.emptySet());
} }
@Override @Override
protected void putTemplate(String version) throws Exception { protected void putTemplate(String name, int version) throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
assertAcked(client().admin().indices().preparePutTemplate(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).setSource(generateTemplateSource(version)).get()); assertAcked(client().admin().indices().preparePutTemplate(name).setSource(generateTemplateSource(name, version)).get());
} }
@Override @Override
protected void createMarvelIndex(String index) throws Exception { protected void assertTemplateExist(String name) throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
createIndex(index); waitForMarvelTemplate(name);
} }
@Override @Override
protected void assertTemplateUpdated(Version version) throws Exception { protected void assertTemplateNotUpdated(String name) throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
awaitMarvelTemplateInstalled(version); assertTemplateExist(name);
}
@Override
protected void assertTemplateNotUpdated(Version version) throws Exception {
waitNoPendingTasksOnAll();
awaitMarvelTemplateInstalled(version);
}
private void assertMappings(byte[] reference, String... indices) throws Exception {
waitNoPendingTasksOnAll();
Map<String, String> mappings = new PutIndexTemplateRequest().source(reference).mappings();
assertBusy(new Runnable() {
@Override
public void run() {
for (String index : indices) {
GetMappingsResponse response = client().admin().indices().prepareGetMappings(index).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get();
ImmutableOpenMap<String, MappingMetaData> indexMappings = response.getMappings().get(index);
assertNotNull(indexMappings);
assertThat(indexMappings.size(), equalTo(mappings.size()));
for (String mapping : mappings.keySet()) {
// We just check that mapping type exists, we don't verify its content
assertThat("mapping type " + mapping + " should exist in index " + index, indexMappings.get(mapping), notNullValue());
}
}
}
});
}
@Override
protected void assertMappingsUpdated(String... indices) throws Exception {
assertMappings(MarvelTemplateUtils.loadDefaultTemplate(), indices);
}
@Override
protected void assertMappingsNotUpdated(String... indices) throws Exception {
assertMappings(generateTemplateSource(null).toBytes(), indices);
}
@Override
protected void assertIndicesNotCreated() throws Exception {
waitNoPendingTasksOnAll();
try {
assertThat(client().admin().indices().prepareExists(MarvelSettings.MARVEL_INDICES_PREFIX + "*").get().isExists(), is(false));
} catch (IndexNotFoundException e) {
// with shield we might get that if wildcards were resolved to no indices
if (!shieldEnabled) {
throw e;
}
}
} }
} }

View File

@ -6,9 +6,7 @@
package org.elasticsearch.marvel.agent.exporter.local; package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -36,11 +34,12 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ -105,8 +104,8 @@ public class LocalExporterTests extends MarvelIntegTestCase {
updateMarvelInterval(3L, TimeUnit.SECONDS); updateMarvelInterval(3L, TimeUnit.SECONDS);
// lets wait until the marvel template will be installed // lets wait until the marvel template will be installed
awaitMarvelTemplateInstalled(); waitForMarvelTemplate(indexTemplateName());
assertThat(getCurrentlyInstalledTemplateVersion(), is(Version.CURRENT)); waitForMarvelTemplate(dataTemplateName());
} }
public void testIndexTimestampFormat() throws Exception { public void testIndexTimestampFormat() throws Exception {
@ -122,12 +121,12 @@ public class LocalExporterTests extends MarvelIntegTestCase {
LocalExporter exporter = getLocalExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
// first lets test that the index resolver works with time // first lets test that the index resolver works with time
String indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time); String indexName = MarvelSettings.MARVEL_INDICES_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION + "-" + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time);
assertThat(exporter.indexNameResolver().resolve(time), equalTo(indexName)); assertThat(exporter.indexNameResolver().resolve(time), equalTo(indexName));
// now lets test that the index name resolver works with a doc // now lets test that the index name resolver works with a doc
MarvelDoc doc = newRandomMarvelDoc(); MarvelDoc doc = newRandomMarvelDoc();
indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); indexName = MarvelSettings.MARVEL_INDICES_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION + "-" + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName)); assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName));
logger.debug("--> exporting a random marvel document"); logger.debug("--> exporting a random marvel document");
@ -138,7 +137,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
timeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM"); timeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM");
updateClusterSettings(Settings.builder().put("marvel.agent.exporters._local.index.name.time_format", timeFormat)); updateClusterSettings(Settings.builder().put("marvel.agent.exporters._local.index.name.time_format", timeFormat));
exporter = getLocalExporter("_local"); // we need to get it again.. as it was rebuilt exporter = getLocalExporter("_local"); // we need to get it again.. as it was rebuilt
indexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp()); indexName = MarvelSettings.MARVEL_INDICES_PREFIX + MarvelTemplateUtils.TEMPLATE_VERSION + "-" + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName)); assertThat(exporter.indexNameResolver().resolve(doc), equalTo(indexName));
logger.debug("--> exporting the document again (this time with the the new index name time format [{}], expecting index name [{}]", timeFormat, indexName); logger.debug("--> exporting the document again (this time with the the new index name time format [{}], expecting index name [{}]", timeFormat, indexName);
@ -194,13 +193,4 @@ public class LocalExporterTests extends MarvelIntegTestCase {
ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN); ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN);
} }
} }
private Version getCurrentlyInstalledTemplateVersion() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get();
assertThat(response, notNullValue());
assertThat(response.getIndexTemplates(), notNullValue());
assertThat(response.getIndexTemplates(), hasSize(1));
assertThat(response.getIndexTemplates().get(0), notNullValue());
return MarvelTemplateUtils.templateVersion(response.getIndexTemplates().get(0));
}
} }

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.license.core.License; import org.elasticsearch.license.core.License;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -21,6 +22,8 @@ import org.junit.Before;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -57,20 +60,21 @@ public class ClusterInfoTests extends MarvelIntegTestCase {
final String clusterUUID = client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID(); final String clusterUUID = client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID();
assertTrue(Strings.hasText(clusterUUID)); assertTrue(Strings.hasText(clusterUUID));
logger.debug("--> waiting for the marvel data index to be created (it should have been created by the LicenseCollector)"); logger.debug("--> waiting for the marvel data index to be created (it should have been created by the ClusterInfoCollector)");
awaitIndexExists(MarvelSettings.MARVEL_DATA_INDEX_NAME); String dataIndex = ".marvel-es-data-" + MarvelTemplateUtils.TEMPLATE_VERSION;
awaitIndexExists(dataIndex);
logger.debug("--> waiting for cluster info collector to collect data"); logger.debug("--> waiting for cluster info collector to collect data");
awaitMarvelDocsCount(equalTo(1L), ClusterInfoCollector.TYPE); awaitMarvelDocsCount(equalTo(1L), ClusterInfoCollector.TYPE);
logger.debug("--> retrieving cluster info document"); logger.debug("--> retrieving cluster info document");
GetResponse response = client().prepareGet(MarvelSettings.MARVEL_DATA_INDEX_NAME, ClusterInfoCollector.TYPE, clusterUUID).get(); GetResponse response = client().prepareGet(dataIndex, ClusterInfoCollector.TYPE, clusterUUID).get();
assertTrue(MarvelSettings.MARVEL_DATA_INDEX_NAME + " document does not exist", response.isExists()); assertTrue("cluster_info document does not exist in data index", response.isExists());
logger.debug("--> checking that the document contains all required information"); logger.debug("--> checking that the document contains all required information");
logger.debug("--> checking that the document contains license information"); logger.debug("--> checking that the document contains license information");
assertThat(response.getIndex(), equalTo(MarvelSettings.MARVEL_DATA_INDEX_NAME)); assertThat(response.getIndex(), equalTo(dataIndex));
assertThat(response.getType(), equalTo(ClusterInfoCollector.TYPE)); assertThat(response.getType(), equalTo(ClusterInfoCollector.TYPE));
assertThat(response.getId(), equalTo(clusterUUID)); assertThat(response.getId(), equalTo(clusterUUID));
@ -111,14 +115,15 @@ public class ClusterInfoTests extends MarvelIntegTestCase {
assertThat(clusterStats, instanceOf(Map.class)); assertThat(clusterStats, instanceOf(Map.class));
assertThat(((Map) clusterStats).size(), greaterThan(0)); assertThat(((Map) clusterStats).size(), greaterThan(0));
assertMarvelTemplateInstalled(); waitForMarvelTemplate(indexTemplateName());
waitForMarvelTemplate(dataTemplateName());
logger.debug("--> check that the cluster_info is not indexed"); logger.debug("--> check that the cluster_info is not indexed");
securedFlush(); securedFlush();
securedRefresh(); securedRefresh();
assertHitCount(client().prepareSearch().setSize(0) assertHitCount(client().prepareSearch().setSize(0)
.setIndices(MarvelSettings.MARVEL_DATA_INDEX_NAME) .setIndices(dataIndex)
.setTypes(ClusterInfoCollector.TYPE) .setTypes(ClusterInfoCollector.TYPE)
.setQuery(QueryBuilders.boolQuery() .setQuery(QueryBuilders.boolQuery()
.should(QueryBuilders.matchQuery(License.XFields.STATUS.underscore().toString(), License.Status.ACTIVE.label())) .should(QueryBuilders.matchQuery(License.XFields.STATUS.underscore().toString(), License.Status.ACTIVE.label()))

View File

@ -11,6 +11,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.AbstractRenderer; import org.elasticsearch.marvel.agent.renderer.AbstractRenderer;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
@ -156,9 +157,10 @@ public class ClusterStateTests extends MarvelIntegTestCase {
for (final String nodeName : internalCluster().getNodeNames()) { for (final String nodeName : internalCluster().getNodeNames()) {
final String nodeId = internalCluster().clusterService(nodeName).localNode().getId(); final String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
final String dataIndex = ".marvel-es-data-" + MarvelTemplateUtils.TEMPLATE_VERSION;
logger.debug("--> getting marvel document for node id [{}]", nodeId); logger.debug("--> getting marvel document for node id [{}]", nodeId);
assertThat(client().prepareGet(MarvelSettings.MARVEL_DATA_INDEX_NAME, ClusterStateCollector.NODE_TYPE, nodeId).get().isExists(), is(true)); assertThat(client().prepareGet(dataIndex, ClusterStateCollector.NODE_TYPE, nodeId).get().isExists(), is(true));
// checks that document is not indexed // checks that document is not indexed
assertHitCount(client().prepareSearch().setSize(0) assertHitCount(client().prepareSearch().setSize(0)

View File

@ -36,13 +36,13 @@ public class SecuredClientTests extends MarvelIntegTestCase {
assertAccessIsAllowed(securedClient.admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX)); assertAccessIsAllowed(securedClient.admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX));
assertAccessIsAllowed(securedClient.admin().indices().prepareCreate(MarvelSettings.MARVEL_INDICES_PREFIX + "test")); assertAccessIsAllowed(securedClient.admin().indices().prepareCreate(MarvelSettings.MARVEL_INDICES_PREFIX + "test"));
assertAccessIsAllowed(securedClient.admin().indices().preparePutTemplate("foo").setSource(MarvelTemplateUtils.loadDefaultTemplate())); assertAccessIsAllowed(securedClient.admin().indices().preparePutTemplate("foo").setSource(MarvelTemplateUtils.loadTimestampedIndexTemplate()));
assertAccessIsAllowed(securedClient.admin().indices().prepareGetTemplates("foo")); assertAccessIsAllowed(securedClient.admin().indices().prepareGetTemplates("foo"));
} }
public void testDeniedAccess() { public void testDeniedAccess() {
SecuredClient securedClient = internalCluster().getInstance(SecuredClient.class); SecuredClient securedClient = internalCluster().getInstance(SecuredClient.class);
assertAcked(securedClient.admin().indices().preparePutTemplate("foo").setSource(MarvelTemplateUtils.loadDefaultTemplate()).get()); assertAcked(securedClient.admin().indices().preparePutTemplate("foo").setSource(MarvelTemplateUtils.loadDataIndexTemplate()).get());
if (shieldEnabled) { if (shieldEnabled) {
assertAccessIsDenied(securedClient.admin().indices().prepareDeleteTemplate("foo")); assertAccessIsDenied(securedClient.admin().indices().prepareDeleteTemplate("foo"));

View File

@ -6,14 +6,10 @@
package org.elasticsearch.marvel.test; package org.elasticsearch.marvel.test;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexModule;
@ -70,11 +66,6 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
return super.buildTestCluster(scope, seed); return super.buildTestCluster(scope, seed);
} }
@Override
protected Set<String> excludeTemplates() {
return Collections.singleton(MarvelTemplateUtils.INDEX_TEMPLATE_NAME);
}
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
@ -212,63 +203,29 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
} }
} }
protected void assertMarvelTemplateInstalled() { protected void assertTemplateInstalled(String name) {
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); boolean found = false;
if (clusterStateResponse != null) { for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates().get().getIndexTemplates()) {
ClusterState state = clusterStateResponse.getState(); if (Regex.simpleMatch(name, template.getName())) {
MetaData md = state.getMetaData(); found = true;
}
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get();
for (IndexTemplateMetaData template : response.getIndexTemplates()) {
if (template.getName().equals(MarvelTemplateUtils.INDEX_TEMPLATE_NAME)) {
return;
} }
} }
fail("marvel template should exist"); assertTrue("failed to find a template matching [" + name + "]", found);
} }
protected void assertMarvelTemplateMissing() { protected void waitForMarvelTemplate(String name) throws Exception {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(MarvelTemplateUtils.INDEX_TEMPLATE_NAME)) {
fail("marvel template shouldn't exist");
}
}
}
protected void awaitMarvelTemplateInstalled() throws Exception {
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {
assertMarvelTemplateInstalled(); assertTemplateInstalled(name);
} }
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
} }
protected void awaitMarvelTemplateInstalled(Version version) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelTemplateInstalled(version);
}
}, 30, TimeUnit.SECONDS);
}
protected void assertMarvelTemplateInstalled(Version version) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(MarvelTemplateUtils.INDEX_TEMPLATE_NAME)) {
Version templateVersion = MarvelTemplateUtils.templateVersion(template);
if (templateVersion != null && templateVersion.id == version.id) {
return;
}
fail("did not find marvel template with expected version [" + version + "]. found version [" + templateVersion + "]");
}
}
fail("marvel template could not be found");
}
protected void waitForMarvelIndices() throws Exception { protected void waitForMarvelIndices() throws Exception {
awaitIndexExists(MarvelSettings.MARVEL_DATA_INDEX_NAME); String currentVersion = String.valueOf(MarvelTemplateUtils.TEMPLATE_VERSION);
awaitIndexExists(MarvelSettings.MARVEL_INDICES_PREFIX + "*"); awaitIndexExists(MarvelSettings.MARVEL_DATA_INDEX_PREFIX + currentVersion);
awaitIndexExists(MarvelSettings.MARVEL_INDICES_PREFIX + currentVersion + "-*");
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {