Marvel: Use Version in index template
Original commit: elastic/x-pack-elasticsearch@6ee3300773
This commit is contained in:
parent
ba13e3bf1c
commit
81dc0ee210
|
@ -5,7 +5,9 @@
|
|||
*/
|
||||
package org.elasticsearch.marvel.agent.exporter;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.common.Base64;
|
||||
|
@ -27,7 +29,6 @@ import org.elasticsearch.http.HttpServer;
|
|||
import org.elasticsearch.marvel.agent.renderer.Renderer;
|
||||
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
|
||||
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
|
||||
import org.elasticsearch.marvel.agent.support.AgentUtils;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.service.NodeService;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
@ -91,6 +92,16 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
final TimeValue bulkTimeout;
|
||||
|
||||
volatile boolean checkedAndUploadedIndexTemplate = false;
|
||||
volatile boolean supportedClusterVersion = false;
|
||||
|
||||
/** Version of the built-in template **/
|
||||
final Version templateVersion;
|
||||
|
||||
/** Minimum supported version of the remote template **/
|
||||
final Version minCompatibleTemplateVersion = Version.V_2_0_0_beta2;
|
||||
|
||||
/** Minimum supported version of the remote marvel cluster **/
|
||||
final Version minCompatibleClusterVersion = Version.V_2_0_0_beta2;
|
||||
|
||||
final ConnectionKeepAliveWorker keepAliveWorker;
|
||||
Thread keepAliveThread;
|
||||
|
@ -140,20 +151,27 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
}
|
||||
hostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, true);
|
||||
|
||||
logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]",
|
||||
AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)), MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat);
|
||||
// Checks that the built-in template is versioned
|
||||
templateVersion = HttpESExporterUtils.parseTemplateVersion(HttpESExporterUtils.loadDefaultTemplate());
|
||||
if (templateVersion == null) {
|
||||
throw new IllegalStateException("unable to find built-in template version");
|
||||
}
|
||||
|
||||
logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}], template version [{}]",
|
||||
HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)),
|
||||
MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat, templateVersion);
|
||||
}
|
||||
|
||||
static private void validateHosts(String[] hosts) {
|
||||
for (String host : hosts) {
|
||||
try {
|
||||
AgentUtils.parseHostWithPath(host, "");
|
||||
HttpESExporterUtils.parseHostWithPath(host, "");
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." +
|
||||
" error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]");
|
||||
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." +
|
||||
" error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]");
|
||||
} catch (MalformedURLException e) {
|
||||
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." +
|
||||
" error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]");
|
||||
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + HttpESExporterUtils.santizeUrlPwds(host) + "]." +
|
||||
" error: [" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()) + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -279,7 +297,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
try {
|
||||
sendCloseExportingConnection(connection);
|
||||
} catch (IOException e) {
|
||||
logger.error("error sending data to [{}]: {}", AgentUtils.santizeUrlPwds(connection.getURL()), AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e)));
|
||||
logger.error("error sending data to [{}]: {}", HttpESExporterUtils.santizeUrlPwds(connection.getURL()), HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -320,15 +338,6 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* open a connection to any host, validating it has the template installed if needed
|
||||
*
|
||||
* @return a url connection to the selected host or null if no current host is available.
|
||||
*/
|
||||
private HttpURLConnection openAndValidateConnection(String method, String path) {
|
||||
return openAndValidateConnection(method, path, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* open a connection to any host, validating it has the template installed if needed
|
||||
*
|
||||
|
@ -355,7 +364,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
return null;
|
||||
}
|
||||
|
||||
String[] extractedHosts = AgentUtils.extractHostsFromAddress(boundAddress, logger);
|
||||
String[] extractedHosts = HttpESExporterUtils.extractHostsFromAddress(boundAddress, logger);
|
||||
if (extractedHosts == null || extractedHosts.length == 0) {
|
||||
return null;
|
||||
}
|
||||
|
@ -375,6 +384,24 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
try {
|
||||
for (; hostIndex < hosts.length; hostIndex++) {
|
||||
String host = hosts[hostIndex];
|
||||
if (!supportedClusterVersion) {
|
||||
try {
|
||||
Version remoteVersion = loadRemoteClusterVersion(host);
|
||||
if (remoteVersion == null) {
|
||||
logger.warn("unable to check remote cluster version: no version found on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]");
|
||||
continue;
|
||||
}
|
||||
supportedClusterVersion = remoteVersion.onOrAfter(minCompatibleClusterVersion);
|
||||
if (!supportedClusterVersion) {
|
||||
logger.error("remote cluster version [" + remoteVersion + "] is not supported, please use a cluster with minimum version [" + minCompatibleClusterVersion + "]");
|
||||
continue;
|
||||
}
|
||||
} catch (ElasticsearchException e) {
|
||||
logger.error("exception when checking remote cluster version on host [{}]", e, HttpESExporterUtils.santizeUrlPwds(host));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!checkedAndUploadedIndexTemplate) {
|
||||
// check templates first on the host
|
||||
checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate(host);
|
||||
|
@ -386,9 +413,10 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
if (connection != null) {
|
||||
return connection;
|
||||
}
|
||||
// failed hosts - reset template check , someone may have restarted the target cluster and deleted
|
||||
// failed hosts - reset template & cluster versions check, someone may have restarted the target cluster and deleted
|
||||
// it's data folder. be safe.
|
||||
checkedAndUploadedIndexTemplate = false;
|
||||
supportedClusterVersion = false;
|
||||
}
|
||||
} finally {
|
||||
if (hostIndex > 0 && hostIndex < hosts.length) {
|
||||
|
@ -397,11 +425,11 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex);
|
||||
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex);
|
||||
hosts = newHosts;
|
||||
logger.debug("preferred target host is now [{}]", AgentUtils.santizeUrlPwds(hosts[0]));
|
||||
logger.debug("preferred target host is now [{}]", HttpESExporterUtils.santizeUrlPwds(hosts[0]));
|
||||
}
|
||||
}
|
||||
|
||||
logger.error("could not connect to any configured elasticsearch instances: [{}]", AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)));
|
||||
logger.error("could not connect to any configured elasticsearch instances: [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)));
|
||||
|
||||
return null;
|
||||
|
||||
|
@ -410,7 +438,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
/** open a connection to the given hosts, returning null when not successful * */
|
||||
private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) {
|
||||
try {
|
||||
final URL url = AgentUtils.parseHostWithPath(host, path);
|
||||
final URL url = HttpESExporterUtils.parseHostWithPath(host, path);
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
|
||||
if (conn instanceof HttpsURLConnection && sslSocketFactory != null) {
|
||||
|
@ -439,20 +467,48 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
|
||||
return conn;
|
||||
} catch (URISyntaxException e) {
|
||||
logErrorBasedOnLevel(e, "error parsing host [{}]", AgentUtils.santizeUrlPwds(host));
|
||||
logErrorBasedOnLevel(e, "error parsing host [{}]", HttpESExporterUtils.santizeUrlPwds(host));
|
||||
} catch (IOException e) {
|
||||
logErrorBasedOnLevel(e, "error connecting to [{}]", AgentUtils.santizeUrlPwds(host));
|
||||
logErrorBasedOnLevel(e, "error connecting to [{}]", HttpESExporterUtils.santizeUrlPwds(host));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void logErrorBasedOnLevel(Throwable t, String msg, Object... params) {
|
||||
logger.error(msg + " [" + AgentUtils.santizeUrlPwds(t.getMessage()) + "]", params);
|
||||
logger.error(msg + " [" + HttpESExporterUtils.santizeUrlPwds(t.getMessage()) + "]", params);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(msg + ". full error details:\n[{}]", params, AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
|
||||
logger.debug(msg + ". full error details:\n[{}]", params, HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the version of the remote Marvel cluster
|
||||
*/
|
||||
Version loadRemoteClusterVersion(final String host) {
|
||||
HttpURLConnection connection = null;
|
||||
try {
|
||||
connection = openConnection(host, "GET", "/", null);
|
||||
if (connection == null) {
|
||||
throw new ElasticsearchException("unable to check remote cluster version: no available connection for host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]");
|
||||
}
|
||||
|
||||
try (InputStream is = connection.getInputStream()) {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Streams.copy(is, out);
|
||||
return HttpESExporterUtils.parseElasticsearchVersion(out.toByteArray());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("failed to verify the remote cluster version on host [" + HttpESExporterUtils.santizeUrlPwds(host) + "]:\n" + HttpESExporterUtils.santizeUrlPwds(e.getMessage()));
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.getInputStream().close();
|
||||
} catch (IOException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the index templates already exist and if not uploads it
|
||||
|
@ -461,70 +517,118 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
* @return true if template exists or was uploaded successfully.
|
||||
*/
|
||||
private boolean checkAndUploadIndexTemplate(final String host) {
|
||||
byte[] template;
|
||||
try (InputStream is = getClass().getResourceAsStream("/marvel_index_template.json")) {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Streams.copy(is, out);
|
||||
template = out.toByteArray();
|
||||
} catch (IOException e) {
|
||||
// throwing an exception to stop exporting process - we don't want to send data unless
|
||||
// we put in the template for it.
|
||||
throw new RuntimeException("failed to load marvel_index_template.json", e);
|
||||
boolean updateTemplate = true;
|
||||
|
||||
String url = "_template/marvel";
|
||||
if (templateCheckTimeout != null) {
|
||||
url += "?timeout=" + templateCheckTimeout;
|
||||
}
|
||||
|
||||
HttpURLConnection connection = null;
|
||||
try {
|
||||
int expectedVersion = AgentUtils.parseIndexVersionFromTemplate(template);
|
||||
if (expectedVersion < 0) {
|
||||
throw new RuntimeException("failed to find an index version in pre-configured index template");
|
||||
}
|
||||
|
||||
String queryString = "";
|
||||
if (templateCheckTimeout != null) {
|
||||
queryString = "?timeout=" + templateCheckTimeout;
|
||||
}
|
||||
HttpURLConnection conn = openConnection(host, "GET", "_template/marvel" + queryString, null);
|
||||
if (conn == null) {
|
||||
logger.debug("checking if marvel template exists on the marvel cluster");
|
||||
connection = openConnection(host, "GET", url, null);
|
||||
if (connection == null) {
|
||||
logger.debug("no available connection to check marvel template existence");
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean hasTemplate = false;
|
||||
if (conn.getResponseCode() == 200) {
|
||||
// verify content.
|
||||
InputStream is = conn.getInputStream();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Streams.copy(is, out);
|
||||
byte[] existingTemplate = out.toByteArray();
|
||||
is.close();
|
||||
int foundVersion = AgentUtils.parseIndexVersionFromTemplate(existingTemplate);
|
||||
if (foundVersion < 0) {
|
||||
logger.warn("found an existing index template but couldn't extract it's version. leaving it as is.");
|
||||
hasTemplate = true;
|
||||
} else if (foundVersion >= expectedVersion) {
|
||||
logger.debug("accepting existing index template (version [{}], needed [{}])", foundVersion, expectedVersion);
|
||||
hasTemplate = true;
|
||||
} else {
|
||||
logger.debug("replacing existing index template (version [{}], needed [{}])", foundVersion, expectedVersion);
|
||||
}
|
||||
}
|
||||
// nothing there, lets create it
|
||||
if (!hasTemplate) {
|
||||
logger.debug("uploading index template");
|
||||
conn = openConnection(host, "PUT", "_template/marvel" + queryString, XContentType.JSON.restContentType());
|
||||
OutputStream os = conn.getOutputStream();
|
||||
Streams.copy(template, os);
|
||||
if (!(conn.getResponseCode() == 200 || conn.getResponseCode() == 201)) {
|
||||
logConnectionError("error adding the marvel template to [" + host + "]", conn);
|
||||
} else {
|
||||
hasTemplate = true;
|
||||
}
|
||||
conn.getInputStream().close(); // close and release to connection pool.
|
||||
}
|
||||
// 200 means that the template has been found, 404 otherwise
|
||||
if (connection.getResponseCode() == 200) {
|
||||
logger.debug("marvel template found, checking its version");
|
||||
|
||||
return hasTemplate;
|
||||
byte[] remoteTemplate;
|
||||
try (InputStream is = connection.getInputStream()) {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Streams.copy(is, out);
|
||||
remoteTemplate = out.toByteArray();
|
||||
}
|
||||
|
||||
if ((remoteTemplate == null) || (remoteTemplate.length == 0)) {
|
||||
logger.error("unable to load remote marvel template on host [{}]", HttpESExporterUtils.santizeUrlPwds(host));
|
||||
return false;
|
||||
}
|
||||
|
||||
Version remoteVersion = HttpESExporterUtils.parseTemplateVersion(remoteTemplate);
|
||||
logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, HttpESExporterUtils.santizeUrlPwds(host));
|
||||
|
||||
if (remoteVersion == null) {
|
||||
logger.warn("marvel template version cannot be found: template will be updated to version [{}]", templateVersion);
|
||||
} else {
|
||||
|
||||
if (remoteVersion.before(minCompatibleTemplateVersion)) {
|
||||
logger.error("marvel template version [{}] is below the minimum compatible version [{}] on host [{}]: "
|
||||
+ "please manually update the marvel template to a more recent version"
|
||||
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
|
||||
remoteVersion, minCompatibleTemplateVersion, HttpESExporterUtils.santizeUrlPwds(host));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Compares the remote template version with the built-in template
|
||||
if (templateVersion.after(remoteVersion)) {
|
||||
logger.info("marvel template version will be updated to a newer version [remote:{}, built-in:{}]", remoteVersion, templateVersion);
|
||||
updateTemplate = true;
|
||||
|
||||
} else if (templateVersion.equals(remoteVersion)) {
|
||||
logger.debug("marvel template version is up-to-date [remote:{}, built-in:{}]", remoteVersion, templateVersion);
|
||||
// Always update a snapshot version
|
||||
updateTemplate = templateVersion.snapshot();
|
||||
} else {
|
||||
logger.debug("marvel template version is newer than the one required by the marvel agent [remote:{}, built-in:{}]", remoteVersion, templateVersion);
|
||||
updateTemplate = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("failed to verify/upload the marvel template to [{}]:\n{}", AgentUtils.santizeUrlPwds(host), AgentUtils.santizeUrlPwds(e.getMessage()));
|
||||
logger.error("failed to verify the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage()));
|
||||
return false;
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.getInputStream().close();
|
||||
} catch (IOException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (updateTemplate) {
|
||||
try {
|
||||
connection = openConnection(host, "PUT", url, XContentType.JSON.restContentType());
|
||||
|
||||
if (connection == null) {
|
||||
logger.debug("no available connection to update marvel template");
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.debug("loading marvel pre-configured template");
|
||||
byte[] template = HttpESExporterUtils.loadDefaultTemplate();
|
||||
|
||||
// Uploads the template and closes the outputstream
|
||||
Streams.copy(template, connection.getOutputStream());
|
||||
|
||||
if (!(connection.getResponseCode() == 200 || connection.getResponseCode() == 201)) {
|
||||
logConnectionError("error adding the marvel template to [" + host + "]", connection);
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.info("marvel template updated to version [{}]", templateVersion);
|
||||
} catch (IOException e) {
|
||||
logger.error("failed to update the marvel template to [{}]:\n{}", HttpESExporterUtils.santizeUrlPwds(host), HttpESExporterUtils.santizeUrlPwds(e.getMessage()));
|
||||
return false;
|
||||
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.getInputStream().close();
|
||||
} catch (IOException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return updateTemplate;
|
||||
}
|
||||
|
||||
private void logConnectionError(String msg, HttpURLConnection conn) {
|
||||
|
@ -537,11 +641,11 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
|
||||
try {
|
||||
logger.error("{} response code [{} {}]. content: [{}]",
|
||||
AgentUtils.santizeUrlPwds(msg), conn.getResponseCode(),
|
||||
AgentUtils.santizeUrlPwds(conn.getResponseMessage()),
|
||||
AgentUtils.santizeUrlPwds(err));
|
||||
HttpESExporterUtils.santizeUrlPwds(msg), conn.getResponseCode(),
|
||||
HttpESExporterUtils.santizeUrlPwds(conn.getResponseMessage()),
|
||||
HttpESExporterUtils.santizeUrlPwds(err));
|
||||
} catch (IOException e) {
|
||||
logger.error("{}. connection had an error while reporting the error. tough life.", AgentUtils.santizeUrlPwds(msg));
|
||||
logger.error("{}. connection had an error while reporting the error. tough life.", HttpESExporterUtils.santizeUrlPwds(msg));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -561,9 +665,10 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
|
||||
String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null);
|
||||
if (newHosts != null) {
|
||||
logger.info("hosts set to [{}]", AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts)));
|
||||
logger.info("hosts set to [{}]", HttpESExporterUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts)));
|
||||
this.hosts = newHosts;
|
||||
this.checkedAndUploadedIndexTemplate = false;
|
||||
this.supportedClusterVersion = false;
|
||||
this.boundToLocalNode = false;
|
||||
}
|
||||
|
||||
|
@ -612,7 +717,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
}
|
||||
HttpURLConnection conn = openConnection(currentHosts[0], "GET", "", null);
|
||||
if (conn == null) {
|
||||
logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", AgentUtils.santizeUrlPwds(currentHosts[0]));
|
||||
logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", HttpESExporterUtils.santizeUrlPwds(currentHosts[0]));
|
||||
return;
|
||||
} else {
|
||||
conn.getInputStream().close(); // close and release to connection pool.
|
||||
|
@ -621,7 +726,7 @@ public class HttpESExporter extends AbstractExporter<HttpESExporter> implements
|
|||
// ignore, if closed, good....
|
||||
} catch (Throwable t) {
|
||||
logger.debug("error in keep alive thread, shutting down (will be restarted after a successful connection has been made) {}",
|
||||
AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
|
||||
HttpESExporterUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,61 +3,29 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.marvel.agent.support;
|
||||
package org.elasticsearch.marvel.agent.exporter;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.io.InputStream;
|
||||
import java.net.*;
|
||||
import java.util.Map;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class AgentUtils {
|
||||
public class HttpESExporterUtils {
|
||||
|
||||
public static XContentBuilder nodeToXContent(DiscoveryNode node, XContentBuilder builder) throws IOException {
|
||||
return nodeToXContent(node, null, builder);
|
||||
}
|
||||
|
||||
public static XContentBuilder nodeToXContent(DiscoveryNode node, Boolean isMasterNode, XContentBuilder builder) throws IOException {
|
||||
builder.field("id", node.id());
|
||||
builder.field("name", node.name());
|
||||
builder.field("transport_address", node.address());
|
||||
|
||||
if (node.address().uniqueAddressTypeId() == 1) { // InetSocket
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) node.address();
|
||||
InetSocketAddress inetSocketAddress = address.address();
|
||||
InetAddress inetAddress = inetSocketAddress.getAddress();
|
||||
if (inetAddress != null) {
|
||||
builder.field("ip", NetworkAddress.formatAddress(inetAddress));
|
||||
builder.field("host", inetSocketAddress.getHostString());
|
||||
builder.field("ip_port", NetworkAddress.formatAddress(inetSocketAddress));
|
||||
}
|
||||
} else if (node.address().uniqueAddressTypeId() == 2) { // local transport
|
||||
builder.field("ip_port", "_" + node.address()); // will end up being "_local[ID]"
|
||||
}
|
||||
|
||||
builder.field("master_node", node.isMasterNode());
|
||||
builder.field("data_node", node.isDataNode());
|
||||
if (isMasterNode != null) {
|
||||
builder.field("master", isMasterNode.booleanValue());
|
||||
}
|
||||
|
||||
if (!node.attributes().isEmpty()) {
|
||||
builder.startObject("attributes");
|
||||
for (Map.Entry<String, String> attr : node.attributes().entrySet()) {
|
||||
builder.field(attr.getKey(), attr.getValue());
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json";
|
||||
static final String MARVEL_VERSION_FIELD = "marvel_version";
|
||||
static final String VERSION_FIELD = "number";
|
||||
|
||||
public static String[] extractHostsFromAddress(BoundTransportAddress boundAddress, ESLogger logger) {
|
||||
if (boundAddress == null || boundAddress.boundAddress() == null) {
|
||||
|
@ -108,16 +76,52 @@ public class AgentUtils {
|
|||
|
||||
}
|
||||
|
||||
public static int parseIndexVersionFromTemplate(byte[] template) throws UnsupportedEncodingException {
|
||||
Pattern versionRegex = Pattern.compile("marvel.index_format\"\\s*:\\s*\"?(\\d+)\"?");
|
||||
Matcher matcher = versionRegex.matcher(new String(template, "UTF-8"));
|
||||
if (matcher.find()) {
|
||||
return Integer.parseInt(matcher.group(1));
|
||||
} else {
|
||||
return -1;
|
||||
/**
|
||||
* Loads the default Marvel template
|
||||
*/
|
||||
public static byte[] loadDefaultTemplate() {
|
||||
try (InputStream is = HttpESExporterUtils.class.getResourceAsStream(MARVEL_TEMPLATE_FILE)) {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Streams.copy(is, out);
|
||||
return out.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("unable to load marvel template", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract & parse the version contained in the given template
|
||||
*/
|
||||
public static Version parseTemplateVersion(byte[] template) {
|
||||
return parseTemplateVersion(new String(template, Charset.forName("UTF-8")));
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract & parse the version contained in the given template
|
||||
*/
|
||||
public static Version parseTemplateVersion(String template) {
|
||||
return parseVersion(MARVEL_VERSION_FIELD, template);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract & parse the elasticsearch version, as returned by the REST API
|
||||
*/
|
||||
public static Version parseElasticsearchVersion(byte[] template) {
|
||||
return parseVersion(VERSION_FIELD, new String(template, Charset.forName("UTF-8")));
|
||||
}
|
||||
|
||||
static Version parseVersion(String field, String template) {
|
||||
Pattern pattern = Pattern.compile(field + "\"\\s*:\\s*\"?([0-9a-zA-Z\\.\\-]+)\"?");
|
||||
Matcher matcher = pattern.matcher(template);
|
||||
if (matcher.find()) {
|
||||
String parsedVersion = matcher.group(1);
|
||||
if (Strings.hasText(parsedVersion)) {
|
||||
return Version.fromString(parsedVersion);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static final String userInfoChars = "\\w-\\._~!$&\\'\\(\\)*+,;=%";
|
||||
private static Pattern urlPwdSanitizer = Pattern.compile("([" + userInfoChars + "]+?):[" + userInfoChars + "]+?@");
|
||||
|
|
@ -1,8 +1,7 @@
|
|||
{
|
||||
"template": ".marvel*",
|
||||
"settings": {
|
||||
"marvel.index_format": 7,
|
||||
"marvel.version": "${project.version}",
|
||||
"marvel_version": "${project.version}",
|
||||
"index.number_of_shards": 1,
|
||||
"index.number_of_replicas": 1,
|
||||
"index.codec": "best_compression",
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.marvel.agent.exporter;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
|
||||
// Transport Client instantiation also calls the marvel plugin, which then fails to find modules
|
||||
|
@ -222,6 +224,26 @@ public class HttpESExporterTests extends ESIntegTestCase {
|
|||
assertMarvelTemplateExists();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadRemoteClusterVersion() {
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put(MarvelSettings.STARTUP_DELAY, "200m")
|
||||
.put(Node.HTTP_ENABLED, true);
|
||||
String nodeId = internalCluster().startNode(builder);
|
||||
|
||||
HttpESExporter httpEsExporter = getEsExporter(nodeId);
|
||||
|
||||
logger.info("--> exporting events to force host resolution");
|
||||
httpEsExporter.export(Collections.singletonList(newRandomMarvelDoc()));
|
||||
|
||||
assertNotNull(httpEsExporter.getHosts());
|
||||
assertThat(httpEsExporter.getHosts().length, greaterThan(0));
|
||||
|
||||
logger.info("--> loading remote cluster version");
|
||||
Version resolved = httpEsExporter.loadRemoteClusterVersion(httpEsExporter.getHosts()[0]);
|
||||
assertTrue(resolved.equals(Version.CURRENT));
|
||||
}
|
||||
|
||||
private HttpESExporter getEsExporter() {
|
||||
AgentService service = internalCluster().getInstance(AgentService.class);
|
||||
return (HttpESExporter) service.getExporters().iterator().next();
|
||||
|
|
|
@ -3,12 +3,10 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.marvel.agent;
|
||||
package org.elasticsearch.marvel.agent.exporter;
|
||||
|
||||
import org.elasticsearch.marvel.agent.support.AgentUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.StreamsUtils;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -18,95 +16,137 @@ import java.net.MalformedURLException;
|
|||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
|
||||
public class AgentUtilsTests extends ESTestCase {
|
||||
public class HttpESExporterUtilsTests extends ESTestCase {
|
||||
|
||||
@Test
|
||||
public void testVersionIsExtractableFromIndexTemplate() throws IOException {
|
||||
byte[] template = StreamsUtils.copyToBytesFromClasspath("/marvel_index_template.json");
|
||||
MatcherAssert.assertThat(AgentUtils.parseIndexVersionFromTemplate(template), Matchers.greaterThan(0));
|
||||
public void testLoadTemplate() {
|
||||
byte[] template = HttpESExporterUtils.loadDefaultTemplate();
|
||||
assertNotNull(template);
|
||||
assertThat(template.length, Matchers.greaterThan(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException {
|
||||
byte[] template = HttpESExporterUtils.loadDefaultTemplate();
|
||||
assertNotNull(template);
|
||||
|
||||
Version version = HttpESExporterUtils.parseTemplateVersion(template);
|
||||
assertNotNull(version);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseTemplateVersionFromStringTemplate() throws IOException {
|
||||
List<String> templates = new ArrayList<>();
|
||||
templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}");
|
||||
templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}");
|
||||
templates.add("{\"marvel_version\": \"1.7.1\"}");
|
||||
templates.add("{\"marvel_version\": \"2.0.0-beta1\"}");
|
||||
templates.add("{\"marvel_version\": \"2.0.0\"}");
|
||||
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
|
||||
|
||||
for (String template : templates) {
|
||||
Version version = HttpESExporterUtils.parseTemplateVersion(template);
|
||||
assertNotNull(version);
|
||||
}
|
||||
|
||||
Version version = HttpESExporterUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}");
|
||||
assertNull(version);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseVersion() throws IOException {
|
||||
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}"));
|
||||
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}"));
|
||||
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}"));
|
||||
assertNotNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"));
|
||||
assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
|
||||
assertNull(HttpESExporterUtils.parseVersion(HttpESExporterUtils.MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testHostParsing() throws MalformedURLException, URISyntaxException {
|
||||
URL url = AgentUtils.parseHostWithPath("localhost:9200", "");
|
||||
URL url = HttpESExporterUtils.parseHostWithPath("localhost:9200", "");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("localhost", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("localhost", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("http://localhost:9200", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("http://localhost:9200", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("http://localhost", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("http://localhost", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("https://localhost:9200", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("https://localhost:9200", "_bulk");
|
||||
verifyUrl(url, "https", "localhost", 9200, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk");
|
||||
verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200", "");
|
||||
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200", "");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("boaz:test@localhost", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk");
|
||||
verifyUrl(url, "https", "localhost", 9200, "/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", "");
|
||||
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", "");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", "");
|
||||
url = HttpESExporterUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", "");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("localhost/suburl", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("localhost/suburl", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk");
|
||||
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk");
|
||||
verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk", "boaz:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk");
|
||||
verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk", "user:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk");
|
||||
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk", "user:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("server_with_underscore:9300", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("server_with_underscore:9300", "_bulk");
|
||||
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("server_with_underscore", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("server_with_underscore", "_bulk");
|
||||
verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk");
|
||||
verifyUrl(url, "https", "server-dash", 9300, "/_bulk", "user:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk");
|
||||
verifyUrl(url, "http", "server-dash", 9300, "/_bulk", "user:test");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("server-dash:9300", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("server-dash:9300", "_bulk");
|
||||
verifyUrl(url, "http", "server-dash", 9300, "/_bulk");
|
||||
|
||||
url = AgentUtils.parseHostWithPath("server-dash", "_bulk");
|
||||
url = HttpESExporterUtils.parseHostWithPath("server-dash", "_bulk");
|
||||
verifyUrl(url, "http", "server-dash", 9200, "/_bulk");
|
||||
}
|
||||
|
||||
|
@ -148,7 +188,7 @@ public class AgentUtilsTests extends ESTestCase {
|
|||
};
|
||||
|
||||
for (String input : inputs) {
|
||||
String sanitized = AgentUtils.santizeUrlPwds(input);
|
||||
String sanitized = HttpESExporterUtils.santizeUrlPwds(input);
|
||||
assertThat(sanitized, not(containsString(pwd)));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue