From fe97d2ba51df65ef31e7cbbd7a517dc48164ad32 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 15 Mar 2016 21:20:12 +0100 Subject: [PATCH] Monitoring: Add REST endpoint to allow external systems to index monitoring data Original commit: elastic/x-pack-elasticsearch@04aa96a228ae25498c1d4b0bccc007f745efb181 --- .../java/org/elasticsearch/marvel/Marvel.java | 66 ++-- .../marvel/action/MonitoringBulkAction.java | 30 ++ .../marvel/action/MonitoringBulkDoc.java | 80 +++++ .../marvel/action/MonitoringBulkRequest.java | 128 ++++++++ .../action/MonitoringBulkRequestBuilder.java | 29 ++ .../marvel/action/MonitoringBulkResponse.java | 138 +++++++++ .../action/TransportMonitoringBulkAction.java | 127 ++++++++ .../marvel/agent/AgentService.java | 3 + .../marvel/agent/exporter/ExportBulk.java | 43 +-- .../agent/exporter/ExportException.java | 76 +++++ .../marvel/agent/exporter/Exporters.java | 7 +- .../marvel/agent/exporter/MonitoringDoc.java | 69 ++--- .../agent/exporter/http/HttpExporter.java | 93 +++--- .../agent/exporter/local/LocalBulk.java | 82 +++-- .../agent/exporter/local/LocalExporter.java | 2 +- .../agent/resolver/ResolversRegistry.java | 36 ++- .../resolver/bulk/MonitoringBulkResolver.java | 36 +++ .../marvel/client/MonitoringClient.java | 41 ++- .../marvel/rest/MonitoringRestHandler.java | 32 ++ .../rest/action/RestMonitoringBulkAction.java | 86 +++++ .../init/proxy/MonitoringClientProxy.java | 5 - .../src/main/resources/monitoring-data.json | 6 +- .../src/main/resources/monitoring-es.json | 1 - .../marvel/MarvelPluginClientTests.java | 4 +- .../marvel/action/MonitoringBulkDocTests.java | 104 +++++++ .../action/MonitoringBulkRequestTests.java | 193 ++++++++++++ .../action/MonitoringBulkResponseTests.java | 73 +++++ .../marvel/action/MonitoringBulkTests.java | 147 +++++++++ .../TransportMonitoringBulkActionTests.java | 293 ++++++++++++++++++ .../marvel/agent/exporter/ExportersTests.java | 5 +- .../agent/exporter/MonitoringDocTests.java | 4 +- .../exporter/local/LocalExporterTests.java | 11 +- .../MonitoringIndexNameResolverTestCase.java | 2 +- .../bulk/MonitoringBulkResolverTests.java | 69 +++++ .../marvel/test/MarvelIntegTestCase.java | 6 +- .../org/elasticsearch/xpack/XPackPlugin.java | 2 + 36 files changed, 1929 insertions(+), 200 deletions(-) create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkAction.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkDoc.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequest.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequestBuilder.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkResponse.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/TransportMonitoringBulkAction.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportException.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolver.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/MonitoringRestHandler.java create mode 100644 elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/action/RestMonitoringBulkAction.java create mode 100644 elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkDocTests.java create mode 100644 elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkRequestTests.java create mode 100644 elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkResponseTests.java create mode 100644 elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java create mode 100644 elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java create mode 100644 elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolverTests.java diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/Marvel.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/Marvel.java index 5ba5e8f7848..6440795572d 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/Marvel.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/Marvel.java @@ -5,13 +5,14 @@ */ package org.elasticsearch.marvel; -import org.elasticsearch.client.Client; +import org.elasticsearch.action.ActionModule; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.marvel.action.MonitoringBulkAction; +import org.elasticsearch.marvel.action.TransportMonitoringBulkAction; import org.elasticsearch.marvel.agent.AgentService; import org.elasticsearch.marvel.agent.collector.CollectorModule; import org.elasticsearch.marvel.agent.exporter.ExporterModule; @@ -19,49 +20,58 @@ import org.elasticsearch.marvel.cleaner.CleanerService; import org.elasticsearch.marvel.client.MonitoringClientModule; import org.elasticsearch.marvel.license.LicenseModule; import org.elasticsearch.marvel.license.MarvelLicensee; +import org.elasticsearch.marvel.rest.action.RestMonitoringBulkAction; import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.common.init.LazyInitializationModule; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; +/** + * This class activates/deactivates the monitoring modules depending if we're running a node client, transport client or tribe client: + * - node clients: all modules are binded + * - transport clients: only action/transport actions are binded + * - tribe clients: everything is disables by default but can be enabled per tribe cluster + */ public class Marvel { - private static final ESLogger logger = Loggers.getLogger(XPackPlugin.class); - public static final String NAME = "monitoring"; private final Settings settings; private final boolean enabled; + private final boolean transportClientMode; public Marvel(Settings settings) { this.settings = settings; - this.enabled = enabled(settings); + this.enabled = MarvelSettings.ENABLED.get(settings); + this.transportClientMode = XPackPlugin.transportClientMode(settings); } boolean isEnabled() { return enabled; } - public Collection nodeModules() { - List modules = new ArrayList<>(); + boolean isTransportClient() { + return transportClientMode; + } - if (enabled) { - modules.add(new MarvelModule()); - modules.add(new LicenseModule()); - modules.add(new CollectorModule()); - modules.add(new ExporterModule(settings)); - modules.add(new MonitoringClientModule()); + public Collection nodeModules() { + if (enabled == false || transportClientMode) { + return Collections.emptyList(); } - return Collections.unmodifiableList(modules); + + return Arrays.asList( + new MarvelModule(), + new LicenseModule(), + new CollectorModule(), + new ExporterModule(settings), + new MonitoringClientModule()); } public Collection> nodeServices() { - if (enabled == false) { + if (enabled == false || transportClientMode) { return Collections.emptyList(); } return Arrays.>asList(MarvelLicensee.class, @@ -69,18 +79,22 @@ public class Marvel { CleanerService.class); } - public static boolean enabled(Settings settings) { - if ("node".equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false) { - logger.trace("monitoring cannot be started on a transport client"); - return false; - } - return MarvelSettings.ENABLED.get(settings); - } - public void onModule(SettingsModule module) { MarvelSettings.register(module); } + public void onModule(ActionModule module) { + if (enabled) { + module.registerAction(MonitoringBulkAction.INSTANCE, TransportMonitoringBulkAction.class); + } + } + + public void onModule(NetworkModule module) { + if (enabled && transportClientMode == false) { + module.registerRestHandler(RestMonitoringBulkAction.class); + } + } + public void onModule(LazyInitializationModule module) { if (enabled) { module.registerLazyInitializable(MonitoringClientProxy.class); diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkAction.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkAction.java new file mode 100644 index 00000000000..c0a8f005122 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkAction.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class MonitoringBulkAction extends Action { + + public static final MonitoringBulkAction INSTANCE = new MonitoringBulkAction(); + public static final String NAME = "cluster:admin/xpack/monitoring/bulk"; + + private MonitoringBulkAction() { + super(NAME); + } + + @Override + public MonitoringBulkRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new MonitoringBulkRequestBuilder(client); + } + + @Override + public MonitoringBulkResponse newResponse() { + return new MonitoringBulkResponse(); + } +} + diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkDoc.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkDoc.java new file mode 100644 index 00000000000..96fcce29ab2 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkDoc.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; + +import java.io.IOException; + +public class MonitoringBulkDoc extends MonitoringDoc { + + private String index; + private String type; + private String id; + + private BytesReference source; + + public MonitoringBulkDoc(String monitoringId, String monitoringVersion) { + super(monitoringId, monitoringVersion); + } + + public MonitoringBulkDoc(StreamInput in) throws IOException { + super(in); + index = in.readOptionalString(); + type = in.readOptionalString(); + id = in.readOptionalString(); + source = in.readBytesReference(); + } + + public String getIndex() { + return index; + } + + public void setIndex(String index) { + this.index = index; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public BytesReference getSource() { + return source; + } + + public void setSource(BytesReference source) { + this.source = source; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(index); + out.writeOptionalString(type); + out.writeOptionalString(id); + out.writeBytesReference(source); + } + + @Override + public MonitoringBulkDoc readFrom(StreamInput in) throws IOException { + return new MonitoringBulkDoc(in); + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequest.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequest.java new file mode 100644 index 00000000000..27a0626fd6e --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequest.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * A monitoring bulk request holds one or more {@link MonitoringBulkDoc}s. + *

+ * Every monitoring document added to the request is associated to a monitoring system id and version. If this {id, version} pair is + * supported by the monitoring plugin, the monitoring documents will be indexed in a single batch using a normal bulk request. + *

+ * The monitoring {id, version} pair is used by {org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver} to resolve the index, + * type and id of the final document to be indexed. A {@link MonitoringBulkDoc} can also hold its own index/type/id values but there's no + * guarantee that these information will be effectively used. + */ +public class MonitoringBulkRequest extends ActionRequest { + + final List docs = new ArrayList<>(); + + /** + * @return the list of monitoring documents to be indexed + */ + public Collection getDocs() { + return Collections.unmodifiableCollection(new ArrayList<>(this.docs)); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (docs.isEmpty()) { + validationException = addValidationError("no monitoring documents added", validationException); + } + + for (int i = 0; i < docs.size(); i++) { + MonitoringBulkDoc doc = docs.get(i); + if (Strings.hasLength(doc.getMonitoringId()) == false) { + validationException = addValidationError("monitored system id is missing for monitoring document [" + i + "]", + validationException); + } + if (Strings.hasLength(doc.getMonitoringVersion()) == false) { + validationException = addValidationError("monitored system version is missing for monitoring document [" + i + "]", + validationException); + } + if (Strings.hasLength(doc.getType()) == false) { + validationException = addValidationError("type is missing for monitoring document [" + i + "]", + validationException); + } + if (doc.getSource() == null || doc.getSource().length() == 0) { + validationException = addValidationError("source is missing for monitoring document [" + i + "]", validationException); + } + } + + return validationException; + } + + /** + * Adds a monitoring document to the list of documents to be indexed. + */ + public MonitoringBulkRequest add(MonitoringBulkDoc doc) { + docs.add(doc); + return this; + } + + /** + * Parses a monitoring bulk request and builds the list of documents to be indexed. + */ + public MonitoringBulkRequest add(BytesReference content, String defaultMonitoringId, String defaultMonitoringVersion, + String defaultIndex, String defaultType) throws Exception { + // MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest: + // instead of duplicating the parsing logic here we use a new BulkRequest instance to parse the content. + BulkRequest bulkRequest = Requests.bulkRequest().add(content, defaultIndex, defaultType); + + for (ActionRequest request : bulkRequest.requests()) { + if (request instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) request; + + // builds a new monitoring document based on the index request + MonitoringBulkDoc doc = new MonitoringBulkDoc(defaultMonitoringId, defaultMonitoringVersion); + doc.setIndex(indexRequest.index()); + doc.setType(indexRequest.type()); + doc.setId(indexRequest.id()); + doc.setSource(indexRequest.source()); + add(doc); + } else { + throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); + } + } + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + add(new MonitoringBulkDoc(in)); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(docs.size()); + for (MonitoringBulkDoc doc : docs) { + doc.writeTo(out); + } + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequestBuilder.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequestBuilder.java new file mode 100644 index 00000000000..112c6bf4551 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkRequestBuilder.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.bytes.BytesReference; + +public class MonitoringBulkRequestBuilder + extends ActionRequestBuilder { + + public MonitoringBulkRequestBuilder(ElasticsearchClient client) { + super(client, MonitoringBulkAction.INSTANCE, new MonitoringBulkRequest()); + } + + public MonitoringBulkRequestBuilder add(MonitoringBulkDoc doc) { + request.add(doc); + return this; + } + + public MonitoringBulkRequestBuilder add(BytesReference content, String defaultId, String defaultVersion, String defaultIndex, + String defaultType) throws Exception { + request.add(content, defaultId, defaultVersion, defaultIndex, defaultType); + return this; + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkResponse.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkResponse.java new file mode 100644 index 00000000000..070f10f5e2a --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/MonitoringBulkResponse.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.Objects; + +public class MonitoringBulkResponse extends ActionResponse { + + private long tookInMillis; + private Error error; + + MonitoringBulkResponse() { + } + + public MonitoringBulkResponse(long tookInMillis) { + this(tookInMillis, null); + } + + public MonitoringBulkResponse(long tookInMillis, Error error) { + this.tookInMillis = tookInMillis; + this.error = error; + } + + public TimeValue getTook() { + return new TimeValue(tookInMillis); + } + + public long getTookInMillis() { + return tookInMillis; + } + + /** + * Returns HTTP status + *

    + *
  • {@link RestStatus#OK} if monitoring bulk request was successful
  • + *
  • {@link RestStatus#INTERNAL_SERVER_ERROR} if monitoring bulk request was partially successful or failed completely
  • + *
+ */ + public RestStatus status() { + return error == null ? RestStatus.OK : RestStatus.INTERNAL_SERVER_ERROR; + } + + public Error getError() { + return error; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + tookInMillis = in.readVLong(); + error = in.readOptionalWritable(Error::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(tookInMillis); + out.writeOptionalWriteable(error); + } + + public static class Error implements Writeable, ToXContent { + + private final Throwable cause; + private final RestStatus status; + + public Error(Throwable t) { + cause = Objects.requireNonNull(t); + status = ExceptionsHelper.status(t); + } + + Error(StreamInput in) throws IOException { + this(in.readThrowable()); + } + + /** + * The failure message. + */ + public String getMessage() { + return this.cause.toString(); + } + + /** + * The rest status. + */ + public RestStatus getStatus() { + return this.status; + } + + /** + * The actual cause of the failure. + */ + public Throwable getCause() { + return cause; + } + + @Override + public Error readFrom(StreamInput in) throws IOException { + return new Error(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeThrowable(getCause()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + ElasticsearchException.toXContent(builder, params, cause); + builder.endObject(); + return builder; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Error ["); + sb.append("cause=").append(cause); + sb.append(", status=").append(status); + sb.append(']'); + return sb.toString(); + } + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/TransportMonitoringBulkAction.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/TransportMonitoringBulkAction.java new file mode 100644 index 00000000000..56e95d2c574 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/action/TransportMonitoringBulkAction.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.marvel.agent.exporter.Exporters; +import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class TransportMonitoringBulkAction extends HandledTransportAction { + + private final ClusterService clusterService; + private final Exporters exportService; + + @Inject + public TransportMonitoringBulkAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Exporters exportService) { + super(settings, MonitoringBulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + MonitoringBulkRequest::new); + this.clusterService = clusterService; + this.exportService = exportService; + } + + @Override + protected void doExecute(MonitoringBulkRequest request, ActionListener listener) { + clusterService.state().blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); + new AsyncAction(request, listener, exportService, clusterService).start(); + } + + class AsyncAction { + + private final MonitoringBulkRequest request; + private final ActionListener listener; + private final Exporters exportService; + private final ClusterService clusterService; + + public AsyncAction(MonitoringBulkRequest request, ActionListener listener, + Exporters exportService, ClusterService clusterService) { + this.request = request; + this.listener = listener; + this.exportService = exportService; + this.clusterService = clusterService; + } + + void start() { + executeExport(prepareForExport(request.getDocs()), System.nanoTime(), listener); + } + + /** + * Iterate over the documents and set the values of common fields if needed: + * - cluster UUID + * - timestamp + * - source node + */ + Collection prepareForExport(Collection docs) { + final String clusterUUID = clusterService.state().metaData().clusterUUID(); + Function updateClusterUUID = doc -> { + if (doc.getClusterUUID() == null) { + doc.setClusterUUID(clusterUUID); + } + return doc; + }; + + final long timestamp = System.currentTimeMillis(); + Function updateTimestamp = doc -> { + if (doc.getTimestamp() == 0) { + doc.setTimestamp(timestamp); + } + return doc; + }; + + final DiscoveryNode sourceNode = clusterService.localNode(); + Function updateSourceNode = doc -> { + if (doc.getSourceNode() == null) { + doc.setSourceNode(sourceNode); + } + return doc; + }; + + return docs.stream() + .map(updateClusterUUID.andThen(updateTimestamp.andThen(updateSourceNode))) + .collect(Collectors.toList()); + } + + /** + * Exports the documents + */ + void executeExport(final Collection docs, final long startTimeNanos, + final ActionListener listener) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + exportService.export(docs); + listener.onResponse(new MonitoringBulkResponse(buildTookInMillis(startTimeNanos))); + } + + @Override + public void onFailure(Throwable t) { + listener.onResponse(new MonitoringBulkResponse(buildTookInMillis(startTimeNanos), new MonitoringBulkResponse.Error(t))); + } + }); + } + } + + private long buildTookInMillis(long startTimeNanos) { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos); + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java index 3f4199866dc..f8099becc1e 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector; +import org.elasticsearch.marvel.agent.exporter.ExportException; import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; @@ -199,6 +200,8 @@ public class AgentService extends AbstractLifecycleComponent { exporters.export(docs); } + } catch (ExportException e) { + logger.error("exception when exporting documents", e); } catch (InterruptedException e) { logger.trace("interrupted"); Thread.currentThread().interrupt(); diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java index 963df57f04c..f02a11ac95e 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.marvel.agent.exporter; -import org.elasticsearch.ElasticsearchException; - import java.util.Collection; /** @@ -25,18 +23,14 @@ public abstract class ExportBulk { return name; } - public abstract ExportBulk add(Collection docs) throws Exception; + public abstract ExportBulk add(Collection docs) throws ExportException; - public abstract void flush() throws Exception; + public abstract void flush() throws ExportException; - public final void close(boolean flush) throws Exception { - Exception exception = null; + public final void close(boolean flush) throws ExportException { + ExportException exception = null; if (flush) { - try { - flush(); - } catch (Exception e) { - exception = e; - } + flush(); } // now closing @@ -46,7 +40,7 @@ public abstract class ExportBulk { if (exception != null) { exception.addSuppressed(e); } else { - exception = e; + exception = new ExportException("Exception when closing export bulk", e); } } @@ -69,24 +63,35 @@ public abstract class ExportBulk { } @Override - public ExportBulk add(Collection docs) throws Exception { + public ExportBulk add(Collection docs) throws ExportException { + ExportException exception = null; for (ExportBulk bulk : bulks) { - bulk.add(docs); + try { + bulk.add(docs); + } catch (ExportException e) { + if (exception == null) { + exception = new ExportException("failed to add documents to export bulks"); + } + exception.addExportException(e); + } + } + if (exception != null) { + throw exception; } return this; } @Override - public void flush() throws Exception { - Exception exception = null; + public void flush() throws ExportException { + ExportException exception = null; for (ExportBulk bulk : bulks) { try { bulk.flush(); - } catch (Exception e) { + } catch (ExportException e) { if (exception == null) { - exception = new ElasticsearchException("failed to flush exporter bulks"); + exception = new ExportException("failed to flush export bulks"); } - exception.addSuppressed(new ElasticsearchException("failed to flush [{}] exporter bulk", e, bulk.name)); + exception.addExportException(e); } } if (exception != null) { diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportException.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportException.java new file mode 100644 index 00000000000..02cddd86ece --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportException.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.exporter; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class ExportException extends ElasticsearchException implements Iterable { + + private final List exceptions = new ArrayList<>(); + + public ExportException(Throwable throwable) { + super(throwable); + } + + public ExportException(String msg, Object... args) { + super(msg, args); + } + + public ExportException(String msg, Throwable throwable, Object... args) { + super(msg, throwable, args); + } + + public ExportException(StreamInput in) throws IOException { + super(in); + for (int i = in.readVInt(); i > 0; i--) { + exceptions.add(new ExportException(in)); + } + } + + public boolean addExportException(ExportException e) { + return exceptions.add(e); + } + + public boolean hasExportExceptions() { + return exceptions.size() > 0; + } + + @Override + public Iterator iterator() { + return exceptions.iterator(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(exceptions.size()); + for (ExportException e : exceptions) { + e.writeTo(out); + } + } + + @Override + protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { + super.innerToXContent(builder, params); + if (hasExportExceptions()) { + builder.startArray("exceptions"); + for (ExportException exception : exceptions) { + builder.startObject(); + exception.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + } + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java index a01946922cc..aeadf2fff66 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java @@ -122,7 +122,7 @@ public class Exporters extends AbstractLifecycleComponent implements bulks.add(bulk); } } catch (Exception e) { - logger.error("exporter [{}] failed to export monitoring data", e, exporter.name()); + logger.error("exporter [{}] failed to open exporting bulk", e, exporter.name()); } } return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks); @@ -179,9 +179,9 @@ public class Exporters extends AbstractLifecycleComponent implements /** * Exports a collection of monitoring documents using the configured exporters */ - public synchronized void export(Collection docs) throws Exception { + public synchronized void export(Collection docs) throws ExportException { if (this.lifecycleState() != Lifecycle.State.STARTED) { - throw new IllegalStateException("Export service is not started"); + throw new ExportException("Export service is not started"); } if (docs != null && docs.size() > 0) { ExportBulk bulk = openBulk(); @@ -191,7 +191,6 @@ public class Exporters extends AbstractLifecycleComponent implements } try { - logger.debug("exporting [{}] monitoring documents", docs.size()); bulk.add(docs); } finally { bulk.close(lifecycleState() == Lifecycle.State.STARTED); diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/MonitoringDoc.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/MonitoringDoc.java index 6e1bee7bf54..b2199b7f417 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/MonitoringDoc.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/MonitoringDoc.java @@ -22,8 +22,6 @@ import java.io.IOException; */ public class MonitoringDoc implements Writeable { - private static final MonitoringDoc PROTO = new MonitoringDoc(); - private final String monitoringId; private final String monitoringVersion; @@ -31,16 +29,18 @@ public class MonitoringDoc implements Writeable { private long timestamp; private Node sourceNode; - // Used by {@link #PROTO} instance and tests - MonitoringDoc() { - this(null, null); - } - public MonitoringDoc(String monitoringId, String monitoringVersion) { this.monitoringId = monitoringId; this.monitoringVersion = monitoringVersion; } + public MonitoringDoc(StreamInput in) throws IOException { + this(in.readOptionalString(), in.readOptionalString()); + clusterUUID = in.readOptionalString(); + timestamp = in.readVLong(); + sourceNode = in.readOptionalWritable(Node::new); + } + public String getClusterUUID() { return clusterUUID; } @@ -80,7 +80,7 @@ public class MonitoringDoc implements Writeable { @Override public String toString() { - return "marvel document [class=" + getClass().getName() + + return "monitoring document [class=" + getClass().getSimpleName() + ", monitoring id=" + getMonitoringId() + ", monitoring version=" + getMonitoringVersion() + "]"; @@ -92,33 +92,16 @@ public class MonitoringDoc implements Writeable { out.writeOptionalString(getMonitoringVersion()); out.writeOptionalString(getClusterUUID()); out.writeVLong(getTimestamp()); - if (getSourceNode() != null) { - out.writeBoolean(true); - getSourceNode().writeTo(out); - } else { - out.writeBoolean(false); - } + out.writeOptionalWriteable(getSourceNode()); } @Override public MonitoringDoc readFrom(StreamInput in) throws IOException { - MonitoringDoc doc = new MonitoringDoc(in.readOptionalString(), in.readOptionalString()); - doc.setClusterUUID(in.readOptionalString()); - doc.setTimestamp(in.readVLong()); - if (in.readBoolean()) { - doc.setSourceNode(Node.PROTO.readFrom(in)); - } - return doc; - } - - public static MonitoringDoc readMonitoringDoc(StreamInput in) throws IOException { - return PROTO.readFrom(in); + return new MonitoringDoc(in); } public static class Node implements Writeable, ToXContent { - public static final Node PROTO = new Node(); - private String uuid; private String host; private String transportAddress; @@ -126,10 +109,6 @@ public class MonitoringDoc implements Writeable { private String name; private ImmutableOpenMap attributes; - // Used by the {@link #PROTO} instance - Node() { - } - public Node(String uuid, String host, String transportAddress, String ip, String name, ImmutableOpenMap attributes) { this.uuid = uuid; @@ -147,6 +126,20 @@ public class MonitoringDoc implements Writeable { this.attributes = builder.build(); } + public Node(StreamInput in) throws IOException { + uuid = in.readOptionalString(); + host = in.readOptionalString(); + transportAddress = in.readOptionalString(); + ip = in.readOptionalString(); + name = in.readOptionalString(); + int size = in.readVInt(); + ImmutableOpenMap.Builder attributes = ImmutableOpenMap.builder(size); + for (int i = 0; i < size; i++) { + attributes.put(in.readOptionalString(), in.readOptionalString()); + } + this.attributes = attributes.build(); + } + public String getUUID() { return uuid; } @@ -208,19 +201,7 @@ public class MonitoringDoc implements Writeable { @Override public Node readFrom(StreamInput in) throws IOException { - Node node = new Node(); - node.uuid = in.readOptionalString(); - node.host = in.readOptionalString(); - node.transportAddress = in.readOptionalString(); - node.ip = in.readOptionalString(); - node.name = in.readOptionalString(); - int size = in.readVInt(); - ImmutableOpenMap.Builder attributes = ImmutableOpenMap.builder(size); - for (int i = 0; i < size; i++) { - attributes.put(in.readOptionalString(), in.readOptionalString()); - } - node.attributes = attributes.build(); - return node; + return new Node(in); } @Override diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java index 21a9ad75c22..5f92489d0bd 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.marvel.agent.exporter.ExportBulk; +import org.elasticsearch.marvel.agent.exporter.ExportException; import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; @@ -79,7 +80,9 @@ public class HttpExporter extends Exporter { public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = "truststore.algorithm"; public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification"; - /** Minimum supported version of the remote monitoring cluster **/ + /** + * Minimum supported version of the remote monitoring cluster + **/ public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2; private static final XContentType CONTENT_TYPE = XContentType.JSON; @@ -89,19 +92,24 @@ public class HttpExporter extends Exporter { final TimeValue connectionReadTimeout; final BasicAuth auth; - /** https support * */ + /** + * https support * + */ final SSLSocketFactory sslSocketFactory; final boolean hostnameVerification; final Environment env; final ResolversRegistry resolvers; - final @Nullable TimeValue templateCheckTimeout; + @Nullable + final TimeValue templateCheckTimeout; volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean supportedClusterVersion = false; - /** Version number of built-in templates **/ + /** + * Version number of built-in templates + **/ private final Integer templateVersion; boolean keepAlive; @@ -218,9 +226,9 @@ public class HttpExporter extends Exporter { logger.trace("http exporter [{}] - added index request [index={}, type={}, id={}]", name(), index, type, id); } - } else { - logger.warn("http exporter [{}] - unable to render monitoring document of type [{}]: no renderer found in registry", - name(), doc); + } else if (logger.isTraceEnabled()) { + logger.trace("http exporter [{}] - no resolver found for monitoring document [class={}, id={}, version={}]", + name(), doc.getClass().getName(), doc.getMonitoringId(), doc.getMonitoringVersion()); } } catch (Exception e) { logger.warn("http exporter [{}] - failed to render document [{}], skipping it", e, name(), doc); @@ -318,7 +326,9 @@ public class HttpExporter extends Exporter { return null; } - /** open a connection to the given hosts, returning null when not successful * */ + /** + * open a connection to the given hosts, returning null when not successful * + */ private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) { try { final URL url = HttpExporterUtils.parseHostWithPath(host, path); @@ -450,7 +460,7 @@ public class HttpExporter extends Exporter { // 200 means that the template has been found, 404 otherwise if (connection.getResponseCode() == 200) { - logger.debug("monitoring template [{}] found",templateName); + logger.debug("monitoring template [{}] found", templateName); return true; } } catch (Exception e) { @@ -543,7 +553,9 @@ public class HttpExporter extends Exporter { } } - /** SSL Initialization * */ + /** + * SSL Initialization * + */ public SSLSocketFactory createSSLSocketFactory(Settings settings) { if (settings.names().isEmpty()) { logger.trace("no ssl context configured"); @@ -693,47 +705,54 @@ public class HttpExporter extends Exporter { } @Override - public Bulk add(Collection docs) throws Exception { - if (connection == null) { - connection = openExportingConnection(); - } - if ((docs != null) && (!docs.isEmpty())) { - if (out == null) { - out = connection.getOutputStream(); - } + public Bulk add(Collection docs) throws ExportException { + try { + if ((docs != null) && (!docs.isEmpty())) { + if (connection == null) { + connection = openExportingConnection(); + if (connection == null) { + throw new IllegalStateException("No connection available to export documents"); + } + } + if (out == null) { + out = connection.getOutputStream(); + } - // We need to use a buffer to render each monitoring document - // because the renderer might close the outputstream (ex: XContentBuilder) - try (BytesStreamOutput buffer = new BytesStreamOutput()) { - for (MonitoringDoc monitoringDoc : docs) { - try { - render(monitoringDoc, buffer); - // write the result to the connection - out.write(buffer.bytes().toBytes()); - } finally { - buffer.reset(); + // We need to use a buffer to render each monitoring document + // because the renderer might close the outputstream (ex: XContentBuilder) + try (BytesStreamOutput buffer = new BytesStreamOutput()) { + for (MonitoringDoc monitoringDoc : docs) { + try { + render(monitoringDoc, buffer); + // write the result to the connection + out.write(buffer.bytes().toBytes()); + } finally { + buffer.reset(); + } } } } + } catch (Exception e) { + throw new ExportException("failed to add documents to export bulk [{}]", name); } return this; } @Override - public void flush() throws IOException { + public void flush() throws ExportException { if (connection != null) { - flush(connection); - connection = null; + try { + flush(connection); + } catch (Exception e) { + throw new ExportException("failed to flush export bulk [{}]", e, name); + } finally { + connection = null; + } } } private void flush(HttpURLConnection connection) throws IOException { - try { - sendCloseExportingConnection(connection); - } catch (IOException e) { - logger.error("failed sending data to [{}]: {}", connection.getURL(), ExceptionsHelper.detailedMessage(e)); - throw e; - } + sendCloseExportingConnection(connection); } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java index 0f41d48d21c..5dbe433b082 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.marvel.agent.exporter.local; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -13,12 +12,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.marvel.agent.exporter.ExportBulk; +import org.elasticsearch.marvel.agent.exporter.ExportException; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver; import org.elasticsearch.marvel.agent.resolver.ResolversRegistry; import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy; -import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.concurrent.atomic.AtomicReference; @@ -43,7 +43,9 @@ public class LocalBulk extends ExportBulk { } @Override - public synchronized ExportBulk add(Collection docs) throws Exception { + public synchronized ExportBulk add(Collection docs) throws ExportException { + ExportException exception = null; + for (MonitoringDoc doc : docs) { if (state.get() != State.ACTIVE) { return this; @@ -54,42 +56,61 @@ public class LocalBulk extends ExportBulk { try { MonitoringIndexNameResolver resolver = resolvers.getResolver(doc); - if (resolver != null) { - IndexRequest request = new IndexRequest(resolver.index(doc), resolver.type(doc), resolver.id(doc)); - request.source(resolver.source(doc, XContentType.SMILE)); - requestBuilder.add(request); + IndexRequest request = new IndexRequest(resolver.index(doc), resolver.type(doc), resolver.id(doc)); + request.source(resolver.source(doc, XContentType.SMILE)); + requestBuilder.add(request); - if (logger.isTraceEnabled()) { - logger.trace("local exporter [{}] - added index request [index={}, type={}, id={}]", - name, request.index(), request.type(), request.id()); - } - } else { - logger.warn("local exporter [{}] - unable to render monitoring document of type [{}]: no renderer found in registry", - name, doc); + if (logger.isTraceEnabled()) { + logger.trace("local exporter [{}] - added index request [index={}, type={}, id={}]", + name, request.index(), request.type(), request.id()); } } catch (Exception e) { - logger.warn("local exporter [{}] - failed to add document [{}], skipping it", e, name, doc); + if (exception == null) { + exception = new ExportException("failed to add documents to export bulk [{}]", name); + } + exception.addExportException(new ExportException("failed to add document [{}]", e, doc, name)); } } + + if (exception != null) { + throw exception; + } + return this; } @Override - public void flush() throws IOException { - if (state.get() != State.ACTIVE || requestBuilder == null) { + public void flush() throws ExportException { + if (state.get() != State.ACTIVE || requestBuilder == null || requestBuilder.numberOfActions() == 0) { return; } try { logger.trace("exporter [{}] - exporting {} documents", name, requestBuilder.numberOfActions()); BulkResponse bulkResponse = requestBuilder.get(); + if (bulkResponse.hasFailures()) { - throw new ElasticsearchException(buildFailureMessage(bulkResponse)); + throwExportException(bulkResponse.getItems()); } + } catch (Exception e) { + throw new ExportException("failed to flush export bulk [{}]", e, name); } finally { requestBuilder = null; } } + void throwExportException(BulkItemResponse[] bulkItemResponses) { + ExportException exception = new ExportException("bulk [{}] reports failures when exporting documents", name); + + Arrays.stream(bulkItemResponses) + .filter(BulkItemResponse::isFailed) + .map(item -> new ExportException(item.getFailure().getCause())) + .forEach(exception::addExportException); + + if (exception.hasExportExceptions()) { + throw exception; + } + } + void terminate() { state.set(State.TERMINATING); synchronized (this) { @@ -98,31 +119,6 @@ public class LocalBulk extends ExportBulk { } } - /** - * In case of something goes wrong and there's a lot of shards/indices, - * we limit the number of failures displayed in log. - */ - private String buildFailureMessage(BulkResponse bulkResponse) { - BulkItemResponse[] items = bulkResponse.getItems(); - - if (logger.isDebugEnabled() || (items.length < 100)) { - return bulkResponse.buildFailureMessage(); - } - - StringBuilder sb = new StringBuilder(); - sb.append("failure in bulk execution, only the first 100 failures are printed:"); - for (int i = 0; i < items.length && i < 100; i++) { - BulkItemResponse item = items[i]; - if (item.isFailed()) { - sb.append("\n[").append(i) - .append("]: index [").append(item.getIndex()).append("], type [").append(item.getType()) - .append("], id [").append(item.getId()).append("], message [").append(item.getFailureMessage()) - .append("]"); - } - } - return sb.toString(); - } - enum State { ACTIVE, TERMINATING, diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java index 3929a52211f..c547114b5df 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java @@ -282,7 +282,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle .distinct() .toArray(String[]::new); - MonitoringDoc monitoringDoc = new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString()); + MonitoringDoc monitoringDoc = new MonitoringDoc(null, null); monitoringDoc.setTimestamp(System.currentTimeMillis()); // Get the names of the current monitoring indices diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/ResolversRegistry.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/ResolversRegistry.java index 5703c2f415d..cff51999817 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/ResolversRegistry.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/ResolversRegistry.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.marvel.agent.resolver; +import org.elasticsearch.Version; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.marvel.MonitoredSystem; +import org.elasticsearch.marvel.action.MonitoringBulkDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateNodeMonitoringDoc; @@ -19,6 +21,7 @@ import org.elasticsearch.marvel.agent.collector.node.NodeStatsMonitoringDoc; import org.elasticsearch.marvel.agent.collector.shards.ShardMonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; +import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterInfoResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateNodeResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateResolver; @@ -46,8 +49,8 @@ public class ResolversRegistry implements Iterable // register built-in defaults resolvers registerBuiltIn(ES, MarvelTemplateUtils.TEMPLATE_VERSION, settings); - // register resolvers for external applications, something like: - //registrations.add(resolveByIdVersion(MonitoringIds.KIBANA, "4.4.1", new KibanaDocResolver(KIBANA, 0, settings))); + // register resolvers for external applications + registerKibana(settings); } /** @@ -66,6 +69,14 @@ public class ResolversRegistry implements Iterable registrations.add(resolveByClass(ShardMonitoringDoc.class, new ShardsResolver(id, version, settings))); } + /** + * Registers resolvers for Kibana + */ + private void registerKibana(Settings settings) { + final MonitoringBulkResolver kibana = new MonitoringBulkResolver(MonitoredSystem.KIBANA, 0, settings); + registrations.add(resolveByClassSystemVersion(MonitoringBulkDoc.class, MonitoredSystem.KIBANA, Version.CURRENT, kibana)); + } + /** * @return a Resolver that is able to resolver the given monitoring document */ @@ -75,8 +86,7 @@ public class ResolversRegistry implements Iterable return registration.resolver(); } } - throw new IllegalArgumentException("No resolver found for monitoring document [class=" + document.getClass().getName() - + ", id=" + document.getMonitoringId() + ", version=" + document.getMonitoringVersion() + "]"); + throw new IllegalArgumentException("No resolver found for monitoring document"); } @Override @@ -88,6 +98,23 @@ public class ResolversRegistry implements Iterable return new Registration(resolver, type::isInstance); } + static Registration resolveByClassSystemVersion(Class type, MonitoredSystem system, Version version, + MonitoringIndexNameResolver resolver) { + return new Registration(resolver, doc -> { + try { + if (type.isInstance(doc) == false) { + return false; + } + if (system != MonitoredSystem.fromSystem(doc.getMonitoringId())) { + return false; + } + return version == Version.fromString(doc.getMonitoringVersion()); + } catch (Exception e) { + return false; + } + }); + } + static class Registration { private final MonitoringIndexNameResolver resolver; @@ -106,5 +133,4 @@ public class ResolversRegistry implements Iterable return resolver; } } - } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolver.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolver.java new file mode 100644 index 00000000000..e92b3e34452 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolver.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.resolver.bulk; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.marvel.MonitoredSystem; +import org.elasticsearch.marvel.action.MonitoringBulkDoc; +import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver; + +import java.io.IOException; + +public class MonitoringBulkResolver extends MonitoringIndexNameResolver.Timestamped { + + public MonitoringBulkResolver(MonitoredSystem id, int version, Settings settings) { + super(id, version, settings); + } + + @Override + public String type(MonitoringBulkDoc document) { + return document.getType(); + } + + @Override + protected void buildXContent(MonitoringBulkDoc document, XContentBuilder builder, ToXContent.Params params) throws IOException { + BytesReference source = document.getSource(); + if (source != null && source.length() > 0) { + builder.rawField(type(document), source); + } + } +} \ No newline at end of file diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/client/MonitoringClient.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/client/MonitoringClient.java index 596096ebb6f..36072fc73ad 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/client/MonitoringClient.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/client/MonitoringClient.java @@ -5,8 +5,16 @@ */ package org.elasticsearch.marvel.client; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.marvel.action.MonitoringBulkAction; +import org.elasticsearch.marvel.action.MonitoringBulkRequest; +import org.elasticsearch.marvel.action.MonitoringBulkRequestBuilder; +import org.elasticsearch.marvel.action.MonitoringBulkResponse; + +import java.util.Map; public class MonitoringClient { @@ -17,5 +25,36 @@ public class MonitoringClient { this.client = client; } - // to be implemented: specific API for monitoring + + /** + * Creates a request builder that bulk index monitoring documents. + * + * @return The request builder + */ + public MonitoringBulkRequestBuilder prepareMonitoringBulk() { + return new MonitoringBulkRequestBuilder(client); + } + + /** + * Executes a bulk of index operations that concern monitoring documents. + * + * @param request The monitoring bulk request + * @param listener A listener to be notified with a result + */ + public void bulk(MonitoringBulkRequest request, ActionListener listener) { + client.execute(MonitoringBulkAction.INSTANCE, request, listener); + } + + /** + * Executes a bulk of index operations that concern monitoring documents. + * + * @param request The monitoring bulk request + */ + public ActionFuture bulk(MonitoringBulkRequest request) { + return client.execute(MonitoringBulkAction.INSTANCE, request); + } + + public MonitoringClient filterWithHeader(Map headers) { + return new MonitoringClient(client.filterWithHeader(headers)); + } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/MonitoringRestHandler.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/MonitoringRestHandler.java new file mode 100644 index 00000000000..ab67bb39016 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/MonitoringRestHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.rest; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.marvel.client.MonitoringClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.xpack.XPackPlugin; + +import java.util.Locale; + +public abstract class MonitoringRestHandler extends BaseRestHandler { + + protected static String URI_BASE = String.format(Locale.ROOT, "/_%s/monitoring", XPackPlugin.NAME); + + public MonitoringRestHandler(Settings settings, Client client) { + super(settings, client); + } + + @Override + protected final void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + handleRequest(request, channel, new MonitoringClient(client)); + } + + protected abstract void handleRequest(RestRequest request, RestChannel channel, MonitoringClient client) throws Exception; +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/action/RestMonitoringBulkAction.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/action/RestMonitoringBulkAction.java new file mode 100644 index 00000000000..16cfaa81cd2 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/rest/action/RestMonitoringBulkAction.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.rest.action; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.marvel.action.MonitoringBulkRequestBuilder; +import org.elasticsearch.marvel.action.MonitoringBulkResponse; +import org.elasticsearch.marvel.client.MonitoringClient; +import org.elasticsearch.marvel.rest.MonitoringRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestBuilderListener; + +public class RestMonitoringBulkAction extends MonitoringRestHandler { + + public static final String MONITORING_ID = "system_id"; + public static final String MONITORING_VERSION = "system_version"; + + @Inject + public RestMonitoringBulkAction(Settings settings, RestController controller, Client client) { + super(settings, client); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/_bulk", this); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/_bulk", this); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/{index}/_bulk", this); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/{index}/_bulk", this); + controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/{index}/{type}/_bulk", this); + controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/{index}/{type}/_bulk", this); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, MonitoringClient client) throws Exception { + String defaultIndex = request.param("index"); + String defaultType = request.param("type"); + + String id = request.param(MONITORING_ID); + if (Strings.hasLength(id) == false) { + throw new IllegalArgumentException("no monitoring id for monitoring bulk request"); + } + String version = request.param(MONITORING_VERSION); + if (Strings.hasLength(version) == false) { + throw new IllegalArgumentException("no monitoring version for monitoring bulk request"); + } + + if (!RestActions.hasBodyContent(request)) { + throw new ElasticsearchParseException("no body content for monitoring bulk request"); + } + + MonitoringBulkRequestBuilder requestBuilder = client.prepareMonitoringBulk(); + requestBuilder.add(request.content(), id, version, defaultIndex, defaultType); + requestBuilder.execute(new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(MonitoringBulkResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + builder.field(Fields.TOOK, response.getTookInMillis()); + + MonitoringBulkResponse.Error error = response.getError(); + builder.field(Fields.ERRORS, error != null); + + if (error != null) { + builder.field(Fields.ERROR, response.getError()); + } + builder.endObject(); + return new BytesRestResponse(response.status(), builder); + } + }); + } + + static final class Fields { + static final XContentBuilderString TOOK = new XContentBuilderString("took"); + static final XContentBuilderString ERRORS = new XContentBuilderString("errors"); + static final XContentBuilderString ERROR = new XContentBuilderString("error"); + } +} diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/support/init/proxy/MonitoringClientProxy.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/support/init/proxy/MonitoringClientProxy.java index f09877b3590..4c7c6bc53ea 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/support/init/proxy/MonitoringClientProxy.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/support/init/proxy/MonitoringClientProxy.java @@ -6,16 +6,11 @@ package org.elasticsearch.marvel.support.init.proxy; import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.shield.InternalClient; import org.elasticsearch.xpack.common.init.proxy.ClientProxy; public class MonitoringClientProxy extends ClientProxy { - @Inject - public MonitoringClientProxy() { - } - /** * Creates a proxy to the given internal client (can be used for testing) */ diff --git a/elasticsearch/x-pack/marvel/src/main/resources/monitoring-data.json b/elasticsearch/x-pack/marvel/src/main/resources/monitoring-data.json index b01dac11022..6b7f62fd116 100644 --- a/elasticsearch/x-pack/marvel/src/main/resources/monitoring-data.json +++ b/elasticsearch/x-pack/marvel/src/main/resources/monitoring-data.json @@ -1,7 +1,6 @@ { "template": ".monitoring-data-${monitoring.template.version}", "settings": { - "index.xpack.version": "${project.version}", "index.number_of_shards": 1, "index.number_of_replicas": 1, "index.codec": "best_compression", @@ -9,7 +8,10 @@ }, "mappings": { "cluster_info": { - "enabled": false + "enabled": false, + "_meta": { + "xpack.version": "${project.version}" + } }, "node": { "enabled": false diff --git a/elasticsearch/x-pack/marvel/src/main/resources/monitoring-es.json b/elasticsearch/x-pack/marvel/src/main/resources/monitoring-es.json index 6ed747e3c17..5919d19c1d8 100644 --- a/elasticsearch/x-pack/marvel/src/main/resources/monitoring-es.json +++ b/elasticsearch/x-pack/marvel/src/main/resources/monitoring-es.json @@ -1,7 +1,6 @@ { "template": ".monitoring-es-${monitoring.template.version}-*", "settings": { - "index.xpack.version": "${project.version}", "index.number_of_shards": 1, "index.number_of_replicas": 1, "index.codec": "best_compression", diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/MarvelPluginClientTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/MarvelPluginClientTests.java index a85d8456bd1..6eb4d8bbb0f 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/MarvelPluginClientTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/MarvelPluginClientTests.java @@ -22,7 +22,8 @@ public class MarvelPluginClientTests extends ESTestCase { .build(); Marvel plugin = new Marvel(settings); - assertThat(plugin.isEnabled(), is(false)); + assertThat(plugin.isEnabled(), is(true)); + assertThat(plugin.isTransportClient(), is(true)); Collection modules = plugin.nodeModules(); assertThat(modules.size(), is(0)); } @@ -34,6 +35,7 @@ public class MarvelPluginClientTests extends ESTestCase { .build(); Marvel plugin = new Marvel(settings); assertThat(plugin.isEnabled(), is(true)); + assertThat(plugin.isTransportClient(), is(false)); Collection modules = plugin.nodeModules(); assertThat(modules.size(), is(5)); } diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkDocTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkDocTests.java new file mode 100644 index 00000000000..7dc931dfe57 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkDocTests.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.hamcrest.Matchers.equalTo; + +public class MonitoringBulkDocTests extends ESTestCase { + + public void testSerialization() throws IOException { + int iterations = randomIntBetween(5, 50); + for (int i = 0; i < iterations; i++) { + MonitoringBulkDoc doc = newRandomMonitoringBulkDoc(); + + boolean hasSourceNode = randomBoolean(); + if (hasSourceNode) { + doc.setSourceNode(newRandomSourceNode()); + } + + BytesStreamOutput output = new BytesStreamOutput(); + Version outputVersion = randomVersion(random()); + output.setVersion(outputVersion); + doc.writeTo(output); + + StreamInput streamInput = StreamInput.wrap(output.bytes()); + streamInput.setVersion(randomVersion(random())); + MonitoringBulkDoc doc2 = new MonitoringBulkDoc(streamInput); + + assertThat(doc2.getMonitoringId(), equalTo(doc.getMonitoringId())); + assertThat(doc2.getMonitoringVersion(), equalTo(doc.getMonitoringVersion())); + assertThat(doc2.getClusterUUID(), equalTo(doc.getClusterUUID())); + assertThat(doc2.getTimestamp(), equalTo(doc.getTimestamp())); + assertThat(doc2.getSourceNode(), equalTo(doc.getSourceNode())); + assertThat(doc2.getIndex(), equalTo(doc.getIndex())); + assertThat(doc2.getType(), equalTo(doc.getType())); + assertThat(doc2.getId(), equalTo(doc.getId())); + if (doc.getSource() == null) { + assertThat(doc2.getSource(), equalTo(BytesArray.EMPTY)); + } else { + assertThat(doc2.getSource(), equalTo(doc.getSource())); + } + } + } + + private MonitoringBulkDoc newRandomMonitoringBulkDoc() { + MonitoringBulkDoc doc = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2)); + if (frequently()) { + doc.setClusterUUID(randomAsciiOfLength(5)); + doc.setType(randomAsciiOfLength(5)); + } + if (randomBoolean()) { + doc.setTimestamp(System.currentTimeMillis()); + doc.setSource(new BytesArray("{\"key\" : \"value\"}")); + } + if (rarely()) { + doc.setIndex(randomAsciiOfLength(5)); + doc.setId(randomAsciiOfLength(2)); + } + return doc; + } + + private MonitoringDoc.Node newRandomSourceNode() { + String uuid = null; + String name = null; + String ip = null; + String transportAddress = null; + String host = null; + ImmutableOpenMap attributes = null; + + if (frequently()) { + uuid = randomAsciiOfLength(5); + name = randomAsciiOfLength(5); + } + if (randomBoolean()) { + ip = randomAsciiOfLength(5); + transportAddress = randomAsciiOfLength(5); + host = randomAsciiOfLength(3); + } + if (rarely()) { + int nbAttributes = randomIntBetween(0, 5); + + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + for (int i = 0; i < nbAttributes; i++) { + builder.put("key#" + i, String.valueOf(i)); + } + attributes = builder.build(); + } + return new MonitoringDoc.Node(uuid, host, transportAddress, ip, name, attributes); + } + +} diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkRequestTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkRequestTests.java new file mode 100644 index 00000000000..4e9f2ef9e79 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkRequestTests.java @@ -0,0 +1,193 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matcher; + +import java.io.IOException; + +import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.nullValue; + +public class MonitoringBulkRequestTests extends ESTestCase { + + private static final BytesArray SOURCE = new BytesArray("{\"key\" : \"value\"}"); + + public void testValidateRequestNoDocs() { + assertValidationErrors(new MonitoringBulkRequest(), hasItems("no monitoring documents added")); + } + + public void testValidateRequestSingleDoc() { + MonitoringBulkDoc doc = new MonitoringBulkDoc(null, null); + + assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("monitored system id is missing for monitoring document [0]", + "monitored system version is missing for monitoring document [0]", + "type is missing for monitoring document [0]", + "source is missing for monitoring document [0]")); + + doc = new MonitoringBulkDoc("id", null); + assertValidationErrors(new MonitoringBulkRequest().add(doc), + hasItems("monitored system version is missing for monitoring document [0]", + "type is missing for monitoring document [0]", + "source is missing for monitoring document [0]")); + + doc = new MonitoringBulkDoc("id", "version"); + assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("type is missing for monitoring document [0]", + "source is missing for monitoring document [0]")); + + doc.setType("type"); + assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("source is missing for monitoring document [0]")); + + doc.setSource(SOURCE); + assertValidationErrors(new MonitoringBulkRequest().add(doc), nullValue()); + } + + + public void testValidateRequestMultiDocs() { + MonitoringBulkRequest request = new MonitoringBulkRequest(); + + // Doc0 is complete + MonitoringBulkDoc doc0 = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2)); + doc0.setType(randomAsciiOfLength(5)); + doc0.setSource(SOURCE); + request.add(doc0); + + // Doc1 has no type + MonitoringBulkDoc doc1 = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2)); + doc1.setSource(SOURCE); + request.add(doc1); + + // Doc2 has no source + MonitoringBulkDoc doc2 = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2)); + doc2.setType(randomAsciiOfLength(5)); + doc2.setSource(BytesArray.EMPTY); + request.add(doc2); + + // Doc3 has no version + MonitoringBulkDoc doc3 = new MonitoringBulkDoc(randomAsciiOfLength(2), null); + doc3.setType(randomAsciiOfLength(5)); + doc3.setSource(SOURCE); + request.add(doc3); + + // Doc4 has no id + MonitoringBulkDoc doc4 = new MonitoringBulkDoc(null, randomAsciiOfLength(2)); + doc4.setType(randomAsciiOfLength(5)); + doc4.setSource(SOURCE); + request.add(doc4); + + assertValidationErrors(request, hasItems("type is missing for monitoring document [1]", + "source is missing for monitoring document [2]", + "monitored system version is missing for monitoring document [3]", + "monitored system id is missing for monitoring document [4]")); + + } + + public void testAddSingleDoc() { + MonitoringBulkRequest request = new MonitoringBulkRequest(); + final int nbDocs = randomIntBetween(1, 20); + for (int i = 0; i < nbDocs; i++) { + request.add(new MonitoringBulkDoc(String.valueOf(i), String.valueOf(i))); + } + assertThat(request.getDocs(), hasSize(nbDocs)); + } + + public void testAddMultipleDocs() throws Exception { + final int nbDocs = randomIntBetween(3, 20); + final XContentType xContentType = XContentType.JSON; + + try (BytesStreamOutput content = new BytesStreamOutput()) { + try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, content)) { + for (int i = 0; i < nbDocs; i++) { + builder.startObject().startObject("index").endObject().endObject().flush(); + content.write(xContentType.xContent().streamSeparator()); + builder.startObject().field("foo").value(i).endObject().flush(); + content.write(xContentType.xContent().streamSeparator()); + } + } + + String defaultMonitoringId = randomBoolean() ? randomAsciiOfLength(2) : null; + String defaultMonitoringVersion = randomBoolean() ? randomAsciiOfLength(3) : null; + String defaultIndex = randomBoolean() ? randomAsciiOfLength(5) : null; + String defaultType = randomBoolean() ? randomAsciiOfLength(4) : null; + + MonitoringBulkRequest request = new MonitoringBulkRequest(); + request.add(content.bytes(), defaultMonitoringId, defaultMonitoringVersion, defaultIndex, defaultType); + assertThat(request.getDocs(), hasSize(nbDocs)); + + for (MonitoringBulkDoc doc : request.getDocs()) { + assertThat(doc.getMonitoringId(), equalTo(defaultMonitoringId)); + assertThat(doc.getMonitoringVersion(), equalTo(defaultMonitoringVersion)); + assertThat(doc.getIndex(), equalTo(defaultIndex)); + assertThat(doc.getType(), equalTo(defaultType)); + } + } + } + + public void testSerialization() throws IOException { + MonitoringBulkRequest request = new MonitoringBulkRequest(); + + int numDocs = iterations(10, 30); + for (int i = 0; i < numDocs; i++) { + MonitoringBulkDoc doc = new MonitoringBulkDoc(randomAsciiOfLength(2), randomVersion(random()).toString()); + doc.setType(randomFrom("type1", "type2", "type3")); + doc.setSource(SOURCE); + if (randomBoolean()) { + doc.setIndex("index"); + } + if (randomBoolean()) { + doc.setId(randomAsciiOfLength(3)); + } + if (rarely()) { + doc.setClusterUUID(randomAsciiOfLength(5)); + } + request.add(doc); + } + + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(randomVersion(random())); + request.writeTo(out); + + StreamInput in = StreamInput.wrap(out.bytes()); + in.setVersion(out.getVersion()); + MonitoringBulkRequest request2 = new MonitoringBulkRequest(); + request2.readFrom(in); + + assertThat(request2.docs.size(), CoreMatchers.equalTo(request.docs.size())); + for (int i = 0; i < request2.docs.size(); i++) { + MonitoringBulkDoc doc = request.docs.get(i); + MonitoringBulkDoc doc2 = request2.docs.get(i); + assertThat(doc2.getMonitoringId(), equalTo(doc.getMonitoringId())); + assertThat(doc2.getMonitoringVersion(), equalTo(doc.getMonitoringVersion())); + assertThat(doc2.getClusterUUID(), equalTo(doc.getClusterUUID())); + assertThat(doc2.getIndex(), equalTo(doc.getIndex())); + assertThat(doc2.getType(), equalTo(doc.getType())); + assertThat(doc2.getId(), equalTo(doc.getId())); + assertThat(doc2.getSource(), equalTo(doc.getSource())); + } + } + + @SuppressWarnings("unchecked") + private static void assertValidationErrors(MonitoringBulkRequest request, Matcher matcher) { + ActionRequestValidationException validation = request.validate(); + if (validation != null) { + assertThat((T) validation.validationErrors(), matcher); + } else { + assertThat(null, matcher); + } + } +} diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkResponseTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkResponseTests.java new file mode 100644 index 00000000000..8c414555277 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkResponseTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.marvel.agent.exporter.ExportException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class MonitoringBulkResponseTests extends ESTestCase { + + public void testResponseStatus() { + final long took = Math.abs(randomLong()); + MonitoringBulkResponse response = new MonitoringBulkResponse(took); + + assertThat(response.getTookInMillis(), equalTo(took)); + assertThat(response.getError(), is(nullValue())); + assertThat(response.status(), equalTo(RestStatus.OK)); + + ExportException exception = new ExportException(randomAsciiOfLength(10)); + response = new MonitoringBulkResponse(took, new MonitoringBulkResponse.Error(exception)); + + assertThat(response.getTookInMillis(), equalTo(took)); + assertThat(response.getError(), is(notNullValue())); + assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testSerialization() throws IOException { + int iterations = randomIntBetween(5, 50); + for (int i = 0; i < iterations; i++) { + MonitoringBulkResponse response; + if (randomBoolean()) { + response = new MonitoringBulkResponse(Math.abs(randomLong())); + } else { + Exception exception = randomFrom( + new ExportException(randomAsciiOfLength(5), new IllegalStateException(randomAsciiOfLength(5))), + new IllegalStateException(randomAsciiOfLength(5)), + new IllegalArgumentException(randomAsciiOfLength(5))); + response = new MonitoringBulkResponse(Math.abs(randomLong()), new MonitoringBulkResponse.Error(exception)); + } + + BytesStreamOutput output = new BytesStreamOutput(); + Version outputVersion = randomVersion(random()); + output.setVersion(outputVersion); + response.writeTo(output); + + StreamInput streamInput = StreamInput.wrap(output.bytes()); + streamInput.setVersion(randomVersion(random())); + MonitoringBulkResponse response2 = new MonitoringBulkResponse(); + response2.readFrom(streamInput); + + assertThat(response2.getTookInMillis(), equalTo(response.getTookInMillis())); + if (response.getError() == null) { + assertThat(response2.getError(), is(nullValue())); + } else { + assertThat(response2.getError(), is(notNullValue())); + } + } + } +} diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java new file mode 100644 index 00000000000..d5644708f6a --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.marvel.MonitoredSystem; +import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver; +import org.elasticsearch.marvel.test.MarvelIntegTestCase; +import org.elasticsearch.search.SearchHit; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class MonitoringBulkTests extends MarvelIntegTestCase { + + @Override + protected Settings transportClientSettings() { + return super.transportClientSettings(); + } + + public void testMonitoringBulkIndexing() throws Exception { + MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk(); + String[] types = {"type1", "type2", "type3"}; + + int numDocs = scaledRandomIntBetween(100, 5000); + for (int i = 0; i < numDocs; i++) { + MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString()); + doc.setType(randomFrom(types)); + doc.setSource(jsonBuilder().startObject().field("num", numDocs).endObject().bytes()); + requestBuilder.add(doc); + } + + MonitoringBulkResponse response = requestBuilder.get(); + assertThat(response.getError(), is(nullValue())); + refresh(); + + SearchResponse searchResponse = client().prepareSearch().setTypes(types).setSize(numDocs).get(); + assertHitCount(searchResponse, numDocs); + + for (SearchHit searchHit : searchResponse.getHits()) { + Map source = searchHit.sourceAsMap(); + assertNotNull(source.get(MonitoringBulkResolver.Fields.CLUSTER_UUID.underscore().toString())); + assertNotNull(source.get(MonitoringBulkResolver.Fields.TIMESTAMP.underscore().toString())); + assertNotNull(source.get(MonitoringBulkResolver.Fields.SOURCE_NODE.underscore().toString())); + } + } + + /** + * This test creates N threads that execute a random number of monitoring bulk requests. + */ + public void testConcurrentRequests() throws Exception { + final Thread[] threads = new Thread[3 + randomInt(7)]; + final List exceptions = new CopyOnWriteArrayList<>(); + + AtomicInteger total = new AtomicInteger(0); + + logger.info("--> using {} concurrent clients to execute requests", threads.length); + for (int i = 0; i < threads.length; i++) { + final int nbRequests = randomIntBetween(3, 10); + + threads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + logger.error("unexpected error in exporting thread", t); + exceptions.add(t); + } + + @Override + protected void doRun() throws Exception { + for (int j = 0; j < nbRequests; j++) { + MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk(); + + int numDocs = scaledRandomIntBetween(10, 1000); + for (int k = 0; k < numDocs; k++) { + MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString()); + doc.setType("concurrent"); + doc.setSource(jsonBuilder().startObject().field("num", k).endObject().bytes()); + requestBuilder.add(doc); + } + + total.addAndGet(numDocs); + MonitoringBulkResponse response = requestBuilder.get(); + assertThat(response.getError(), is(nullValue())); + } + } + }, "export_thread_" + i); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertThat(exceptions, empty()); + refresh(); + + SearchResponse countResponse = client().prepareSearch().setTypes("concurrent").setSize(0).get(); + assertHitCount(countResponse, total.get()); + } + + public void testUnsupportedSystem() throws Exception { + MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk(); + String[] types = {"type1", "type2", "type3"}; + + int totalDocs = randomIntBetween(10, 1000); + int unsupportedDocs = 0; + + for (int i = 0; i < totalDocs; i++) { + MonitoringBulkDoc doc; + if (randomBoolean()) { + doc = new MonitoringBulkDoc("unknown", Version.CURRENT.toString()); + unsupportedDocs++; + } else { + doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString()); + } + doc.setType(randomFrom(types)); + doc.setSource(jsonBuilder().startObject().field("num", i).endObject().bytes()); + requestBuilder.add(doc); + } + + MonitoringBulkResponse response = requestBuilder.get(); + if (unsupportedDocs == 0) { + assertThat(response.getError(), is(nullValue())); + } else { + assertThat(response.getError(), is(notNullValue())); + } + refresh(); + + SearchResponse countResponse = client().prepareSearch().setTypes(types).setSize(0).get(); + assertHitCount(countResponse, totalDocs - unsupportedDocs); + } +} diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java new file mode 100644 index 00000000000..f99b2af4bf3 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java @@ -0,0 +1,293 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.NodeConnectionsService; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.marvel.MarvelSettings; +import org.elasticsearch.marvel.MonitoredSystem; +import org.elasticsearch.marvel.agent.exporter.ExportException; +import org.elasticsearch.marvel.agent.exporter.Exporters; +import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.IsEqual.equalTo; + +public class TransportMonitoringBulkActionTests extends ESTestCase { + + private static ThreadPool threadPool; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private ClusterService clusterService; + private TransportService transportService; + private CapturingExporters exportService; + private TransportMonitoringBulkAction action; + + @BeforeClass + public static void beforeClass() { + threadPool = new ThreadPool(TransportMonitoringBulkActionTests.class.getSimpleName()); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + CapturingTransport transport = new CapturingTransport(); + clusterService = new ClusterService(Settings.EMPTY, null, new ClusterSettings(Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, + new ClusterName(TransportMonitoringBulkActionTests.class.getName())); + clusterService.setLocalNode(new DiscoveryNode("node", DummyTransportAddress.INSTANCE, Version.CURRENT)); + clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { + @Override + public void connectToAddedNodes(ClusterChangedEvent event) { + // skip + } + + @Override + public void disconnectFromRemovedNodes(ClusterChangedEvent event) { + // skip + } + }); + clusterService.setClusterStatePublisher((event, ackListener) -> {}); + clusterService.start(); + + transportService = new TransportService(transport, threadPool); + transportService.start(); + transportService.acceptIncomingRequests(); + exportService = new CapturingExporters(); + action = new TransportMonitoringBulkAction( + Settings.EMPTY, + threadPool, + clusterService, + transportService, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(Settings.EMPTY), + exportService + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + transportService.close(); + } + + public void testGlobalBlock() throws Exception { + expectedException.expect(ExecutionException.class); + expectedException.expect(hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]"))); + + final ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL); + final CountDownLatch latch = new CountDownLatch(1); + + clusterService.submitStateUpdateTask("add blocks to cluster state", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // make sure we increment versions as listener may depend on it for change + return ClusterState.builder(currentState).blocks(block).version(currentState.version() + 1).build(); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail("unexpected exception: " + t); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new ElasticsearchException("unexpected interruption", e); + } + + MonitoringBulkRequest request = randomRequest(); + action.execute(request).get(); + } + + public void testEmptyRequest() throws Exception { + expectedException.expect(ExecutionException.class); + expectedException.expect(hasToString(containsString("no monitoring documents added"))); + + MonitoringBulkRequest request = randomRequest(0); + action.execute(request).get(); + + assertThat(exportService.getExported(), hasSize(0)); + } + + public void testBasicRequest() throws Exception { + MonitoringBulkRequest request = randomRequest(); + action.execute(request).get(); + + assertThat(exportService.getExported(), hasSize(request.getDocs().size())); + } + + public void testAsyncActionPrepareDocs() throws Exception { + final PlainActionFuture listener = new PlainActionFuture<>(); + final MonitoringBulkRequest request = randomRequest(); + + Collection results = action.new AsyncAction(request, listener, exportService, clusterService) + .prepareForExport(request.getDocs()); + + assertThat(results, hasSize(request.getDocs().size())); + for (MonitoringDoc exported : results) { + assertThat(exported.getClusterUUID(), equalTo(clusterService.state().metaData().clusterUUID())); + assertThat(exported.getTimestamp(), greaterThan(0L)); + assertThat(exported.getSourceNode(), notNullValue()); + assertThat(exported.getSourceNode().getUUID(), equalTo(clusterService.localNode().getId())); + assertThat(exported.getSourceNode().getName(), equalTo(clusterService.localNode().getName())); + } + } + + public void testAsyncActionExecuteExport() throws Exception { + final PlainActionFuture listener = new PlainActionFuture<>(); + final MonitoringBulkRequest request = randomRequest(); + final Collection docs = Collections.unmodifiableCollection(request.getDocs()); + + action.new AsyncAction(request, listener, exportService, clusterService).executeExport(docs, 0L, listener); + assertThat(listener.get().getError(), nullValue()); + + Collection exported = exportService.getExported(); + assertThat(exported, hasSize(request.getDocs().size())); + } + + public void testAsyncActionExportThrowsException() throws Exception { + final PlainActionFuture listener = new PlainActionFuture<>(); + final MonitoringBulkRequest request = randomRequest(); + + final Exporters exporters = new ConsumingExporters(docs -> { + throw new IllegalStateException(); + }); + + action.new AsyncAction(request, listener, exporters, clusterService).start(); + assertThat(listener.get().getError(), notNullValue()); + assertThat(listener.get().getError().getCause(), instanceOf(IllegalStateException.class)); + } + + /** + * @return a new MonitoringBulkRequest instance with random number of documents + */ + private static MonitoringBulkRequest randomRequest() throws IOException { + return randomRequest(scaledRandomIntBetween(1, 100)); + } + + /** + * @return a new MonitoringBulkRequest instance with given number of documents + */ + private static MonitoringBulkRequest randomRequest(final int numDocs) throws IOException { + MonitoringBulkRequest request = new MonitoringBulkRequest(); + for (int i = 0; i < numDocs; i++) { + MonitoringBulkDoc doc = new MonitoringBulkDoc(randomFrom(MonitoredSystem.values()).getSystem(), + randomVersion(random()).toString()); + doc.setType(randomFrom("type1", "type2")); + doc.setSource(jsonBuilder().startObject().field("num", i).endObject().bytes()); + request.add(doc); + } + return request; + } + + /** + * A Exporters implementation that captures the documents to export + */ + class CapturingExporters extends Exporters { + + private final Collection exported = ConcurrentCollections.newConcurrentSet(); + + public CapturingExporters() { + super(Settings.EMPTY, Collections.emptyMap(), clusterService, + new ClusterSettings(Settings.EMPTY, Collections.singleton(MarvelSettings.EXPORTERS_SETTINGS))); + } + + @Override + public synchronized void export(Collection docs) throws ExportException { + exported.addAll(docs); + } + + public Collection getExported() { + return exported; + } + } + + /** + * A Exporters implementation that applies a Consumer when exporting documents + */ + class ConsumingExporters extends Exporters { + + private final Consumer> consumer; + + public ConsumingExporters(Consumer> consumer) { + super(Settings.EMPTY, Collections.emptyMap(), clusterService, + new ClusterSettings(Settings.EMPTY, Collections.singleton(MarvelSettings.EXPORTERS_SETTINGS))); + this.consumer = consumer; + } + + @Override + public synchronized void export(Collection docs) throws ExportException { + consumer.accept(docs); + } + } + + public static void setState(ClusterService clusterService, ClusterState clusterState) { + + } +} diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java index e44f8c06a9c..66ae9e3595a 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java @@ -327,7 +327,6 @@ public class ExportersTests extends ESTestCase { } } - static class TestFactory extends Exporter.Factory { public TestFactory(String type, boolean singleton) { super(type, singleton); @@ -424,13 +423,13 @@ public class ExportersTests extends ESTestCase { } @Override - public ExportBulk add(Collection docs) throws Exception { + public ExportBulk add(Collection docs) throws ExportException { count.addAndGet(docs.size()); return this; } @Override - public void flush() throws Exception { + public void flush() throws ExportException { } AtomicInteger getCount() { diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/MonitoringDocTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/MonitoringDocTests.java index 6172dfc9ba4..298021e148d 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/MonitoringDocTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/MonitoringDocTests.java @@ -43,7 +43,7 @@ public class MonitoringDocTests extends ESTestCase { StreamInput streamInput = StreamInput.wrap(output.bytes()); streamInput.setVersion(randomVersion(random())); - MonitoringDoc monitoringDoc2 = MonitoringDoc.readMonitoringDoc(streamInput); + MonitoringDoc monitoringDoc2 = new MonitoringDoc(streamInput); assertThat(monitoringDoc2.getMonitoringId(), equalTo(monitoringDoc.getMonitoringId())); assertThat(monitoringDoc2.getMonitoringVersion(), equalTo(monitoringDoc.getMonitoringVersion())); @@ -64,7 +64,7 @@ public class MonitoringDocTests extends ESTestCase { public void testSetSourceNode() { int iterations = randomIntBetween(5, 50); for (int i = 0; i < iterations; i++) { - MonitoringDoc monitoringDoc = new MonitoringDoc(); + MonitoringDoc monitoringDoc = new MonitoringDoc(null, null); if (randomBoolean()) { DiscoveryNode discoveryNode = newRandomDiscoveryNode(); diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java index 5a39a514882..eb97432e3ba 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.MonitoredSystem; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMonitoringDoc; +import org.elasticsearch.marvel.agent.exporter.ExportException; import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; @@ -39,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName; import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -167,7 +167,14 @@ public class LocalExporterTests extends MarvelIntegTestCase { logger.debug("--> exporting a second monitoring doc"); exporter.export(Collections.singletonList(newRandomMarvelDoc())); } catch (ElasticsearchException e) { - assertThat(e.getMessage(), allOf(containsString("failure in bulk execution"), containsString("IndexClosedException[closed]"))); + assertThat(e.getMessage(), containsString("failed to flush export bulk [_local]")); + assertThat(e.getCause(), instanceOf(ExportException.class)); + + ExportException cause = (ExportException) e.getCause(); + assertTrue(cause.hasExportExceptions()); + for (ExportException c : cause) { + assertThat(c.getMessage(), containsString("IndexClosedException[closed]")); + } assertNull(exporter.getBulk().requestBuilder); } } diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/MonitoringIndexNameResolverTestCase.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/MonitoringIndexNameResolverTestCase.java index e80c4226862..3c2fdb0e9a1 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/MonitoringIndexNameResolverTestCase.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/MonitoringIndexNameResolverTestCase.java @@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.startsWith; public abstract class MonitoringIndexNameResolverTestCase> extends ESTestCase { - private final ResolversRegistry resolversRegistry = new ResolversRegistry(Settings.EMPTY); + protected final ResolversRegistry resolversRegistry = new ResolversRegistry(Settings.EMPTY); /** * @return the {@link MonitoringIndexNameResolver} to test diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolverTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolverTests.java new file mode 100644 index 00000000000..3ffeb72e4b2 --- /dev/null +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/resolver/bulk/MonitoringBulkResolverTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.agent.resolver.bulk; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.marvel.MonitoredSystem; +import org.elasticsearch.marvel.action.MonitoringBulkDoc; +import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; +import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolverTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class MonitoringBulkResolverTests extends MonitoringIndexNameResolverTestCase { + + @Override + protected MonitoringBulkDoc newMarvelDoc() { + MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString()); + doc.setClusterUUID(randomAsciiOfLength(5)); + doc.setTimestamp(Math.abs(randomLong())); + doc.setSourceNode(new DiscoveryNode("id", DummyTransportAddress.INSTANCE, Version.CURRENT)); + doc.setType("kibana_stats"); + doc.setSource(new BytesArray("{\"field1\" : \"value1\"}")); + return doc; + } + + @Override + protected boolean checkResolvedId() { + return false; + } + + @Override + protected boolean checkFilters() { + return false; + } + + public void testMonitoringBulkResolver() throws Exception { + MonitoringBulkDoc doc = newMarvelDoc(); + doc.setTimestamp(1437580442979L); + if (randomBoolean()) { + doc.setIndex(randomAsciiOfLength(5)); + } + if (randomBoolean()) { + doc.setId(randomAsciiOfLength(35)); + } + if (randomBoolean()) { + doc.setClusterUUID(randomAsciiOfLength(5)); + } + + MonitoringBulkResolver resolver = newResolver(); + assertThat(resolver.index(doc), equalTo(".monitoring-kibana-0-2015.07.22")); + assertThat(resolver.type(doc), equalTo(doc.getType())); + assertThat(resolver.id(doc), nullValue()); + + assertSource(resolver.source(doc, XContentType.JSON), + "cluster_uuid", + "timestamp", + "source_node", + "kibana_stats", + "kibana_stats.field1"); + } +} diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java index 9e4c8dbbd4e..b7af75d0320 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/test/MarvelIntegTestCase.java @@ -442,9 +442,9 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase { public static final String ROLES = "test:\n" + // a user for the test infra. " cluster: [ 'cluster:monitor/nodes/info', 'cluster:monitor/state', 'cluster:monitor/health', 'cluster:monitor/stats'," + - " 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," + - " 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," + - " 'cluster:monitor/task']\n" + + " 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," + + " 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," + + " 'cluster:monitor/task', 'cluster:admin/xpack/monitoring/bulk' ]\n" + " indices:\n" + " - names: '*'\n" + " privileges: [ all ]\n" + diff --git a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index 17e835d3d97..df0ab3e000a 100644 --- a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -163,6 +163,7 @@ public class XPackPlugin extends Plugin { public void onModule(NetworkModule module) { licensing.onModule(module); + marvel.onModule(module); shield.onModule(module); watcher.onModule(module); graph.onModule(module); @@ -170,6 +171,7 @@ public class XPackPlugin extends Plugin { public void onModule(ActionModule module) { licensing.onModule(module); + marvel.onModule(module); shield.onModule(module); watcher.onModule(module); graph.onModule(module);