From 81dc0ee210a1c58754630b383545a9d7b5cab369 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 10 Sep 2015 10:54:32 +0200 Subject: [PATCH] Marvel: Use Version in index template Original commit: elastic/x-pack-elasticsearch@6ee330077377236999a6dfe354e82aa44acebaf2 --- .../marvel/agent/exporter/HttpESExporter.java | 281 ++++++++++++------ .../HttpESExporterUtils.java} | 104 +++---- .../main/resources/marvel_index_template.json | 3 +- .../agent/exporter/HttpESExporterTests.java | 22 ++ .../HttpESExporterUtilsTests.java} | 108 ++++--- 5 files changed, 344 insertions(+), 174 deletions(-) rename marvel/src/main/java/org/elasticsearch/marvel/agent/{support/AgentUtils.java => exporter/HttpESExporterUtils.java} (56%) rename marvel/src/test/java/org/elasticsearch/marvel/agent/{AgentUtilsTests.java => exporter/HttpESExporterUtilsTests.java} (58%) diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java index 38893728404..613188c13dd 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.marvel.agent.exporter; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.Base64; @@ -27,7 +29,6 @@ import org.elasticsearch.http.HttpServer; import org.elasticsearch.marvel.agent.renderer.Renderer; import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.settings.MarvelSettings; -import org.elasticsearch.marvel.agent.support.AgentUtils; import org.elasticsearch.node.Node; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.settings.NodeSettingsService; @@ -91,6 +92,16 @@ public class HttpESExporter extends AbstractExporter implements final TimeValue bulkTimeout; volatile boolean checkedAndUploadedIndexTemplate = false; + volatile boolean supportedClusterVersion = false; + + /** Version of the built-in template **/ + final Version templateVersion; + + /** Minimum supported version of the remote template **/ + final Version minCompatibleTemplateVersion = Version.V_2_0_0_beta2; + + /** Minimum supported version of the remote marvel cluster **/ + final Version minCompatibleClusterVersion = Version.V_2_0_0_beta2; final ConnectionKeepAliveWorker keepAliveWorker; Thread keepAliveThread; @@ -140,20 +151,27 @@ public class HttpESExporter extends AbstractExporter implements } hostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, true); - logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]", - AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)), MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat); + // Checks that the built-in template is versioned + templateVersion = HttpESExporterUtils.parseTemplateVersion(HttpESExporterUtils.loadDefaultTemplate()); + if (templateVersion == null) { + throw new IllegalStateException("unable to find built-in template version"); + } + + logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}], template version [{}]", + HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)), + MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat, templateVersion); } static private void validateHosts(String[] hosts) { for (String host : hosts) { try { - AgentUtils.parseHostWithPath(host, ""); + HttpESExporterUtils.parseHostWithPath(host, ""); } catch (URISyntaxException e) { - throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." + - " error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]"); + throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." + + " error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]"); } catch (MalformedURLException e) { - throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." + - " error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]"); + throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." + + " error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]"); } } } @@ -279,7 +297,7 @@ public class HttpESExporter extends AbstractExporter implements try { sendCloseExportingConnection(connection); } catch (IOException e) { - logger.error("error sending data to [{}]: {}", AgentUtils.santizeUrlPwds(connection.getURL()), AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e))); + logger.error("error sending data to [{}]: {}", HttpESExporterUtils.santizeUrlPwds(connection.getURL()), HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e))); } } } @@ -320,15 +338,6 @@ public class HttpESExporter extends AbstractExporter implements } - /** - * open a connection to any host, validating it has the template installed if needed - * - * @return a url connection to the selected host or null if no current host is available. - */ - private HttpURLConnection openAndValidateConnection(String method, String path) { - return openAndValidateConnection(method, path, null); - } - /** * open a connection to any host, validating it has the template installed if needed * @@ -355,7 +364,7 @@ public class HttpESExporter extends AbstractExporter implements return null; } - String[] extractedHosts = AgentUtils.extractHostsFromAddress(boundAddress, logger); + String[] extractedHosts = HttpESExporterUtils.extractHostsFromAddress(boundAddress, logger); if (extractedHosts == null || extractedHosts.length == 0) { return null; } @@ -375,6 +384,24 @@ public class HttpESExporter extends AbstractExporter implements try { for (; hostIndex < hosts.length; hostIndex++) { String host = hosts[hostIndex]; + if (!supportedClusterVersion) { + try { + Version remoteVersion = loadRemoteClusterVersion(host); + if (remoteVersion == null) { + logger.warn("unable to check remote cluster version: no version found on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]"); + continue; + } + supportedClusterVersion = remoteVersion.onOrAfter(minCompatibleClusterVersion); + if (!supportedClusterVersion) { + logger.error("remote cluster version [" + remoteVersion + "] is not supported, please use a cluster with minimum version [" + minCompatibleClusterVersion + "]"); + continue; + } + } catch (ElasticsearchException e) { + logger.error("exception when checking remote cluster version on host [{}]", e, HttpESExporterUtils.santizeUrlPwds(host)); + continue; + } + } + if (!checkedAndUploadedIndexTemplate) { // check templates first on the host checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate(host); @@ -386,9 +413,10 @@ public class HttpESExporter extends AbstractExporter implements if (connection != null) { return connection; } - // failed hosts - reset template check , someone may have restarted the target cluster and deleted + // failed hosts - reset template & cluster versions check, someone may have restarted the target cluster and deleted // it's data folder. be safe. checkedAndUploadedIndexTemplate = false; + supportedClusterVersion = false; } } finally { if (hostIndex > 0 && hostIndex < hosts.length) { @@ -397,11 +425,11 @@ public class HttpESExporter extends AbstractExporter implements System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex); System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex); hosts = newHosts; - logger.debug("preferred target host is now [{}]", AgentUtils.santizeUrlPwds(hosts[0])); + logger.debug("preferred target host is now [{}]", HttpESExporterUtils.santizeUrlPwds(hosts[0])); } } - logger.error("could not connect to any configured elasticsearch instances: [{}]", AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts))); + logger.error("could not connect to any configured elasticsearch instances: [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts))); return null; @@ -410,7 +438,7 @@ public class HttpESExporter extends AbstractExporter implements /** open a connection to the given hosts, returning null when not successful * */ private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) { try { - final URL url = AgentUtils.parseHostWithPath(host, path); + final URL url = HttpESExporterUtils.parseHostWithPath(host, path); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); if (conn instanceof HttpsURLConnection && sslSocketFactory != null) { @@ -439,20 +467,48 @@ public class HttpESExporter extends AbstractExporter implements return conn; } catch (URISyntaxException e) { - logErrorBasedOnLevel(e, "error parsing host [{}]", AgentUtils.santizeUrlPwds(host)); + logErrorBasedOnLevel(e, "error parsing host [{}]", HttpESExporterUtils.santizeUrlPwds(host)); } catch (IOException e) { - logErrorBasedOnLevel(e, "error connecting to [{}]", AgentUtils.santizeUrlPwds(host)); + logErrorBasedOnLevel(e, "error connecting to [{}]", HttpESExporterUtils.santizeUrlPwds(host)); } return null; } private void logErrorBasedOnLevel(Throwable t, String msg, Object... params) { - logger.error(msg + " [" + AgentUtils.santizeUrlPwds(t.getMessage()) + "]", params); + logger.error(msg + " [" + HttpESExporterUtils.santizeUrlPwds(t.getMessage()) + "]", params); if (logger.isDebugEnabled()) { - logger.debug(msg + ". full error details:\n[{}]", params, AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t))); + logger.debug(msg + ". full error details:\n[{}]", params, HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t))); } } + /** + * Get the version of the remote Marvel cluster + */ + Version loadRemoteClusterVersion(final String host) { + HttpURLConnection connection = null; + try { + connection = openConnection(host, "GET", "/", null); + if (connection == null) { + throw new ElasticsearchException("unable to check remote cluster version: no available connection for host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]"); + } + + try (InputStream is = connection.getInputStream()) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(is, out); + return HttpESExporterUtils.parseElasticsearchVersion(out.toByteArray()); + } + } catch (IOException e) { + throw new ElasticsearchException("failed to verify the remote cluster version on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]:\n" + HttpESExporterUtils.santizeUrlPwds(e.getMessage())); + } finally { + if (connection != null) { + try { + connection.getInputStream().close(); + } catch (IOException e) { + // Ignore + } + } + } + } /** * Checks if the index templates already exist and if not uploads it @@ -461,70 +517,118 @@ public class HttpESExporter extends AbstractExporter implements * @return true if template exists or was uploaded successfully. */ private boolean checkAndUploadIndexTemplate(final String host) { - byte[] template; - try (InputStream is = getClass().getResourceAsStream("/marvel_index_template.json")) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(is, out); - template = out.toByteArray(); - } catch (IOException e) { - // throwing an exception to stop exporting process - we don't want to send data unless - // we put in the template for it. - throw new RuntimeException("failed to load marvel_index_template.json", e); + boolean updateTemplate = true; + + String url = "_template/marvel"; + if (templateCheckTimeout != null) { + url += "?timeout=" + templateCheckTimeout; } + HttpURLConnection connection = null; try { - int expectedVersion = AgentUtils.parseIndexVersionFromTemplate(template); - if (expectedVersion < 0) { - throw new RuntimeException("failed to find an index version in pre-configured index template"); - } - - String queryString = ""; - if (templateCheckTimeout != null) { - queryString = "?timeout=" + templateCheckTimeout; - } - HttpURLConnection conn = openConnection(host, "GET", "_template/marvel" + queryString, null); - if (conn == null) { + logger.debug("checking if marvel template exists on the marvel cluster"); + connection = openConnection(host, "GET", url, null); + if (connection == null) { + logger.debug("no available connection to check marvel template existence"); return false; } - boolean hasTemplate = false; - if (conn.getResponseCode() == 200) { - // verify content. - InputStream is = conn.getInputStream(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(is, out); - byte[] existingTemplate = out.toByteArray(); - is.close(); - int foundVersion = AgentUtils.parseIndexVersionFromTemplate(existingTemplate); - if (foundVersion < 0) { - logger.warn("found an existing index template but couldn't extract it's version. leaving it as is."); - hasTemplate = true; - } else if (foundVersion >= expectedVersion) { - logger.debug("accepting existing index template (version [{}], needed [{}])", foundVersion, expectedVersion); - hasTemplate = true; - } else { - logger.debug("replacing existing index template (version [{}], needed [{}])", foundVersion, expectedVersion); - } - } - // nothing there, lets create it - if (!hasTemplate) { - logger.debug("uploading index template"); - conn = openConnection(host, "PUT", "_template/marvel" + queryString, XContentType.JSON.restContentType()); - OutputStream os = conn.getOutputStream(); - Streams.copy(template, os); - if (!(conn.getResponseCode() == 200 || conn.getResponseCode() == 201)) { - logConnectionError("error adding the marvel template to [" + host + "]", conn); - } else { - hasTemplate = true; - } - conn.getInputStream().close(); // close and release to connection pool. - } + // 200 means that the template has been found, 404 otherwise + if (connection.getResponseCode() == 200) { + logger.debug("marvel template found, checking its version"); - return hasTemplate; + byte[] remoteTemplate; + try (InputStream is = connection.getInputStream()) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(is, out); + remoteTemplate = out.toByteArray(); + } + + if ((remoteTemplate == null) || (remoteTemplate.length == 0)) { + logger.error("unable to load remote marvel template on host [{}]", HttpESExporterUtils.santizeUrlPwds(host)); + return false; + } + + Version remoteVersion = HttpESExporterUtils.parseTemplateVersion(remoteTemplate); + logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, HttpESExporterUtils.santizeUrlPwds(host)); + + if (remoteVersion == null) { + logger.warn("marvel template version cannot be found: template will be updated to version [{}]", templateVersion); + } else { + + if (remoteVersion.before(minCompatibleTemplateVersion)) { + logger.error("marvel template version [{}] is below the minimum compatible version [{}] on host [{}]: " + + "please manually update the marvel template to a more recent version" + + "and delete the current active marvel index (don't forget to back up it first if needed)", + remoteVersion, minCompatibleTemplateVersion, HttpESExporterUtils.santizeUrlPwds(host)); + return false; + } + + // Compares the remote template version with the built-in template + if (templateVersion.after(remoteVersion)) { + logger.info("marvel template version will be updated to a newer version [remote:{}, built-in:{}]", remoteVersion, templateVersion); + updateTemplate = true; + + } else if (templateVersion.equals(remoteVersion)) { + logger.debug("marvel template version is up-to-date [remote:{}, built-in:{}]", remoteVersion, templateVersion); + // Always update a snapshot version + updateTemplate = templateVersion.snapshot(); + } else { + logger.debug("marvel template version is newer than the one required by the marvel agent [remote:{}, built-in:{}]", remoteVersion, templateVersion); + updateTemplate = false; + } + } + } } catch (IOException e) { - logger.error("failed to verify/upload the marvel template to [{}]:\n{}", AgentUtils.santizeUrlPwds(host), AgentUtils.santizeUrlPwds(e.getMessage())); + logger.error("failed to verify the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage())); return false; + } finally { + if (connection != null) { + try { + connection.getInputStream().close(); + } catch (IOException e) { + // Ignore + } + } } + + if (updateTemplate) { + try { + connection = openConnection(host, "PUT", url, XContentType.JSON.restContentType()); + + if (connection == null) { + logger.debug("no available connection to update marvel template"); + return false; + } + + logger.debug("loading marvel pre-configured template"); + byte[] template = HttpESExporterUtils.loadDefaultTemplate(); + + // Uploads the template and closes the outputstream + Streams.copy(template, connection.getOutputStream()); + + if (!(connection.getResponseCode() == 200 || connection.getResponseCode() == 201)) { + logConnectionError("error adding the marvel template to [" + host + "]", connection); + return false; + } + + logger.info("marvel template updated to version [{}]", templateVersion); + } catch (IOException e) { + logger.error("failed to update the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage())); + return false; + + } finally { + if (connection != null) { + try { + connection.getInputStream().close(); + } catch (IOException e) { + // Ignore + } + } + } + } + + return updateTemplate; } private void logConnectionError(String msg, HttpURLConnection conn) { @@ -537,11 +641,11 @@ public class HttpESExporter extends AbstractExporter implements try { logger.error("{} response code [{} {}]. content: [{}]", - AgentUtils.santizeUrlPwds(msg), conn.getResponseCode(), - AgentUtils.santizeUrlPwds(conn.getResponseMessage()), - AgentUtils.santizeUrlPwds(err)); + HttpESExporterUtils.santizeUrlPwds(msg), conn.getResponseCode(), + HttpESExporterUtils.santizeUrlPwds(conn.getResponseMessage()), + HttpESExporterUtils.santizeUrlPwds(err)); } catch (IOException e) { - logger.error("{}. connection had an error while reporting the error. tough life.", AgentUtils.santizeUrlPwds(msg)); + logger.error("{}. connection had an error while reporting the error. tough life.", HttpESExporterUtils.santizeUrlPwds(msg)); } } @@ -561,9 +665,10 @@ public class HttpESExporter extends AbstractExporter implements String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null); if (newHosts != null) { - logger.info("hosts set to [{}]", AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts))); + logger.info("hosts set to [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts))); this.hosts = newHosts; this.checkedAndUploadedIndexTemplate = false; + this.supportedClusterVersion = false; this.boundToLocalNode = false; } @@ -612,7 +717,7 @@ public class HttpESExporter extends AbstractExporter implements } HttpURLConnection conn = openConnection(currentHosts[0], "GET", "", null); if (conn == null) { - logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", AgentUtils.santizeUrlPwds(currentHosts[0])); + logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", HttpESExporterUtils.santizeUrlPwds(currentHosts[0])); return; } else { conn.getInputStream().close(); // close and release to connection pool. @@ -621,7 +726,7 @@ public class HttpESExporter extends AbstractExporter implements // ignore, if closed, good.... } catch (Throwable t) { logger.debug("error in keep alive thread, shutting down (will be restarted after a successful connection has been made) {}", - AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t))); + HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t))); return; } } diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/support/AgentUtils.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtils.java similarity index 56% rename from marvel/src/main/java/org/elasticsearch/marvel/agent/support/AgentUtils.java rename to marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtils.java index a4e65c04ec7..b978178d964 100644 --- a/marvel/src/main/java/org/elasticsearch/marvel/agent/support/AgentUtils.java +++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtils.java @@ -3,61 +3,29 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.marvel.agent.support; +package org.elasticsearch.marvel.agent.exporter; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.xcontent.XContentBuilder; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.UnsupportedEncodingException; +import java.io.InputStream; import java.net.*; -import java.util.Map; +import java.nio.charset.Charset; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class AgentUtils { +public class HttpESExporterUtils { - public static XContentBuilder nodeToXContent(DiscoveryNode node, XContentBuilder builder) throws IOException { - return nodeToXContent(node, null, builder); - } - - public static XContentBuilder nodeToXContent(DiscoveryNode node, Boolean isMasterNode, XContentBuilder builder) throws IOException { - builder.field("id", node.id()); - builder.field("name", node.name()); - builder.field("transport_address", node.address()); - - if (node.address().uniqueAddressTypeId() == 1) { // InetSocket - InetSocketTransportAddress address = (InetSocketTransportAddress) node.address(); - InetSocketAddress inetSocketAddress = address.address(); - InetAddress inetAddress = inetSocketAddress.getAddress(); - if (inetAddress != null) { - builder.field("ip", NetworkAddress.formatAddress(inetAddress)); - builder.field("host", inetSocketAddress.getHostString()); - builder.field("ip_port", NetworkAddress.formatAddress(inetSocketAddress)); - } - } else if (node.address().uniqueAddressTypeId() == 2) { // local transport - builder.field("ip_port", "_" + node.address()); // will end up being "_local[ID]" - } - - builder.field("master_node", node.isMasterNode()); - builder.field("data_node", node.isDataNode()); - if (isMasterNode != null) { - builder.field("master", isMasterNode.booleanValue()); - } - - if (!node.attributes().isEmpty()) { - builder.startObject("attributes"); - for (Map.Entry attr : node.attributes().entrySet()) { - builder.field(attr.getKey(), attr.getValue()); - } - builder.endObject(); - } - return builder; - } + public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json"; + static final String MARVEL_VERSION_FIELD = "marvel_version"; + static final String VERSION_FIELD = "number"; public static String[] extractHostsFromAddress(BoundTransportAddress boundAddress, ESLogger logger) { if (boundAddress == null || boundAddress.boundAddress() == null) { @@ -108,16 +76,52 @@ public class AgentUtils { } - public static int parseIndexVersionFromTemplate(byte[] template) throws UnsupportedEncodingException { - Pattern versionRegex = Pattern.compile("marvel.index_format\"\\s*:\\s*\"?(\\d+)\"?"); - Matcher matcher = versionRegex.matcher(new String(template, "UTF-8")); - if (matcher.find()) { - return Integer.parseInt(matcher.group(1)); - } else { - return -1; + /** + * Loads the default Marvel template + */ + public static byte[] loadDefaultTemplate() { + try (InputStream is = HttpESExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(is, out); + return out.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException("unable to load marvel template", e); } } + /** + * Extract & parse the version contained in the given template + */ + public static Version parseTemplateVersion(byte[] template) { + return parseTemplateVersion(new String(template, Charset.forName("UTF-8"))); + } + + /** + * Extract & parse the version contained in the given template + */ + public static Version parseTemplateVersion(String template) { + return parseVersion(MARVEL_VERSION_FIELD, template); + } + + /** + * Extract & parse the elasticsearch version, as returned by the REST API + */ + public static Version parseElasticsearchVersion(byte[] template) { + return parseVersion(VERSION_FIELD, new String(template, Charset.forName("UTF-8"))); + } + + static Version parseVersion(String field, String template) { + Pattern pattern = Pattern.compile(field + "\"\\s*:\\s*\"?([0-9a-zA-Z\\.\\-]+)\"?"); + Matcher matcher = pattern.matcher(template); + if (matcher.find()) { + String parsedVersion = matcher.group(1); + if (Strings.hasText(parsedVersion)) { + return Version.fromString(parsedVersion); + } + } + return null; + } + private static final String userInfoChars = "\\w-\\._~!$&\\'\\(\\)*+,;=%"; private static Pattern urlPwdSanitizer = Pattern.compile("([" + userInfoChars + "]+?):[" + userInfoChars + "]+?@"); diff --git a/marvel/src/main/resources/marvel_index_template.json b/marvel/src/main/resources/marvel_index_template.json index b3a19eefc01..8be78521cf5 100644 --- a/marvel/src/main/resources/marvel_index_template.json +++ b/marvel/src/main/resources/marvel_index_template.json @@ -1,8 +1,7 @@ { "template": ".marvel*", "settings": { - "marvel.index_format": 7, - "marvel.version": "${project.version}", + "marvel_version": "${project.version}", "index.number_of_shards": 1, "index.number_of_replicas": 1, "index.codec": "best_compression", diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java index 55775451687..d275120dd79 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.marvel.agent.exporter; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.cluster.ClusterState; @@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; // Transport Client instantiation also calls the marvel plugin, which then fails to find modules @@ -222,6 +224,26 @@ public class HttpESExporterTests extends ESIntegTestCase { assertMarvelTemplateExists(); } + @Test + public void testLoadRemoteClusterVersion() { + Settings.Builder builder = Settings.builder() + .put(MarvelSettings.STARTUP_DELAY, "200m") + .put(Node.HTTP_ENABLED, true); + String nodeId = internalCluster().startNode(builder); + + HttpESExporter httpEsExporter = getEsExporter(nodeId); + + logger.info("--> exporting events to force host resolution"); + httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc())); + + assertNotNull(httpEsExporter.getHosts()); + assertThat(httpEsExporter.getHosts().length, greaterThan(0)); + + logger.info("--> loading remote cluster version"); + Version resolved = httpEsExporter.loadRemoteClusterVersion(httpEsExporter.getHosts()[0]); + assertTrue(resolved.equals(Version.CURRENT)); + } + private HttpESExporter getEsExporter() { AgentService service = internalCluster().getInstance(AgentService.class); return (HttpESExporter) service.getExporters().iterator().next(); diff --git a/marvel/src/test/java/org/elasticsearch/marvel/agent/AgentUtilsTests.java b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtilsTests.java similarity index 58% rename from marvel/src/test/java/org/elasticsearch/marvel/agent/AgentUtilsTests.java rename to marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtilsTests.java index ef1b3d0351c..06ebe2ee30e 100644 --- a/marvel/src/test/java/org/elasticsearch/marvel/agent/AgentUtilsTests.java +++ b/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/HttpESExporterUtilsTests.java @@ -3,12 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.marvel.agent; +package org.elasticsearch.marvel.agent.exporter; -import org.elasticsearch.marvel.agent.support.AgentUtils; +import org.elasticsearch.Version; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.StreamsUtils; -import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.Test; @@ -18,95 +16,137 @@ import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.not; -public class AgentUtilsTests extends ESTestCase { +public class HttpESExporterUtilsTests extends ESTestCase { @Test - public void testVersionIsExtractableFromIndexTemplate() throws IOException { - byte[] template = StreamsUtils.copyToBytesFromClasspath("/marvel_index_template.json"); - MatcherAssert.assertThat(AgentUtils.parseIndexVersionFromTemplate(template), Matchers.greaterThan(0)); + public void testLoadTemplate() { + byte[] template = HttpESExporterUtils.loadDefaultTemplate(); + assertNotNull(template); + assertThat(template.length, Matchers.greaterThan(0)); } + @Test + public void testParseTemplateVersionFromByteArrayTemplate() throws IOException { + byte[] template = HttpESExporterUtils.loadDefaultTemplate(); + assertNotNull(template); + + Version version = HttpESExporterUtils.parseTemplateVersion(template); + assertNotNull(version); + } + + @Test + public void testParseTemplateVersionFromStringTemplate() throws IOException { + List templates = new ArrayList<>(); + templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}"); + templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}"); + templates.add("{\"marvel_version\": \"1.7.1\"}"); + templates.add("{\"marvel_version\": \"2.0.0-beta1\"}"); + templates.add("{\"marvel_version\": \"2.0.0\"}"); + templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"); + + for (String template : templates) { + Version version = HttpESExporterUtils.parseTemplateVersion(template); + assertNotNull(version); + } + + Version version = HttpESExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}"); + assertNull(version); + } + + @Test + public void testParseVersion() throws IOException { + assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}")); + assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}")); + assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}")); + assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }")); + assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}")); + assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}")); + } + + @Test public void testHostParsing() throws MalformedURLException, URISyntaxException { - URL url = AgentUtils.parseHostWithPath("localhost:9200", ""); + URL url = HttpESExporterUtils.parseHostWithPath("localhost:9200", ""); verifyUrl(url, "http", "localhost", 9200, "/"); - url = AgentUtils.parseHostWithPath("localhost", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("localhost", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/_bulk"); - url = AgentUtils.parseHostWithPath("http://localhost:9200", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("http://localhost:9200", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/_bulk"); - url = AgentUtils.parseHostWithPath("http://localhost", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("http://localhost", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/_bulk"); - url = AgentUtils.parseHostWithPath("https://localhost:9200", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("https://localhost:9200", "_bulk"); verifyUrl(url, "https", "localhost", 9200, "/_bulk"); - url = AgentUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk"); verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk"); - url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200", ""); + url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200", ""); verifyUrl(url, "http", "localhost", 9200, "/", "boaz:test"); - url = AgentUtils.parseHostWithPath("boaz:test@localhost", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk"); verifyUrl(url, "https", "localhost", 9200, "/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", ""); + url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", ""); verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test"); - url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", ""); + url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", ""); verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test"); - url = AgentUtils.parseHostWithPath("localhost/suburl", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("localhost/suburl", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk"); - url = AgentUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk"); verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk"); verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk", "boaz:test"); - url = AgentUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk"); verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk", "user:test"); - url = AgentUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk"); verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk", "user:test"); - url = AgentUtils.parseHostWithPath("server_with_underscore:9300", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("server_with_underscore:9300", "_bulk"); verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk"); - url = AgentUtils.parseHostWithPath("server_with_underscore", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("server_with_underscore", "_bulk"); verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk"); - url = AgentUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk"); verifyUrl(url, "https", "server-dash", 9300, "/_bulk", "user:test"); - url = AgentUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk"); verifyUrl(url, "http", "server-dash", 9300, "/_bulk", "user:test"); - url = AgentUtils.parseHostWithPath("server-dash:9300", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("server-dash:9300", "_bulk"); verifyUrl(url, "http", "server-dash", 9300, "/_bulk"); - url = AgentUtils.parseHostWithPath("server-dash", "_bulk"); + url = HttpESExporterUtils.parseHostWithPath("server-dash", "_bulk"); verifyUrl(url, "http", "server-dash", 9200, "/_bulk"); } @@ -148,7 +188,7 @@ public class AgentUtilsTests extends ESTestCase { }; for (String input : inputs) { - String sanitized = AgentUtils.santizeUrlPwds(input); + String sanitized = HttpESExporterUtils.santizeUrlPwds(input); assertThat(sanitized, not(containsString(pwd))); } }