[Monitoring] Add Empty Ingest Pipeline to Future Proof Monitoring Documents

This adds an empty _ingest/pipeline named after the _current_ version of the Monitoring API (currently 2) from both exporters.

This allows us to eventually change the API version (to 3, 4, etc.) and go _back_ and modify the pipeline that exists. The modified pipeline would then "fix" the documents as best as possible and rename the index. As a result, different versions (starting with 5.0) will be able to report to the same monitoring cluster regardless of the running API version.

Note: This has no impact on stale data (e.g., the day before the upgrade) _and_ it implies that the monitoring cluster should always be updated first. A simple reindexing script can be supplied for old data, which can be done at the discretion of the user.

Original commit: elastic/x-pack-elasticsearch@45df5ee87b
This commit is contained in:
Chris Earle 2016-07-25 14:18:40 -04:00
parent 72f580c82d
commit 3c9749b2b8
13 changed files with 675 additions and 234 deletions

View File

@ -10,14 +10,28 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.monitoring.MonitoringSettings; import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public abstract class Exporter implements AutoCloseable { public abstract class Exporter implements AutoCloseable {
/**
* The pipeline name passed with any <em>direct</em> indexing operation in order to support future API revisions.
*/
public static final String EXPORT_PIPELINE_NAME = "xpack_monitoring_" + MonitoringTemplateUtils.TEMPLATE_VERSION;
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format"; public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout"; public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
/**
* Every {@code Exporter} adds the ingest pipeline to bulk requests, but they should, at the exporter level, allow that to be disabled.
* <p>
* Note: disabling it obviously loses any benefit of using it, but it does allow clusters that don't run with ingest to not use it.
*/
public static final String USE_INGEST_PIPELINE_SETTING = "use_ingest";
protected final Config config; protected final Config config;
protected final ESLogger logger; protected final ESLogger logger;
@ -72,6 +86,34 @@ public abstract class Exporter implements AutoCloseable {
return MonitoringSettings.EXPORTERS_SETTINGS.getKey() + config.name + "." + setting; return MonitoringSettings.EXPORTERS_SETTINGS.getKey() + config.name + "." + setting;
} }
/**
* Create an empty pipeline.
* <pre><code>
* {
* "description" : "2: This is a placeholder pipeline ...",
* "processors": [ ]
* }
* </code></pre>
* The expectation is that you will call either {@link XContentBuilder#string()} or {@link XContentBuilder#bytes()}}.
*
* @param type The type of data you want to format for the request
* @return Never {@code null}. Always an ended-object.
*/
public static XContentBuilder emptyPipeline(XContentType type) {
try {
// For now: We prepend the API version to the string so that it's easy to parse in the future; if we ever add metadata
// to pipelines, then it would better serve this use case
return XContentBuilder.builder(type.xContent()).startObject()
.field("description", MonitoringTemplateUtils.TEMPLATE_VERSION +
": This is a placeholder pipeline for Monitoring API version " +
MonitoringTemplateUtils.TEMPLATE_VERSION + " so that future versions may fix breaking changes.")
.startArray("processors").endArray()
.endObject();
} catch (IOException e) {
throw new RuntimeException("Failed to create empty pipeline", e);
}
}
public static class Config { public static class Config {
private final String name; private final String name;

View File

@ -12,6 +12,7 @@ import org.elasticsearch.SpecialPermission;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -62,6 +63,16 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
/**
* With the forthcoming addition of the HTTP-based Java Client for ES, we should be able to combine this class with the
* {@code LocalExporter} implementation, with only a few minor differences:
*
* <ul>
* <li>The {@code HttpExporter} needs to support configuring the certificates and authentication parameters.</li>
* <li>Depending on how the REST client is implemented, it may or may not allow us to make some calls in the same way
* (only time will tell; unknown unknowns).</li>
* </ul>
*/
public class HttpExporter extends Exporter { public class HttpExporter extends Exporter {
public static final String TYPE = "http"; public static final String TYPE = "http";
@ -84,8 +95,14 @@ public class HttpExporter extends Exporter {
*/ */
public static final Set<String> BLACKLISTED_HEADERS = Collections.unmodifiableSet(Sets.newHashSet("Content-Length", "Content-Type")); public static final Set<String> BLACKLISTED_HEADERS = Collections.unmodifiableSet(Sets.newHashSet("Content-Length", "Content-Type"));
// es level timeout used when checking and writing templates (used to speed up tests) /**
* ES level timeout used when checking and writing templates (used to speed up tests)
*/
public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout"; public static final String TEMPLATE_CHECK_TIMEOUT_SETTING = "index.template.master_timeout";
/**
* ES level timeout used when checking and writing pipelines (used to speed up tests)
*/
public static final String PIPELINE_CHECK_TIMEOUT_SETTING = "index.pipeline.master_timeout";
public static final String SSL_SETTING = "ssl"; public static final String SSL_SETTING = "ssl";
public static final String SSL_PROTOCOL_SETTING = "protocol"; public static final String SSL_PROTOCOL_SETTING = "protocol";
@ -95,9 +112,11 @@ public class HttpExporter extends Exporter {
public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification"; public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification";
/** /**
* Minimum supported version of the remote monitoring cluster * Minimum supported version of the remote monitoring cluster.
**/ * <p>
public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2; * We must have support for ingest pipelines, which requires a minimum of 5.0.
*/
public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_5_0_0_alpha5;
private static final XContentType CONTENT_TYPE = XContentType.JSON; private static final XContentType CONTENT_TYPE = XContentType.JSON;
@ -117,6 +136,10 @@ public class HttpExporter extends Exporter {
@Nullable @Nullable
final TimeValue templateCheckTimeout; final TimeValue templateCheckTimeout;
@Nullable
final TimeValue pipelineCheckTimeout;
/** /**
* Headers supplied by the user to send (likely to a proxy for routing). * Headers supplied by the user to send (likely to a proxy for routing).
*/ */
@ -124,6 +147,7 @@ public class HttpExporter extends Exporter {
private final Map<String, String[]> headers; private final Map<String, String[]> headers;
volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean checkedAndUploadedIndexPipeline = false;
volatile boolean supportedClusterVersion = false; volatile boolean supportedClusterVersion = false;
boolean keepAlive; boolean keepAlive;
@ -142,11 +166,8 @@ public class HttpExporter extends Exporter {
this.connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING, this.connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING,
TimeValue.timeValueMillis(connectionTimeout.millis() * 10)); TimeValue.timeValueMillis(connectionTimeout.millis() * 10));
// HORRIBLE!!! We can't use settings.getAsTime(..) !!! templateCheckTimeout = parseTimeValue(TEMPLATE_CHECK_TIMEOUT_SETTING);
// WE MUST FIX THIS IN CORE... pipelineCheckTimeout = parseTimeValue(PIPELINE_CHECK_TIMEOUT_SETTING);
// TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!!
String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null);
templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING));
keepAlive = config.settings().getAsBoolean(CONNECTION_KEEP_ALIVE_SETTING, true); keepAlive = config.settings().getAsBoolean(CONNECTION_KEEP_ALIVE_SETTING, true);
keepAliveWorker = new ConnectionKeepAliveWorker(); keepAliveWorker = new ConnectionKeepAliveWorker();
@ -166,7 +187,7 @@ public class HttpExporter extends Exporter {
Strings.arrayToCommaDelimitedString(hosts), MonitoringIndexNameResolver.PREFIX); Strings.arrayToCommaDelimitedString(hosts), MonitoringIndexNameResolver.PREFIX);
} }
private String[] resolveHosts(Settings settings) { private String[] resolveHosts(final Settings settings) {
final String[] hosts = settings.getAsArray(HOST_SETTING); final String[] hosts = settings.getAsArray(HOST_SETTING);
if (hosts.length == 0) { if (hosts.length == 0) {
@ -184,7 +205,7 @@ public class HttpExporter extends Exporter {
return hosts; return hosts;
} }
private Map<String, String[]> configureHeaders(Settings settings) { private Map<String, String[]> configureHeaders(final Settings settings) {
final Settings headerSettings = settings.getAsSettings(HEADERS); final Settings headerSettings = settings.getAsSettings(HEADERS);
final Set<String> names = headerSettings.names(); final Set<String> names = headerSettings.names();
@ -213,6 +234,15 @@ public class HttpExporter extends Exporter {
return Collections.unmodifiableMap(headers); return Collections.unmodifiableMap(headers);
} }
private TimeValue parseTimeValue(final String setting) {
// HORRIBLE!!! We can't use settings.getAsTime(..) !!!
// WE MUST FIX THIS IN CORE...
// TimeValue SHOULD NOT SELECTIVELY CHOOSE WHAT FIELDS TO PARSE BASED ON THEIR NAMES!!!!
final String checkTimeoutValue = config.settings().get(setting, null);
return TimeValue.parseTimeValue(checkTimeoutValue, null, settingFQN(setting));
}
ResolversRegistry getResolvers() { ResolversRegistry getResolvers() {
return resolvers; return resolvers;
} }
@ -236,12 +266,29 @@ public class HttpExporter extends Exporter {
} }
} }
private String buildQueryString() {
StringBuilder queryString = new StringBuilder();
if (bulkTimeout != null) {
queryString.append("master_timeout=").append(bulkTimeout);
}
// allow the use of ingest pipelines to be completely optional
if (config.settings().getAsBoolean(USE_INGEST_PIPELINE_SETTING, true)) {
if (queryString.length() != 0) {
queryString.append('&');
}
queryString.append("pipeline=").append(EXPORT_PIPELINE_NAME);
}
return queryString.length() != 0 ? '?' + queryString.toString() : "";
}
private HttpURLConnection openExportingConnection() { private HttpURLConnection openExportingConnection() {
logger.trace("setting up an export connection"); logger.trace("setting up an export connection");
String queryString = "";
if (bulkTimeout != null) { final String queryString = buildQueryString();
queryString = "?master_timeout=" + bulkTimeout;
}
HttpURLConnection conn = openAndValidateConnection("POST", "/_bulk" + queryString, CONTENT_TYPE.mediaType()); HttpURLConnection conn = openAndValidateConnection("POST", "/_bulk" + queryString, CONTENT_TYPE.mediaType());
if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) { if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) {
// start keep alive upon successful connection if not there. // start keep alive upon successful connection if not there.
@ -333,7 +380,7 @@ public class HttpExporter extends Exporter {
* @return a url connection to the selected host or null if no current host is available. * @return a url connection to the selected host or null if no current host is available.
*/ */
private HttpURLConnection openAndValidateConnection(String method, String path, String contentType) { private HttpURLConnection openAndValidateConnection(String method, String path, String contentType) {
// out of for to move faulty hosts to the end // allows us to move faulty hosts to the end; the HTTP Client will make this code obsolete
int hostIndex = 0; int hostIndex = 0;
try { try {
for (; hostIndex < hosts.length; hostIndex++) { for (; hostIndex < hosts.length; hostIndex++) {
@ -357,13 +404,17 @@ public class HttpExporter extends Exporter {
} }
} }
if (!checkedAndUploadedIndexTemplate) { // NOTE: This assumes that the user is configured properly and only sending to a single cluster
// check templates first on the host if (checkedAndUploadedIndexTemplate == false || checkedAndUploadedIndexPipeline == false) {
checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate(host); checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate(host);
if (!checkedAndUploadedIndexTemplate) { checkedAndUploadedIndexPipeline = checkedAndUploadedIndexTemplate && checkAndUploadIndexPipeline(host);
// did we fail?
if (checkedAndUploadedIndexTemplate == false || checkedAndUploadedIndexPipeline == false) {
continue; continue;
} }
} }
HttpURLConnection connection = openConnection(host, method, path, contentType); HttpURLConnection connection = openConnection(host, method, path, contentType);
if (connection != null) { if (connection != null) {
return connection; return connection;
@ -371,6 +422,7 @@ public class HttpExporter extends Exporter {
// failed hosts - reset template & cluster versions check, someone may have restarted the target cluster and deleted // failed hosts - reset template & cluster versions check, someone may have restarted the target cluster and deleted
// it's data folder. be safe. // it's data folder. be safe.
checkedAndUploadedIndexTemplate = false; checkedAndUploadedIndexTemplate = false;
checkedAndUploadedIndexPipeline = false;
supportedClusterVersion = false; supportedClusterVersion = false;
} }
} finally { } finally {
@ -393,6 +445,7 @@ public class HttpExporter extends Exporter {
* open a connection to the given hosts, returning null when not successful * * open a connection to the given hosts, returning null when not successful *
*/ */
private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) { private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) {
// the HTTP Client will make this code obsolete
try { try {
final URL url = HttpExporterUtils.parseHostWithPath(host, path); final URL url = HttpExporterUtils.parseHostWithPath(host, path);
final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@ -420,7 +473,7 @@ public class HttpExporter extends Exporter {
httpsConn.setSSLSocketFactory(factory); httpsConn.setSSLSocketFactory(factory);
// Requires permission javax.net.ssl.SSLPermission "setHostnameVerifier"; // Requires permission javax.net.ssl.SSLPermission "setHostnameVerifier";
if (!hostnameVerification) { if (hostnameVerification == false) {
httpsConn.setHostnameVerifier(TrustAllHostnameVerifier.INSTANCE); httpsConn.setHostnameVerifier(TrustAllHostnameVerifier.INSTANCE);
} }
return null; return null;
@ -444,15 +497,9 @@ public class HttpExporter extends Exporter {
return conn; return conn;
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
logger.error("error parsing host [{}] [{}]", host, e.getMessage()); logger.error("error parsing host [{}]", e, host);
if (logger.isDebugEnabled()) {
logger.debug("error parsing host [{}]. full error details:\n[{}]", host, ExceptionsHelper.detailedMessage(e));
}
} catch (IOException e) { } catch (IOException e) {
logger.error("error connecting to [{}] [{}]", host, e.getMessage()); logger.error("error connecting to [{}]", e, host);
if (logger.isDebugEnabled()) {
logger.debug("error connecting to [{}]. full error details:\n[{}]", host, ExceptionsHelper.detailedMessage(e));
}
} }
return null; return null;
} }
@ -474,7 +521,91 @@ public class HttpExporter extends Exporter {
return VersionUtils.parseVersion(out.toByteArray()); return VersionUtils.parseVersion(out.toByteArray());
} }
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]:\n" + e.getMessage()); throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]", e);
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
}
/**
* Checks if the index pipeline already exists and, if not, uploads it.
*
* @return {@code true} if the pipeline exists after executing.
* @throws RuntimeException if any error occurs that should prevent indexing
*/
private boolean checkAndUploadIndexPipeline(final String host) {
if (hasPipeline(host) == false) {
logger.debug("monitoring pipeline [{}] not found", EXPORT_PIPELINE_NAME);
return putPipeline(host);
} else {
logger.trace("monitoring pipeline [{}] found", EXPORT_PIPELINE_NAME);
}
return true;
}
private boolean hasPipeline(final String host) {
final String url = urlWithMasterTimeout("_ingest/pipeline/" + EXPORT_PIPELINE_NAME, pipelineCheckTimeout);
HttpURLConnection connection = null;
try {
logger.trace("checking if monitoring pipeline [{}] exists on the monitoring cluster", EXPORT_PIPELINE_NAME);
connection = openConnection(host, "GET", url, null);
if (connection == null) {
throw new IOException("no available connection to check for monitoring pipeline [" + EXPORT_PIPELINE_NAME + "] existence");
}
// 200 means that the template has been found, 404 otherwise
if (connection.getResponseCode() == 200) {
logger.debug("monitoring pipeline [{}] found", EXPORT_PIPELINE_NAME);
return true;
}
} catch (Exception e) {
logger.error("failed to verify the monitoring pipeline [{}] on [{}]", e, EXPORT_PIPELINE_NAME, host);
return false;
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
return false;
}
private boolean putPipeline(final String host) {
logger.trace("installing pipeline [{}]", EXPORT_PIPELINE_NAME);
HttpURLConnection connection = null;
try {
connection = openConnection(host, "PUT", "_ingest/pipeline/" + EXPORT_PIPELINE_NAME, XContentType.JSON.mediaType());
if (connection == null) {
logger.debug("no available connection to upload monitoring pipeline [{}]", EXPORT_PIPELINE_NAME);
return false;
}
// Uploads the template and closes the outputstream
Streams.copy(BytesReference.toBytes(emptyPipeline(XContentType.JSON).bytes()), connection.getOutputStream());
if (connection.getResponseCode() != 200 && connection.getResponseCode() != 201) {
logConnectionError("error adding the monitoring pipeline [" + EXPORT_PIPELINE_NAME + "] to [" + host + "]", connection);
return false;
}
logger.info("monitoring pipeline [{}] set", EXPORT_PIPELINE_NAME);
return true;
} catch (IOException e) {
logger.error("failed to update monitoring pipeline [{}] on host [{}]", e, EXPORT_PIPELINE_NAME, host);
return false;
} finally { } finally {
if (connection != null) { if (connection != null) {
try { try {
@ -488,9 +619,9 @@ public class HttpExporter extends Exporter {
/** /**
* Checks if the index templates already exist and if not uploads it * Checks if the index templates already exist and if not uploads it
* Any critical error that should prevent data exporting is communicated via an exception.
* *
* @return true if template exists or was uploaded successfully. * @return true if template exists after executing.
* @throws RuntimeException if any error occurs that should prevent indexing
*/ */
private boolean checkAndUploadIndexTemplate(final String host) { private boolean checkAndUploadIndexTemplate(final String host) {
// List of distinct templates // List of distinct templates
@ -500,7 +631,7 @@ public class HttpExporter extends Exporter {
for (Map.Entry<String, String> template : templates.entrySet()) { for (Map.Entry<String, String> template : templates.entrySet()) {
if (hasTemplate(template.getKey(), host) == false) { if (hasTemplate(template.getKey(), host) == false) {
logger.debug("template [{}] not found", template.getKey()); logger.debug("template [{}] not found", template.getKey());
if (!putTemplate(host, template.getKey(), template.getValue())) { if (putTemplate(host, template.getKey(), template.getValue()) == false) {
return false; return false;
} }
} else { } else {
@ -511,10 +642,7 @@ public class HttpExporter extends Exporter {
} }
private boolean hasTemplate(String templateName, String host) { private boolean hasTemplate(String templateName, String host) {
String url = "_template/" + templateName; final String url = urlWithMasterTimeout("_template/" + templateName, templateCheckTimeout);
if (templateCheckTimeout != null) {
url += "?timeout=" + templateCheckTimeout;
}
HttpURLConnection connection = null; HttpURLConnection connection = null;
try { try {
@ -530,7 +658,7 @@ public class HttpExporter extends Exporter {
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("failed to verify the monitoring template [{}] on [{}]:\n{}", templateName, host, e.getMessage()); logger.error("failed to verify the monitoring template [{}] on [{}]", e, templateName, host);
return false; return false;
} finally { } finally {
if (connection != null) { if (connection != null) {
@ -564,7 +692,7 @@ public class HttpExporter extends Exporter {
logger.info("monitoring template [{}] updated ", template); logger.info("monitoring template [{}] updated ", template);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
logger.error("failed to update monitoring template [{}] on host [{}]:\n{}", template, host, e.getMessage()); logger.error("failed to update monitoring template [{}] on host [{}]", e, template, host);
return false; return false;
} finally { } finally {
if (connection != null) { if (connection != null) {
@ -577,6 +705,23 @@ public class HttpExporter extends Exporter {
} }
} }
/**
* Get the {@code url} with the optional {@code masterTimeout}.
* <p>
* This method assumes that there is no query string applied yet!
*
* @param url The URL being used
* @param masterTimeout The optional master_timeout
* @return Never {@code null}
*/
private String urlWithMasterTimeout(final String url, final TimeValue masterTimeout) {
if (masterTimeout != null) {
return url + "?master_timeout=" + masterTimeout;
}
return url;
}
private void logConnectionError(String msg, HttpURLConnection conn) { private void logConnectionError(String msg, HttpURLConnection conn) {
InputStream inputStream = conn.getErrorStream(); InputStream inputStream = conn.getErrorStream();
String err = ""; String err = "";

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.monitoring.agent.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.agent.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.agent.exporter.ExportException; import org.elasticsearch.xpack.monitoring.agent.exporter.ExportException;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
@ -22,6 +21,8 @@ import org.elasticsearch.xpack.security.InternalClient;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import static org.elasticsearch.xpack.monitoring.agent.exporter.Exporter.EXPORT_PIPELINE_NAME;
/** /**
* LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the * LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the
* {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#doClose()} methods are not synchronized. * {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#doClose()} methods are not synchronized.
@ -31,15 +32,17 @@ public class LocalBulk extends ExportBulk {
private final ESLogger logger; private final ESLogger logger;
private final InternalClient client; private final InternalClient client;
private final ResolversRegistry resolvers; private final ResolversRegistry resolvers;
private final boolean usePipeline;
private BulkRequestBuilder requestBuilder; private BulkRequestBuilder requestBuilder;
public LocalBulk(String name, ESLogger logger, InternalClient client, ResolversRegistry resolvers) { public LocalBulk(String name, ESLogger logger, InternalClient client, ResolversRegistry resolvers, boolean usePipeline) {
super(name); super(name);
this.logger = logger; this.logger = logger;
this.client = client; this.client = client;
this.resolvers = resolvers; this.resolvers = resolvers;
this.usePipeline = usePipeline;
} }
@Override @Override
@ -58,11 +61,17 @@ public class LocalBulk extends ExportBulk {
MonitoringIndexNameResolver<MonitoringDoc> resolver = resolvers.getResolver(doc); MonitoringIndexNameResolver<MonitoringDoc> resolver = resolvers.getResolver(doc);
IndexRequest request = new IndexRequest(resolver.index(doc), resolver.type(doc), resolver.id(doc)); IndexRequest request = new IndexRequest(resolver.index(doc), resolver.type(doc), resolver.id(doc));
request.source(resolver.source(doc, XContentType.SMILE)); request.source(resolver.source(doc, XContentType.SMILE));
// allow the use of ingest pipelines to be completely optional
if (usePipeline) {
request.setPipeline(EXPORT_PIPELINE_NAME);
}
requestBuilder.add(request); requestBuilder.add(request);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("local exporter [{}] - added index request [index={}, type={}, id={}]", logger.trace("local exporter [{}] - added index request [index={}, type={}, id={}, pipeline={}]",
name, request.index(), request.type(), request.id()); name, request.index(), request.type(), request.id(), request.getPipeline());
} }
} catch (Exception e) { } catch (Exception e) {
if (exception == null) { if (exception == null) {

View File

@ -11,19 +11,23 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy; import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.monitoring.agent.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.agent.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter; import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
@ -37,6 +41,7 @@ import org.joda.time.DateTimeZone;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -133,11 +138,17 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
} }
} }
logger.debug("monitoring index templates are installed, service can start"); // if we don't have the ingest pipeline, then it's going to fail anyway
if (hasIngestPipelines(clusterState) == false) {
logger.debug("monitoring ingest pipeline [{}] does not exist, so service cannot start", EXPORT_PIPELINE_NAME);
return null;
}
logger.trace("monitoring index templates and pipelines are installed, service can start");
} else { } else {
// we are on master // we are on the elected master
// //
// Check that there is nothing that could block metadata updates // Check that there is nothing that could block metadata updates
if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) { if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) {
@ -145,24 +156,82 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return null; return null;
} }
// whenever we install anything, we return null to force it to retry to give the cluster a chance to catch up
boolean installedSomething = false;
// Check that each required template exist, installing it if needed // Check that each required template exist, installing it if needed
for (Map.Entry<String, String> template : templates.entrySet()) { for (Map.Entry<String, String> template : templates.entrySet()) {
if (hasTemplate(template.getKey(), clusterState) == false) { if (hasTemplate(template.getKey(), clusterState) == false) {
logger.debug("template [{}] not found", template.getKey()); logger.debug("template [{}] not found", template.getKey());
putTemplate(template.getKey(), template.getValue()); putTemplate(template.getKey(), template.getValue());
return null; installedSomething = true;
} else { } else {
logger.debug("template [{}] found", template.getKey()); logger.trace("template [{}] found", template.getKey());
} }
} }
logger.debug("monitoring index templates are installed on master node, service can start"); // if we don't have the ingest pipeline, then install it
if (hasIngestPipelines(clusterState) == false) {
logger.debug("pipeline [{}] not found", EXPORT_PIPELINE_NAME);
putIngestPipeline();
installedSomething = true;
} else {
logger.trace("pipeline [{}] found", EXPORT_PIPELINE_NAME);
}
if (installedSomething) {
// let the cluster catch up (and because we do the PUTs asynchronously)
return null;
}
logger.trace("monitoring index templates and pipelines are installed on master node, service can start");
} }
if (state.compareAndSet(State.INITIALIZED, State.RUNNING)) { if (state.compareAndSet(State.INITIALIZED, State.RUNNING)) {
logger.debug("started"); logger.debug("started");
} }
return new LocalBulk(name(), logger, client, resolvers);
return new LocalBulk(name(), logger, client, resolvers, config.settings().getAsBoolean(USE_INGEST_PIPELINE_SETTING, true));
}
/**
* Determine if the ingest pipeline for {@link #EXPORT_PIPELINE_NAME} exists in the cluster or not.
*
* @param clusterState The current cluster state
* @return {@code true} if the {@code clusterState} contains a pipeline with {@link #EXPORT_PIPELINE_NAME}
*/
private boolean hasIngestPipelines(ClusterState clusterState) {
final IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE);
// NOTE: this will need to become a more advanced check once we actually supply a meaningful pipeline
// because we will want to _replace_ older pipelines so that they go from (e.g., monitoring-2 doing nothing to
// monitoring-2 becoming monitoring-3 documents)
return ingestMetadata != null && ingestMetadata.getPipelines().containsKey(EXPORT_PIPELINE_NAME);
}
/**
* Create the pipeline required to handle past data as well as to future-proof ingestion for <em>current</em> documents (the pipeline
* is initially empty, but it can be replaced later with one that translates it as-needed).
* <p>
* This should only be invoked by the <em>elected</em> master node.
* <p>
* Whenever we eventually make a backwards incompatible change, then we need to override any pipeline that already exists that is
* older than this one. Currently, we prepend the API version as the first part of the description followed by a <code>:</code>.
* <pre><code>
* {
* "description": "2: This is a placeholder ...",
* "pipelines" : [ ... ]
* }
* </code></pre>
* That should be used (until something better exists) to ensure that we do not override <em>newer</em> pipelines with our own.
*/
private void putIngestPipeline() {
logger.debug("installing ingest pipeline [{}]", EXPORT_PIPELINE_NAME);
final BytesReference emptyPipeline = emptyPipeline(XContentType.JSON).bytes();
final PutPipelineRequest request = new PutPipelineRequest(EXPORT_PIPELINE_NAME, emptyPipeline);
client.admin().cluster().putPipeline(request, new ResponseActionListener<>("pipeline", EXPORT_PIPELINE_NAME));
} }
/** /**
@ -193,28 +262,14 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return templates.size() > 0; return templates.size() > 0;
} }
void putTemplate(String template, String source) { private void putTemplate(String template, String source) {
logger.debug("installing template [{}]",template); logger.debug("installing template [{}]",template);
PutIndexTemplateRequest request = new PutIndexTemplateRequest(template).source(source); PutIndexTemplateRequest request = new PutIndexTemplateRequest(template).source(source);
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!"; assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
// async call, so we won't block cluster event thread // async call, so we won't block cluster event thread
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() { client.admin().indices().putTemplate(request, new ResponseActionListener<>("template", template));
@Override
public void onResponse(PutIndexTemplateResponse response) {
if (response.isAcknowledged()) {
logger.trace("successfully installed monitoring template [{}]", template);
} else {
logger.error("failed to update monitoring index template [{}]", template);
}
}
@Override
public void onFailure(Exception e) {
logger.error("failed to update monitoring index template [{}]", e, template);
}
});
} }
@Override @Override
@ -307,4 +362,32 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
RUNNING, RUNNING,
TERMINATED TERMINATED
} }
/**
* Acknowledge success / failure for any given creation attempt (e.g., template or pipeline).
*/
private class ResponseActionListener<Response extends AcknowledgedResponse> implements ActionListener<Response> {
private final String type;
private final String name;
public ResponseActionListener(String type, String name) {
this.type = Objects.requireNonNull(type);
this.name = Objects.requireNonNull(name);
}
@Override
public void onResponse(Response response) {
if (response.isAcknowledged()) {
logger.trace("successfully set monitoring {} [{}]", type, name);
} else {
logger.error("failed to set monitoring index {} [{}]", type, name);
}
}
@Override
public void onFailure(Exception e) {
logger.error("failed to set monitoring index {} [{}]", e, type, name);
}
}
} }

View File

@ -45,22 +45,32 @@ public abstract class AbstractExporterTemplateTestCase extends MonitoringIntegTe
protected abstract void deleteTemplates() throws Exception; protected abstract void deleteTemplates() throws Exception;
protected abstract void deletePipeline() throws Exception;
protected abstract void putTemplate(String name) throws Exception; protected abstract void putTemplate(String name) throws Exception;
protected abstract void assertTemplateExist(String name) throws Exception; protected abstract void putPipeline(String name) throws Exception;
protected abstract void assertTemplateExists(String name) throws Exception;
protected abstract void assertPipelineExists(String name) throws Exception;
protected abstract void assertTemplateNotUpdated(String name) throws Exception; protected abstract void assertTemplateNotUpdated(String name) throws Exception;
protected abstract void assertPipelineNotUpdated(String name) throws Exception;
public void testCreateWhenNoExistingTemplates() throws Exception { public void testCreateWhenNoExistingTemplates() throws Exception {
internalCluster().startNode(); internalCluster().startNode();
deleteTemplates(); deleteTemplates();
deletePipeline();
doExporting(); doExporting();
logger.debug("--> templates does not exist: it should have been created in the current version"); logger.debug("--> templates does not exist: it should have been created in the current version");
for (String template : monitoringTemplates().keySet()) { for (String template : monitoringTemplates().keySet()) {
assertTemplateExist(template); assertTemplateExists(template);
} }
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
doExporting(); doExporting();
@ -74,17 +84,19 @@ public abstract class AbstractExporterTemplateTestCase extends MonitoringIntegTe
putTemplate(indexTemplateName()); putTemplate(indexTemplateName());
putTemplate(dataTemplateName()); putTemplate(dataTemplateName());
putPipeline(Exporter.EXPORT_PIPELINE_NAME);
doExporting(); doExporting();
logger.debug("--> existing templates are old"); logger.debug("--> existing templates are old");
assertTemplateExist(dataTemplateName()); assertTemplateExists(dataTemplateName());
assertTemplateExist(indexTemplateName()); assertTemplateExists(indexTemplateName());
logger.debug("--> existing templates are old: new templates should be created"); logger.debug("--> existing templates are old: new templates should be created");
for (String template : monitoringTemplates().keySet()) { for (String template : monitoringTemplates().keySet()) {
assertTemplateExist(template); assertTemplateExists(template);
} }
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
doExporting(); doExporting();
@ -98,17 +110,20 @@ public abstract class AbstractExporterTemplateTestCase extends MonitoringIntegTe
putTemplate(indexTemplateName()); putTemplate(indexTemplateName());
putTemplate(dataTemplateName()); putTemplate(dataTemplateName());
putPipeline(Exporter.EXPORT_PIPELINE_NAME);
doExporting(); doExporting();
logger.debug("--> existing templates are up to date"); logger.debug("--> existing templates are up to date");
for (String template : monitoringTemplates().keySet()) { for (String template : monitoringTemplates().keySet()) {
assertTemplateExist(template); assertTemplateExists(template);
} }
assertPipelineExists(Exporter.EXPORT_PIPELINE_NAME);
logger.debug("--> existing templates has the same version: they should not be changed"); logger.debug("--> existing templates has the same version: they should not be changed");
assertTemplateNotUpdated(indexTemplateName()); assertTemplateNotUpdated(indexTemplateName());
assertTemplateNotUpdated(dataTemplateName()); assertTemplateNotUpdated(dataTemplateName());
assertPipelineNotUpdated(Exporter.EXPORT_PIPELINE_NAME);
doExporting(); doExporting();

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.monitoring.MonitoredSystem; import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings; import org.elasticsearch.xpack.monitoring.MonitoringSettings;
@ -22,6 +23,7 @@ import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -69,8 +71,7 @@ public class ExportersTests extends ESTestCase {
when(clusterService.getClusterSettings()).thenReturn(clusterSettings); when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
// we always need to have the local exporter as it serves as the default one // we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, mock(CleanerService.class)));
mock(CleanerService.class)));
exporters = new Exporters(Settings.EMPTY, factories, clusterService); exporters = new Exporters(Settings.EMPTY, factories, clusterService);
} }
@ -253,6 +254,14 @@ public class ExportersTests extends ESTestCase {
verifyNoMoreInteractions(exporters.getExporter("_name1")); verifyNoMoreInteractions(exporters.getExporter("_name1"));
} }
public void testEmptyPipeline() throws IOException {
String json = Exporter.emptyPipeline(XContentType.JSON).string();
// ensure the description starts with the API version
assertThat(json, containsString("\"description\":\"" + MonitoringTemplateUtils.TEMPLATE_VERSION + ":"));
assertThat(json, containsString("\"processors\":[]"));
}
/** /**
* This test creates N threads that export a random number of document * This test creates N threads that export a random number of document
* using a {@link Exporters} instance. * using a {@link Exporters} instance.

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.monitoring.agent.exporter.AbstractExporterTemplateTestCase; import org.elasticsearch.xpack.monitoring.agent.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter; import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
import org.junit.After; import org.junit.After;
@ -72,16 +73,31 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
dispatcher.templates.clear(); dispatcher.templates.clear();
} }
@Override
protected void deletePipeline() throws Exception {
dispatcher.pipelines.clear();
}
@Override @Override
protected void putTemplate(String name) throws Exception { protected void putTemplate(String name) throws Exception {
dispatcher.templates.put(name, generateTemplateSource(name)); dispatcher.templates.put(name, generateTemplateSource(name));
} }
@Override @Override
protected void assertTemplateExist(String name) throws Exception { protected void putPipeline(String name) throws Exception {
dispatcher.pipelines.put(name, Exporter.emptyPipeline(XContentType.JSON).bytes());
}
@Override
protected void assertTemplateExists(String name) throws Exception {
assertThat("failed to find a template matching [" + name + "]", dispatcher.templates.containsKey(name), is(true)); assertThat("failed to find a template matching [" + name + "]", dispatcher.templates.containsKey(name), is(true));
} }
@Override
protected void assertPipelineExists(String name) throws Exception {
assertThat("failed to find a pipeline matching [" + name + "]", dispatcher.pipelines.containsKey(name), is(true));
}
@Override @Override
protected void assertTemplateNotUpdated(String name) throws Exception { protected void assertTemplateNotUpdated(String name) throws Exception {
// Checks that no PUT Template request has been made // Checks that no PUT Template request has been made
@ -91,6 +107,15 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
assertThat(dispatcher.templates.containsKey(name), is(true)); assertThat(dispatcher.templates.containsKey(name), is(true));
} }
@Override
protected void assertPipelineNotUpdated(String name) throws Exception {
// Checks that no PUT pipeline request has been made
assertThat(dispatcher.hasRequest("PUT", "/_ingest/pipeline/" + name), is(false));
// Checks that the current pipeline exists
assertThat(dispatcher.pipelines.containsKey(name), is(true));
}
@Override @Override
protected void awaitIndexExists(String index) throws Exception { protected void awaitIndexExists(String index) throws Exception {
Runnable busy = () -> assertThat("could not find index " + index, dispatcher.hasIndex(index), is(true)); Runnable busy = () -> assertThat("could not find index " + index, dispatcher.hasIndex(index), is(true));
@ -103,6 +128,7 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
private final Set<String> requests = new HashSet<>(); private final Set<String> requests = new HashSet<>();
private final Map<String, BytesReference> templates = ConcurrentCollections.newConcurrentMap(); private final Map<String, BytesReference> templates = ConcurrentCollections.newConcurrentMap();
private final Map<String, BytesReference> pipelines = ConcurrentCollections.newConcurrentMap();
private final Set<String> indices = ConcurrentCollections.newConcurrentSet(); private final Set<String> indices = ConcurrentCollections.newConcurrentSet();
@Override @Override
@ -110,12 +136,11 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
final String requestLine = request.getRequestLine(); final String requestLine = request.getRequestLine();
requests.add(requestLine); requests.add(requestLine);
switch (requestLine) {
// Cluster version // Cluster version
case "GET / HTTP/1.1": if ("GET / HTTP/1.1".equals(requestLine)) {
return newResponse(200, "{\"version\": {\"number\": \"" + Version.CURRENT.toString() + "\"}}"); return newResponse(200, "{\"version\": {\"number\": \"" + Version.CURRENT.toString() + "\"}}");
// Bulk // Bulk
case "POST /_bulk HTTP/1.1": } else if ("POST".equals(request.getMethod()) && request.getPath().startsWith("/_bulk")) {
// Parse the bulk request and extract all index names // Parse the bulk request and extract all index names
try { try {
BulkRequest bulk = new BulkRequest(); BulkRequest bulk = new BulkRequest();
@ -130,27 +155,37 @@ public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase
return newResponse(500, e.getMessage()); return newResponse(500, e.getMessage());
} }
return newResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); return newResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
default: // Templates and Pipelines
String[] paths = request.getPath().split("/"); } else if ("GET".equals(request.getMethod()) || "PUT".equals(request.getMethod())) {
final String[] paths = request.getPath().split("/");
if (paths.length > 2) {
// Templates // Templates
if ((paths != null) && (paths.length > 1) && ("_template".equals(paths[1]))) { if ("_template".equals(paths[1])) {
String templateName = paths[2]; // _template/{name}
boolean templateExist = templates.containsKey(templateName); return newResponseForType(templates, request, paths[2]);
} else if ("_ingest".equals(paths[1])) {
if ("GET".equals(request.getMethod())) { // _ingest/pipeline/{name}
return templateExist ? newResponse(200, templates.get(templateName).utf8ToString()) : NOT_FOUND; return newResponseForType(pipelines, request, paths[3]);
}
if ("PUT".equals(request.getMethod())) {
templates.put(templateName, new BytesArray(request.getBody().readByteArray()));
return templateExist ? newResponse(200, "updated") : newResponse(201, "created");
} }
} }
break;
} }
return newResponse(500, "MockServerDispatcher does not support: " + request.getRequestLine()); return newResponse(500, "MockServerDispatcher does not support: " + request.getRequestLine());
} }
private MockResponse newResponseForType(Map<String, BytesReference> type, RecordedRequest request, String name) {
final boolean exists = type.containsKey(name);
if ("GET".equals(request.getMethod())) {
return exists ? newResponse(200, type.get(name).utf8ToString()) : NOT_FOUND;
} else if ("PUT".equals(request.getMethod())) {
type.put(name, new BytesArray(request.getMethod()));
return exists ? newResponse(200, "updated") : newResponse(201, "created");
}
return newResponse(500, request.getMethod() + " " + request.getPath() + " is not supported");
}
MockResponse newResponse(int code, String body) { MockResponse newResponse(int code, String body) {
return new MockResponse().setResponseCode(code).setBody(body); return new MockResponse().setResponseCode(code).setBody(body);
} }

View File

@ -23,15 +23,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.xpack.monitoring.MonitoredSystem; import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings; import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.xpack.monitoring.agent.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexRecoveryMonitoringDoc; import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndexRecoveryMonitoringDoc;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporters; import org.elasticsearch.xpack.monitoring.agent.exporter.Exporters;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils; import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils;
@ -90,26 +91,108 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
webServer.shutdown(); webServer.shutdown();
} }
private void assertMonitorTemplates() throws InterruptedException { private int expectedTemplateAndPipelineCalls(final boolean templateAlreadyExists, final boolean pipelineAlreadyExists) {
assertMonitorTemplates(null); return expectedTemplateCalls(templateAlreadyExists) + expectedPipelineCalls(pipelineAlreadyExists);
} }
private void assertMonitorTemplates(@Nullable final Map<String, String[]> customHeaders) throws InterruptedException { private int expectedTemplateCalls(final boolean alreadyExists) {
return monitoringTemplates().size() * (alreadyExists ? 1 : 2);
}
private int expectedPipelineCalls(final boolean alreadyExists) {
return alreadyExists ? 1 : 2;
}
private void assertMonitorVersion(final MockWebServer webServer) throws Exception {
assertMonitorVersion(webServer, null);
}
private void assertMonitorVersion(final MockWebServer webServer, @Nullable final Map<String, String[]> customHeaders)
throws Exception {
RecordedRequest request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getPath(), equalTo("/"));
assertHeaders(request, customHeaders);
}
private void assertMonitorTemplatesAndPipeline(final MockWebServer webServer,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists)
throws Exception {
assertMonitorTemplatesAndPipeline(webServer, templateAlreadyExists, pipelineAlreadyExists, null);
}
private void assertMonitorTemplatesAndPipeline(final MockWebServer webServer,
final boolean templateAlreadyExists, final boolean pipelineAlreadyExists,
@Nullable final Map<String, String[]> customHeaders) throws Exception {
assertMonitorVersion(webServer, customHeaders);
assertMonitorTemplates(webServer, templateAlreadyExists, customHeaders);
assertMonitorPipelines(webServer, pipelineAlreadyExists, customHeaders);
}
private void assertMonitorTemplates(final MockWebServer webServer, final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders) throws Exception {
RecordedRequest request; RecordedRequest request;
for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) { for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) {
request = webServer.takeRequest(); request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("GET")); assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getPath(), equalTo("/_template/" + template.getKey())); assertThat(request.getPath(), equalTo("/_template/" + template.getKey()));
assertHeaders(request, customHeaders); assertHeaders(request, customHeaders);
if (alreadyExists == false) {
request = webServer.takeRequest(); request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("PUT")); assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getPath(), equalTo("/_template/" + template.getKey())); assertThat(request.getPath(), equalTo("/_template/" + template.getKey()));
assertThat(request.getBody().readUtf8(), equalTo(template.getValue())); assertThat(request.getBody().readUtf8(), equalTo(template.getValue()));
assertHeaders(request, customHeaders); assertHeaders(request, customHeaders);
} }
} }
}
private void assertMonitorPipelines(final MockWebServer webServer, final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders) throws Exception {
RecordedRequest request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getPath(), equalTo("/_ingest/pipeline/" + Exporter.EXPORT_PIPELINE_NAME));
assertHeaders(request, customHeaders);
if (alreadyExists == false) {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getPath(), equalTo("/_ingest/pipeline/" + Exporter.EXPORT_PIPELINE_NAME));
assertThat(request.getBody().readUtf8(), equalTo(Exporter.emptyPipeline(XContentType.JSON).string()));
assertHeaders(request, customHeaders);
}
}
private RecordedRequest assertBulk(final MockWebServer webServer) throws Exception {
return assertBulk(webServer, -1);
}
private RecordedRequest assertBulk(final MockWebServer webServer, final int docs) throws Exception {
return assertBulk(webServer, docs, null);
}
private RecordedRequest assertBulk(final MockWebServer webServer, final int docs, @Nullable final Map<String, String[]> customHeaders)
throws Exception {
RecordedRequest request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("POST"));
assertThat(request.getPath(), equalTo("/_bulk?pipeline=" + Exporter.EXPORT_PIPELINE_NAME));
assertHeaders(request, customHeaders);
if (docs != -1) {
assertBulkRequest(request.getBody(), docs);
}
return request;
}
private void assertHeaders(final RecordedRequest request, final Map<String, String[]> customHeaders) { private void assertHeaders(final RecordedRequest request, final Map<String, String[]> customHeaders) {
if (customHeaders != null) { if (customHeaders != null) {
@ -126,11 +209,12 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
} }
public void testExport() throws Exception { public void testExport() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final int expectedTemplateAndPipelineCalls = expectedTemplateAndPipelineCalls(templatesExistsAlready, pipelineExistsAlready);
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueResponse(404, "template [" + template + "] does not exist");
enqueueResponse(201, "template [" + template + "] created");
}
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
@ -145,22 +229,16 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25); final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs)); export(newRandomMonitoringDocs(nbDocs));
assertThat(webServer.getRequestCount(), equalTo(2 + monitoringTemplates().size() * 2)); assertThat(webServer.getRequestCount(), equalTo(2 + expectedTemplateAndPipelineCalls));
assertMonitorTemplatesAndPipeline(webServer, templatesExistsAlready, pipelineExistsAlready);
RecordedRequest recordedRequest = webServer.takeRequest(); assertBulk(webServer, nbDocs);
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
assertMonitorTemplates();
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
assertBulkRequest(recordedRequest.getBody(), nbDocs);
} }
public void testExportWithHeaders() throws Exception { public void testExportWithHeaders() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final int expectedTemplateAndPipelineCalls = expectedTemplateAndPipelineCalls(templatesExistsAlready, pipelineExistsAlready);
final String headerValue = randomAsciiOfLengthBetween(3, 9); final String headerValue = randomAsciiOfLengthBetween(3, 9);
final String[] array = generateRandomStringArray(2, 4, false); final String[] array = generateRandomStringArray(2, 4, false);
@ -171,10 +249,7 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
headers.put("Array-Check", array); headers.put("Array-Check", array);
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueResponse(404, "template [" + template + "] does not exist");
enqueueResponse(201, "template [" + template + "] created");
}
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
@ -192,21 +267,9 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
final int nbDocs = randomIntBetween(1, 25); final int nbDocs = randomIntBetween(1, 25);
export(newRandomMonitoringDocs(nbDocs)); export(newRandomMonitoringDocs(nbDocs));
assertThat(webServer.getRequestCount(), equalTo(2 + monitoringTemplates().size() * 2)); assertThat(webServer.getRequestCount(), equalTo(2 + expectedTemplateAndPipelineCalls));
assertMonitorTemplatesAndPipeline(webServer, templatesExistsAlready, pipelineExistsAlready);
RecordedRequest recordedRequest = webServer.takeRequest(); assertBulk(webServer, nbDocs, headers);
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
assertHeaders(recordedRequest, headers);
assertMonitorTemplates(headers);
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
assertHeaders(recordedRequest, headers);
assertBulkRequest(recordedRequest.getBody(), nbDocs);
} }
public void testDynamicHostChange() { public void testDynamicHostChange() {
@ -234,6 +297,9 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
} }
public void testHostChangeReChecksTemplate() throws Exception { public void testHostChangeReChecksTemplate() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final int expectedTemplateAndPipelineCalls = expectedTemplateAndPipelineCalls(templatesExistsAlready, pipelineExistsAlready);
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1") .put(MonitoringSettings.INTERVAL.getKey(), "-1")
@ -243,10 +309,7 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
.put("xpack.monitoring.exporters._http.update_mappings", false); .put("xpack.monitoring.exporters._http.update_mappings", false);
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueResponse(404, "template [" + template + "] does not exist");
enqueueResponse(201, "template [" + template + "] created");
}
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
String agentNode = internalCluster().startNode(builder); String agentNode = internalCluster().startNode(builder);
@ -256,23 +319,16 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
export(Collections.singletonList(newRandomMonitoringDoc())); export(Collections.singletonList(newRandomMonitoringDoc()));
assertThat(exporter.supportedClusterVersion, is(true)); assertThat(exporter.supportedClusterVersion, is(true));
assertThat(webServer.getRequestCount(), equalTo(2 + monitoringTemplates().size() * 2)); assertThat(webServer.getRequestCount(), equalTo(2 + expectedTemplateAndPipelineCalls));
assertMonitorTemplatesAndPipeline(webServer, templatesExistsAlready, pipelineExistsAlready);
assertBulk(webServer);
RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
assertMonitorTemplates();
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
logger.info("--> setting up another web server");
MockWebServer secondWebServer = null; MockWebServer secondWebServer = null;
int secondWebPort; int secondWebPort;
try { try {
final int expectedPipelineCalls = expectedPipelineCalls(!pipelineExistsAlready);
for (secondWebPort = 9250; secondWebPort < 9300; secondWebPort++) { for (secondWebPort = 9250; secondWebPort < 9300; secondWebPort++) {
try { try {
secondWebServer = new MockWebServer(); secondWebServer = new MockWebServer();
@ -298,25 +354,23 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT); enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { for (String template : monitoringTemplates().keySet()) {
if (template.contains(MonitoringBulkTimestampedResolver.Data.DATA)) { if (template.contains(MonitoringBulkTimestampedResolver.Data.DATA)) {
enqueueResponse(secondWebServer, 200, "template [" + template + "] exist"); enqueueResponse(secondWebServer, 200, "template [" + template + "] exists");
} else { } else {
enqueueResponse(secondWebServer, 404, "template [" + template + "] does not exist"); enqueueResponse(secondWebServer, 404, "template [" + template + "] does not exist");
enqueueResponse(secondWebServer, 201, "template [" + template + "] created"); enqueueResponse(secondWebServer, 201, "template [" + template + "] created");
} }
} }
enqueuePipelineResponses(secondWebServer, !pipelineExistsAlready);
enqueueResponse(secondWebServer, 200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); enqueueResponse(secondWebServer, 200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
logger.info("--> exporting a second event"); logger.info("--> exporting a second event");
export(Collections.singletonList(newRandomMonitoringDoc())); export(Collections.singletonList(newRandomMonitoringDoc()));
assertThat(secondWebServer.getRequestCount(), equalTo(2 + monitoringTemplates().size() * 2 - 1)); assertThat(secondWebServer.getRequestCount(), equalTo(2 + monitoringTemplates().size() * 2 - 1 + expectedPipelineCalls));
assertMonitorVersion(secondWebServer);
recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) { for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) {
recordedRequest = secondWebServer.takeRequest(); RecordedRequest recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey())); assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
@ -327,11 +381,8 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertThat(recordedRequest.getBody().readUtf8(), equalTo(template.getValue())); assertThat(recordedRequest.getBody().readUtf8(), equalTo(template.getValue()));
} }
} }
assertMonitorPipelines(secondWebServer, !pipelineExistsAlready, null);
recordedRequest = secondWebServer.takeRequest(); assertBulk(secondWebServer);
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
} finally { } finally {
if (secondWebServer != null) { if (secondWebServer != null) {
secondWebServer.shutdown(); secondWebServer.shutdown();
@ -359,12 +410,14 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertThat(exporter.supportedClusterVersion, is(false)); assertThat(exporter.supportedClusterVersion, is(false));
assertThat(webServer.getRequestCount(), equalTo(1)); assertThat(webServer.getRequestCount(), equalTo(1));
RecordedRequest recordedRequest = webServer.takeRequest(); assertMonitorVersion(webServer);
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
} }
public void testDynamicIndexFormatChange() throws Exception { public void testDynamicIndexFormatChange() throws Exception {
final boolean templatesExistsAlready = randomBoolean();
final boolean pipelineExistsAlready = randomBoolean();
final int expectedTemplateAndPipelineCalls = expectedTemplateAndPipelineCalls(templatesExistsAlready, pipelineExistsAlready);
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), "-1") .put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters._http.type", "http") .put("xpack.monitoring.exporters._http.type", "http")
@ -374,13 +427,8 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
String agentNode = internalCluster().startNode(builder); String agentNode = internalCluster().startNode(builder);
logger.info("--> exporting a first event");
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { enqueueTemplateAndPipelineResponses(webServer, templatesExistsAlready, pipelineExistsAlready);
enqueueResponse(404, "template [" + template + "] does not exist");
enqueueResponse(201, "template [" + template + "] created");
}
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
HttpExporter exporter = getExporter(agentNode); HttpExporter exporter = getExporter(agentNode);
@ -388,30 +436,12 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
MonitoringDoc doc = newRandomMonitoringDoc(); MonitoringDoc doc = newRandomMonitoringDoc();
export(Collections.singletonList(doc)); export(Collections.singletonList(doc));
final int expectedRequests = 2 + monitoringTemplates().size() * 2; final int expectedRequests = 2 + expectedTemplateAndPipelineCalls;
assertThat(webServer.getRequestCount(), equalTo(expectedRequests)); assertThat(webServer.getRequestCount(), equalTo(expectedRequests));
assertMonitorTemplatesAndPipeline(webServer, templatesExistsAlready, pipelineExistsAlready);
RecordedRequest recordedRequest = webServer.takeRequest(); RecordedRequest recordedRequest = assertBulk(webServer);
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
for (Map.Entry<String, String> template : monitoringTemplates().entrySet()) {
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
assertThat(recordedRequest.getBody().readUtf8(), equalTo(template.getValue()));
}
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
String indexName = exporter.getResolvers().getResolver(doc).index(doc); String indexName = exporter.getResolvers().getResolver(doc).index(doc);
logger.info("--> checks that the document in the bulk request is indexed in [{}]", indexName);
byte[] bytes = recordedRequest.getBody().readByteArray(); byte[] bytes = recordedRequest.getBody().readByteArray();
Map<String, Object> data = XContentHelper.convertToMap(new BytesArray(bytes), false).v2(); Map<String, Object> data = XContentHelper.convertToMap(new BytesArray(bytes), false).v2();
@ -419,17 +449,11 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
assertThat(index.get("_index"), equalTo(indexName)); assertThat(index.get("_index"), equalTo(indexName));
String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
logger.info("--> updating index time format setting to {}", newTimeFormat);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put("xpack.monitoring.exporters._http.index.name.time_format", newTimeFormat))); .put("xpack.monitoring.exporters._http.index.name.time_format", newTimeFormat)));
logger.info("--> exporting a second event");
enqueueGetClusterVersionResponse(Version.CURRENT); enqueueGetClusterVersionResponse(Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { enqueueTemplateAndPipelineResponses(webServer, true, true);
enqueueResponse(200, "template [" + template + "] exist");
}
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
doc = newRandomMonitoringDoc(); doc = newRandomMonitoringDoc();
@ -438,23 +462,10 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
String expectedMonitoringIndex = ".monitoring-es-" + MonitoringTemplateUtils.TEMPLATE_VERSION + "-" String expectedMonitoringIndex = ".monitoring-es-" + MonitoringTemplateUtils.TEMPLATE_VERSION + "-"
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp()); + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp());
assertThat(webServer.getRequestCount(), equalTo(expectedRequests + 2 + monitoringTemplates().size())); final int expectedTemplatesAndPipelineExists = expectedTemplateAndPipelineCalls(true, true);
assertThat(webServer.getRequestCount(), equalTo(expectedRequests + 2 + expectedTemplatesAndPipelineExists));
recordedRequest = webServer.takeRequest(); assertMonitorTemplatesAndPipeline(webServer, true, true);
assertThat(recordedRequest.getMethod(), equalTo("GET")); recordedRequest = assertBulk(webServer);
assertThat(recordedRequest.getPath(), equalTo("/"));
for (String template : monitoringTemplates().keySet()) {
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template));
}
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
logger.info("--> checks that the document in the bulk request is indexed in [{}]", expectedMonitoringIndex);
bytes = recordedRequest.getBody().readByteArray(); bytes = recordedRequest.getBody().readByteArray();
data = XContentHelper.convertToMap(new BytesArray(bytes), false).v2(); data = XContentHelper.convertToMap(new BytesArray(bytes), false).v2();
@ -536,6 +547,51 @@ public class HttpExporterTests extends MonitoringIntegTestCase {
.utf8ToString())); .utf8ToString()));
} }
private void enqueueTemplateAndPipelineResponses(final MockWebServer webServer,
final boolean templatesAlreadyExists, final boolean pipelineAlreadyExists)
throws IOException {
enqueueTemplateResponses(webServer, templatesAlreadyExists);
enqueuePipelineResponses(webServer, pipelineAlreadyExists);
}
private void enqueueTemplateResponses(final MockWebServer webServer, final boolean alreadyExists) throws IOException {
if (alreadyExists) {
enqueueTemplateResponsesExistsAlready(webServer);
} else {
enqueueTemplateResponsesDoesNotExistYet(webServer);
}
}
private void enqueueTemplateResponsesDoesNotExistYet(final MockWebServer webServer) throws IOException {
for (String template : monitoringTemplates().keySet()) {
enqueueResponse(webServer, 404, "template [" + template + "] does not exist");
enqueueResponse(webServer, 201, "template [" + template + "] created");
}
}
private void enqueueTemplateResponsesExistsAlready(final MockWebServer webServer) throws IOException {
for (String template : monitoringTemplates().keySet()) {
enqueueResponse(webServer, 200, "template [" + template + "] exists");
}
}
private void enqueuePipelineResponses(final MockWebServer webServer, final boolean alreadyExists) throws IOException {
if (alreadyExists) {
enqueuePipelineResponsesExistsAlready(webServer);
} else {
enqueuePipelineResponsesDoesNotExistYet(webServer);
}
}
private void enqueuePipelineResponsesDoesNotExistYet(final MockWebServer webServer) throws IOException {
enqueueResponse(webServer, 404, "pipeline [" + Exporter.EXPORT_PIPELINE_NAME + "] does not exist");
enqueueResponse(webServer, 201, "pipeline [" + Exporter.EXPORT_PIPELINE_NAME + "] created");
}
private void enqueuePipelineResponsesExistsAlready(final MockWebServer webServer) throws IOException {
enqueueResponse(webServer, 200, "pipeline [" + Exporter.EXPORT_PIPELINE_NAME + "] exists");
}
private void enqueueResponse(int responseCode, String body) throws IOException { private void enqueueResponse(int responseCode, String body) throws IOException {
enqueueResponse(webServer, responseCode, body); enqueueResponse(webServer, responseCode, body);
} }

View File

@ -5,10 +5,16 @@
*/ */
package org.elasticsearch.xpack.monitoring.agent.exporter.local; package org.elasticsearch.xpack.monitoring.agent.exporter.local;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.xpack.monitoring.agent.exporter.AbstractExporterTemplateTestCase; import org.elasticsearch.xpack.monitoring.agent.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.xpack.monitoring.agent.exporter.Exporter;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -25,6 +31,12 @@ public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase
cluster().wipeAllTemplates(Collections.emptySet()); cluster().wipeAllTemplates(Collections.emptySet());
} }
@Override
protected void deletePipeline() throws Exception {
waitNoPendingTasksOnAll();
cluster().client().admin().cluster().deletePipeline(new DeletePipelineRequest(Exporter.EXPORT_PIPELINE_NAME));
}
@Override @Override
protected void putTemplate(String name) throws Exception { protected void putTemplate(String name) throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
@ -32,14 +44,44 @@ public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase
} }
@Override @Override
protected void assertTemplateExist(String name) throws Exception { protected void putPipeline(String name) throws Exception {
waitNoPendingTasksOnAll();
assertAcked(client().admin().cluster().preparePutPipeline(name, Exporter.emptyPipeline(XContentType.JSON).bytes()).get());
}
@Override
protected void assertTemplateExists(String name) throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
waitForMonitoringTemplate(name); waitForMonitoringTemplate(name);
} }
@Override
protected void assertPipelineExists(String name) throws Exception {
waitNoPendingTasksOnAll();
assertPipelineInstalled(name);
}
private void assertPipelineInstalled(String name) throws Exception {
assertBusy(() -> {
boolean found = false;
for (PipelineConfiguration pipeline : client().admin().cluster().prepareGetPipeline(name).get().pipelines()) {
if (Regex.simpleMatch(name, pipeline.getId())) {
found = true;
}
}
assertTrue("failed to find a pipeline matching [" + name + "]", found);
}, 30, TimeUnit.SECONDS);
}
@Override @Override
protected void assertTemplateNotUpdated(String name) throws Exception { protected void assertTemplateNotUpdated(String name) throws Exception {
waitNoPendingTasksOnAll(); waitNoPendingTasksOnAll();
assertTemplateExist(name); assertTemplateExists(name);
}
@Override
protected void assertPipelineNotUpdated(String name) throws Exception {
waitNoPendingTasksOnAll();
assertPipelineExists(name);
} }
} }

View File

@ -511,6 +511,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
" cluster: [ 'cluster:monitor/nodes/info', 'cluster:monitor/state', 'cluster:monitor/health', 'cluster:monitor/stats'," + " cluster: [ 'cluster:monitor/nodes/info', 'cluster:monitor/state', 'cluster:monitor/health', 'cluster:monitor/stats'," +
" 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," + " 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," +
" 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," + " 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," +
" 'cluster:admin/ingest/pipeline/get', 'cluster:admin/ingest/pipeline/put', 'cluster:admin/ingest/pipeline/delete'," +
" 'cluster:monitor/task', 'cluster:admin/xpack/monitoring/bulk' ]\n" + " 'cluster:monitor/task', 'cluster:admin/xpack/monitoring/bulk' ]\n" +
" indices:\n" + " indices:\n" +
" - names: '*'\n" + " - names: '*'\n" +

View File

@ -23,7 +23,7 @@ monitoring_user:
# Monitoring remote agent role. Assign to the agent user on the remote monitoring cluster # Monitoring remote agent role. Assign to the agent user on the remote monitoring cluster
# to which the monitoring agent will export all its data # to which the monitoring agent will export all its data
remote_monitoring_agent: remote_monitoring_agent:
cluster: [ "manage_index_templates", "monitor" ] cluster: [ "manage_index_templates", "manage_ingest_pipelines", "monitor" ]
indices: indices:
- names: - names:
- '.marvel-es-*' - '.marvel-es-*'

View File

@ -30,6 +30,7 @@ public class ClusterPrivilege extends AbstractAutomatonPrivilege<ClusterPrivileg
private static final Automaton MANAGE_AUTOMATON = minusAndDeterminize(ALL_CLUSTER_AUTOMATON, MANAGE_SECURITY_AUTOMATON); private static final Automaton MANAGE_AUTOMATON = minusAndDeterminize(ALL_CLUSTER_AUTOMATON, MANAGE_SECURITY_AUTOMATON);
private static final Automaton TRANSPORT_CLIENT_AUTOMATON = patterns("cluster:monitor/nodes/liveness", "cluster:monitor/state"); private static final Automaton TRANSPORT_CLIENT_AUTOMATON = patterns("cluster:monitor/nodes/liveness", "cluster:monitor/state");
private static final Automaton MANAGE_IDX_TEMPLATE_AUTOMATON = patterns("indices:admin/template/*"); private static final Automaton MANAGE_IDX_TEMPLATE_AUTOMATON = patterns("indices:admin/template/*");
private static final Automaton MANAGE_INGEST_PIPELINE_AUTOMATON = patterns("cluster:admin/ingest/pipeline/*");
public static final ClusterPrivilege NONE = new ClusterPrivilege(Name.NONE, Automatons.EMPTY); public static final ClusterPrivilege NONE = new ClusterPrivilege(Name.NONE, Automatons.EMPTY);
public static final ClusterPrivilege ALL = new ClusterPrivilege(Name.ALL, ALL_CLUSTER_AUTOMATON); public static final ClusterPrivilege ALL = new ClusterPrivilege(Name.ALL, ALL_CLUSTER_AUTOMATON);
@ -37,6 +38,8 @@ public class ClusterPrivilege extends AbstractAutomatonPrivilege<ClusterPrivileg
public static final ClusterPrivilege MANAGE = new ClusterPrivilege("manage", MANAGE_AUTOMATON); public static final ClusterPrivilege MANAGE = new ClusterPrivilege("manage", MANAGE_AUTOMATON);
public static final ClusterPrivilege MANAGE_IDX_TEMPLATES = public static final ClusterPrivilege MANAGE_IDX_TEMPLATES =
new ClusterPrivilege("manage_index_templates", MANAGE_IDX_TEMPLATE_AUTOMATON); new ClusterPrivilege("manage_index_templates", MANAGE_IDX_TEMPLATE_AUTOMATON);
public static final ClusterPrivilege MANAGE_INGEST_PIPELINES =
new ClusterPrivilege("manage_ingest_pipelines", MANAGE_INGEST_PIPELINE_AUTOMATON);
public static final ClusterPrivilege TRANSPORT_CLIENT = new ClusterPrivilege("transport_client", TRANSPORT_CLIENT_AUTOMATON); public static final ClusterPrivilege TRANSPORT_CLIENT = new ClusterPrivilege("transport_client", TRANSPORT_CLIENT_AUTOMATON);
public static final ClusterPrivilege MANAGE_SECURITY = new ClusterPrivilege("manage_security", MANAGE_SECURITY_AUTOMATON); public static final ClusterPrivilege MANAGE_SECURITY = new ClusterPrivilege("manage_security", MANAGE_SECURITY_AUTOMATON);
@ -50,6 +53,7 @@ public class ClusterPrivilege extends AbstractAutomatonPrivilege<ClusterPrivileg
values.add(MONITOR); values.add(MONITOR);
values.add(MANAGE); values.add(MANAGE);
values.add(MANAGE_IDX_TEMPLATES); values.add(MANAGE_IDX_TEMPLATES);
values.add(MANAGE_INGEST_PIPELINES);
values.add(TRANSPORT_CLIENT); values.add(TRANSPORT_CLIENT);
values.add(MANAGE_SECURITY); values.add(MANAGE_SECURITY);
} }

View File

@ -23,7 +23,7 @@ monitoring_user:
# Monitoring remote agent role. Assign to the agent user on the remote monitoring cluster # Monitoring remote agent role. Assign to the agent user on the remote monitoring cluster
# to which the monitoring agent will export all its data # to which the monitoring agent will export all its data
remote_monitoring_agent: remote_monitoring_agent:
cluster: [ "manage_index_templates" ] cluster: [ "manage_index_templates", "manage_ingest_pipelines" ]
indices: indices:
- names: - names:
- '.marvel-es-*' - '.marvel-es-*'