Monitoring: Add REST endpoint to allow external systems to index monitoring data

Original commit: elastic/x-pack-elasticsearch@04aa96a228
This commit is contained in:
Tanguy Leroux 2016-03-15 21:20:12 +01:00
parent 40dc747968
commit fe97d2ba51
36 changed files with 1929 additions and 200 deletions

View File

@ -5,13 +5,14 @@
*/ */
package org.elasticsearch.marvel; package org.elasticsearch.marvel;
import org.elasticsearch.client.Client; import org.elasticsearch.action.ActionModule;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.marvel.action.MonitoringBulkAction;
import org.elasticsearch.marvel.action.TransportMonitoringBulkAction;
import org.elasticsearch.marvel.agent.AgentService; import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.collector.CollectorModule; import org.elasticsearch.marvel.agent.collector.CollectorModule;
import org.elasticsearch.marvel.agent.exporter.ExporterModule; import org.elasticsearch.marvel.agent.exporter.ExporterModule;
@ -19,49 +20,58 @@ import org.elasticsearch.marvel.cleaner.CleanerService;
import org.elasticsearch.marvel.client.MonitoringClientModule; import org.elasticsearch.marvel.client.MonitoringClientModule;
import org.elasticsearch.marvel.license.LicenseModule; import org.elasticsearch.marvel.license.LicenseModule;
import org.elasticsearch.marvel.license.MarvelLicensee; import org.elasticsearch.marvel.license.MarvelLicensee;
import org.elasticsearch.marvel.rest.action.RestMonitoringBulkAction;
import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy; import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.common.init.LazyInitializationModule; import org.elasticsearch.xpack.common.init.LazyInitializationModule;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
/**
* This class activates/deactivates the monitoring modules depending if we're running a node client, transport client or tribe client:
* - node clients: all modules are binded
* - transport clients: only action/transport actions are binded
* - tribe clients: everything is disables by default but can be enabled per tribe cluster
*/
public class Marvel { public class Marvel {
private static final ESLogger logger = Loggers.getLogger(XPackPlugin.class);
public static final String NAME = "monitoring"; public static final String NAME = "monitoring";
private final Settings settings; private final Settings settings;
private final boolean enabled; private final boolean enabled;
private final boolean transportClientMode;
public Marvel(Settings settings) { public Marvel(Settings settings) {
this.settings = settings; this.settings = settings;
this.enabled = enabled(settings); this.enabled = MarvelSettings.ENABLED.get(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
} }
boolean isEnabled() { boolean isEnabled() {
return enabled; return enabled;
} }
public Collection<Module> nodeModules() { boolean isTransportClient() {
List<Module> modules = new ArrayList<>(); return transportClientMode;
if (enabled) {
modules.add(new MarvelModule());
modules.add(new LicenseModule());
modules.add(new CollectorModule());
modules.add(new ExporterModule(settings));
modules.add(new MonitoringClientModule());
} }
return Collections.unmodifiableList(modules);
public Collection<Module> nodeModules() {
if (enabled == false || transportClientMode) {
return Collections.emptyList();
}
return Arrays.<Module>asList(
new MarvelModule(),
new LicenseModule(),
new CollectorModule(),
new ExporterModule(settings),
new MonitoringClientModule());
} }
public Collection<Class<? extends LifecycleComponent>> nodeServices() { public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (enabled == false) { if (enabled == false || transportClientMode) {
return Collections.emptyList(); return Collections.emptyList();
} }
return Arrays.<Class<? extends LifecycleComponent>>asList(MarvelLicensee.class, return Arrays.<Class<? extends LifecycleComponent>>asList(MarvelLicensee.class,
@ -69,18 +79,22 @@ public class Marvel {
CleanerService.class); CleanerService.class);
} }
public static boolean enabled(Settings settings) {
if ("node".equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false) {
logger.trace("monitoring cannot be started on a transport client");
return false;
}
return MarvelSettings.ENABLED.get(settings);
}
public void onModule(SettingsModule module) { public void onModule(SettingsModule module) {
MarvelSettings.register(module); MarvelSettings.register(module);
} }
public void onModule(ActionModule module) {
if (enabled) {
module.registerAction(MonitoringBulkAction.INSTANCE, TransportMonitoringBulkAction.class);
}
}
public void onModule(NetworkModule module) {
if (enabled && transportClientMode == false) {
module.registerRestHandler(RestMonitoringBulkAction.class);
}
}
public void onModule(LazyInitializationModule module) { public void onModule(LazyInitializationModule module) {
if (enabled) { if (enabled) {
module.registerLazyInitializable(MonitoringClientProxy.class); module.registerLazyInitializable(MonitoringClientProxy.class);

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class MonitoringBulkAction extends Action<MonitoringBulkRequest, MonitoringBulkResponse, MonitoringBulkRequestBuilder> {
public static final MonitoringBulkAction INSTANCE = new MonitoringBulkAction();
public static final String NAME = "cluster:admin/xpack/monitoring/bulk";
private MonitoringBulkAction() {
super(NAME);
}
@Override
public MonitoringBulkRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new MonitoringBulkRequestBuilder(client);
}
@Override
public MonitoringBulkResponse newResponse() {
return new MonitoringBulkResponse();
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import java.io.IOException;
public class MonitoringBulkDoc extends MonitoringDoc {
private String index;
private String type;
private String id;
private BytesReference source;
public MonitoringBulkDoc(String monitoringId, String monitoringVersion) {
super(monitoringId, monitoringVersion);
}
public MonitoringBulkDoc(StreamInput in) throws IOException {
super(in);
index = in.readOptionalString();
type = in.readOptionalString();
id = in.readOptionalString();
source = in.readBytesReference();
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public BytesReference getSource() {
return source;
}
public void setSource(BytesReference source) {
this.source = source;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(index);
out.writeOptionalString(type);
out.writeOptionalString(id);
out.writeBytesReference(source);
}
@Override
public MonitoringBulkDoc readFrom(StreamInput in) throws IOException {
return new MonitoringBulkDoc(in);
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A monitoring bulk request holds one or more {@link MonitoringBulkDoc}s.
* <p>
* Every monitoring document added to the request is associated to a monitoring system id and version. If this {id, version} pair is
* supported by the monitoring plugin, the monitoring documents will be indexed in a single batch using a normal bulk request.
* <p>
* The monitoring {id, version} pair is used by {org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver} to resolve the index,
* type and id of the final document to be indexed. A {@link MonitoringBulkDoc} can also hold its own index/type/id values but there's no
* guarantee that these information will be effectively used.
*/
public class MonitoringBulkRequest extends ActionRequest<MonitoringBulkRequest> {
final List<MonitoringBulkDoc> docs = new ArrayList<>();
/**
* @return the list of monitoring documents to be indexed
*/
public Collection<MonitoringBulkDoc> getDocs() {
return Collections.unmodifiableCollection(new ArrayList<>(this.docs));
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (docs.isEmpty()) {
validationException = addValidationError("no monitoring documents added", validationException);
}
for (int i = 0; i < docs.size(); i++) {
MonitoringBulkDoc doc = docs.get(i);
if (Strings.hasLength(doc.getMonitoringId()) == false) {
validationException = addValidationError("monitored system id is missing for monitoring document [" + i + "]",
validationException);
}
if (Strings.hasLength(doc.getMonitoringVersion()) == false) {
validationException = addValidationError("monitored system version is missing for monitoring document [" + i + "]",
validationException);
}
if (Strings.hasLength(doc.getType()) == false) {
validationException = addValidationError("type is missing for monitoring document [" + i + "]",
validationException);
}
if (doc.getSource() == null || doc.getSource().length() == 0) {
validationException = addValidationError("source is missing for monitoring document [" + i + "]", validationException);
}
}
return validationException;
}
/**
* Adds a monitoring document to the list of documents to be indexed.
*/
public MonitoringBulkRequest add(MonitoringBulkDoc doc) {
docs.add(doc);
return this;
}
/**
* Parses a monitoring bulk request and builds the list of documents to be indexed.
*/
public MonitoringBulkRequest add(BytesReference content, String defaultMonitoringId, String defaultMonitoringVersion,
String defaultIndex, String defaultType) throws Exception {
// MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest:
// instead of duplicating the parsing logic here we use a new BulkRequest instance to parse the content.
BulkRequest bulkRequest = Requests.bulkRequest().add(content, defaultIndex, defaultType);
for (ActionRequest request : bulkRequest.requests()) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
// builds a new monitoring document based on the index request
MonitoringBulkDoc doc = new MonitoringBulkDoc(defaultMonitoringId, defaultMonitoringVersion);
doc.setIndex(indexRequest.index());
doc.setType(indexRequest.type());
doc.setId(indexRequest.id());
doc.setSource(indexRequest.source());
add(doc);
} else {
throw new IllegalArgumentException("monitoring bulk requests should only contain index requests");
}
}
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
add(new MonitoringBulkDoc(in));
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(docs.size());
for (MonitoringBulkDoc doc : docs) {
doc.writeTo(out);
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
public class MonitoringBulkRequestBuilder
extends ActionRequestBuilder<MonitoringBulkRequest, MonitoringBulkResponse, MonitoringBulkRequestBuilder> {
public MonitoringBulkRequestBuilder(ElasticsearchClient client) {
super(client, MonitoringBulkAction.INSTANCE, new MonitoringBulkRequest());
}
public MonitoringBulkRequestBuilder add(MonitoringBulkDoc doc) {
request.add(doc);
return this;
}
public MonitoringBulkRequestBuilder add(BytesReference content, String defaultId, String defaultVersion, String defaultIndex,
String defaultType) throws Exception {
request.add(content, defaultId, defaultVersion, defaultIndex, defaultType);
return this;
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Objects;
public class MonitoringBulkResponse extends ActionResponse {
private long tookInMillis;
private Error error;
MonitoringBulkResponse() {
}
public MonitoringBulkResponse(long tookInMillis) {
this(tookInMillis, null);
}
public MonitoringBulkResponse(long tookInMillis, Error error) {
this.tookInMillis = tookInMillis;
this.error = error;
}
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}
public long getTookInMillis() {
return tookInMillis;
}
/**
* Returns HTTP status
* <ul>
* <li>{@link RestStatus#OK} if monitoring bulk request was successful</li>
* <li>{@link RestStatus#INTERNAL_SERVER_ERROR} if monitoring bulk request was partially successful or failed completely</li>
* </ul>
*/
public RestStatus status() {
return error == null ? RestStatus.OK : RestStatus.INTERNAL_SERVER_ERROR;
}
public Error getError() {
return error;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tookInMillis = in.readVLong();
error = in.readOptionalWritable(Error::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(tookInMillis);
out.writeOptionalWriteable(error);
}
public static class Error implements Writeable<Error>, ToXContent {
private final Throwable cause;
private final RestStatus status;
public Error(Throwable t) {
cause = Objects.requireNonNull(t);
status = ExceptionsHelper.status(t);
}
Error(StreamInput in) throws IOException {
this(in.<Throwable>readThrowable());
}
/**
* The failure message.
*/
public String getMessage() {
return this.cause.toString();
}
/**
* The rest status.
*/
public RestStatus getStatus() {
return this.status;
}
/**
* The actual cause of the failure.
*/
public Throwable getCause() {
return cause;
}
@Override
public Error readFrom(StreamInput in) throws IOException {
return new Error(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeThrowable(getCause());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
ElasticsearchException.toXContent(builder, params, cause);
builder.endObject();
return builder;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Error [");
sb.append("cause=").append(cause);
sb.append(", status=").append(status);
sb.append(']');
return sb.toString();
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TransportMonitoringBulkAction extends HandledTransportAction<MonitoringBulkRequest, MonitoringBulkResponse> {
private final ClusterService clusterService;
private final Exporters exportService;
@Inject
public TransportMonitoringBulkAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Exporters exportService) {
super(settings, MonitoringBulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
MonitoringBulkRequest::new);
this.clusterService = clusterService;
this.exportService = exportService;
}
@Override
protected void doExecute(MonitoringBulkRequest request, ActionListener<MonitoringBulkResponse> listener) {
clusterService.state().blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
new AsyncAction(request, listener, exportService, clusterService).start();
}
class AsyncAction {
private final MonitoringBulkRequest request;
private final ActionListener<MonitoringBulkResponse> listener;
private final Exporters exportService;
private final ClusterService clusterService;
public AsyncAction(MonitoringBulkRequest request, ActionListener<MonitoringBulkResponse> listener,
Exporters exportService, ClusterService clusterService) {
this.request = request;
this.listener = listener;
this.exportService = exportService;
this.clusterService = clusterService;
}
void start() {
executeExport(prepareForExport(request.getDocs()), System.nanoTime(), listener);
}
/**
* Iterate over the documents and set the values of common fields if needed:
* - cluster UUID
* - timestamp
* - source node
*/
Collection<MonitoringDoc> prepareForExport(Collection<? extends MonitoringDoc> docs) {
final String clusterUUID = clusterService.state().metaData().clusterUUID();
Function<MonitoringDoc, MonitoringDoc> updateClusterUUID = doc -> {
if (doc.getClusterUUID() == null) {
doc.setClusterUUID(clusterUUID);
}
return doc;
};
final long timestamp = System.currentTimeMillis();
Function<MonitoringDoc, MonitoringDoc> updateTimestamp = doc -> {
if (doc.getTimestamp() == 0) {
doc.setTimestamp(timestamp);
}
return doc;
};
final DiscoveryNode sourceNode = clusterService.localNode();
Function<MonitoringDoc, MonitoringDoc> updateSourceNode = doc -> {
if (doc.getSourceNode() == null) {
doc.setSourceNode(sourceNode);
}
return doc;
};
return docs.stream()
.map(updateClusterUUID.andThen(updateTimestamp.andThen(updateSourceNode)))
.collect(Collectors.toList());
}
/**
* Exports the documents
*/
void executeExport(final Collection<MonitoringDoc> docs, final long startTimeNanos,
final ActionListener<MonitoringBulkResponse> listener) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
exportService.export(docs);
listener.onResponse(new MonitoringBulkResponse(buildTookInMillis(startTimeNanos)));
}
@Override
public void onFailure(Throwable t) {
listener.onResponse(new MonitoringBulkResponse(buildTookInMillis(startTimeNanos), new MonitoringBulkResponse.Error(t)));
}
});
}
}
private long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
@ -199,6 +200,8 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
exporters.export(docs); exporters.export(docs);
} }
} catch (ExportException e) {
logger.error("exception when exporting documents", e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.trace("interrupted"); logger.trace("interrupted");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@ -5,8 +5,6 @@
*/ */
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.ElasticsearchException;
import java.util.Collection; import java.util.Collection;
/** /**
@ -25,18 +23,14 @@ public abstract class ExportBulk {
return name; return name;
} }
public abstract ExportBulk add(Collection<MonitoringDoc> docs) throws Exception; public abstract ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException;
public abstract void flush() throws Exception; public abstract void flush() throws ExportException;
public final void close(boolean flush) throws Exception { public final void close(boolean flush) throws ExportException {
Exception exception = null; ExportException exception = null;
if (flush) { if (flush) {
try {
flush(); flush();
} catch (Exception e) {
exception = e;
}
} }
// now closing // now closing
@ -46,7 +40,7 @@ public abstract class ExportBulk {
if (exception != null) { if (exception != null) {
exception.addSuppressed(e); exception.addSuppressed(e);
} else { } else {
exception = e; exception = new ExportException("Exception when closing export bulk", e);
} }
} }
@ -69,24 +63,35 @@ public abstract class ExportBulk {
} }
@Override @Override
public ExportBulk add(Collection<MonitoringDoc> docs) throws Exception { public ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException {
ExportException exception = null;
for (ExportBulk bulk : bulks) { for (ExportBulk bulk : bulks) {
try {
bulk.add(docs); bulk.add(docs);
} catch (ExportException e) {
if (exception == null) {
exception = new ExportException("failed to add documents to export bulks");
}
exception.addExportException(e);
}
}
if (exception != null) {
throw exception;
} }
return this; return this;
} }
@Override @Override
public void flush() throws Exception { public void flush() throws ExportException {
Exception exception = null; ExportException exception = null;
for (ExportBulk bulk : bulks) { for (ExportBulk bulk : bulks) {
try { try {
bulk.flush(); bulk.flush();
} catch (Exception e) { } catch (ExportException e) {
if (exception == null) { if (exception == null) {
exception = new ElasticsearchException("failed to flush exporter bulks"); exception = new ExportException("failed to flush export bulks");
} }
exception.addSuppressed(new ElasticsearchException("failed to flush [{}] exporter bulk", e, bulk.name)); exception.addExportException(e);
} }
} }
if (exception != null) { if (exception != null) {

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class ExportException extends ElasticsearchException implements Iterable<ExportException> {
private final List<ExportException> exceptions = new ArrayList<>();
public ExportException(Throwable throwable) {
super(throwable);
}
public ExportException(String msg, Object... args) {
super(msg, args);
}
public ExportException(String msg, Throwable throwable, Object... args) {
super(msg, throwable, args);
}
public ExportException(StreamInput in) throws IOException {
super(in);
for (int i = in.readVInt(); i > 0; i--) {
exceptions.add(new ExportException(in));
}
}
public boolean addExportException(ExportException e) {
return exceptions.add(e);
}
public boolean hasExportExceptions() {
return exceptions.size() > 0;
}
@Override
public Iterator<ExportException> iterator() {
return exceptions.iterator();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(exceptions.size());
for (ExportException e : exceptions) {
e.writeTo(out);
}
}
@Override
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
super.innerToXContent(builder, params);
if (hasExportExceptions()) {
builder.startArray("exceptions");
for (ExportException exception : exceptions) {
builder.startObject();
exception.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
}
}

View File

@ -122,7 +122,7 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
bulks.add(bulk); bulks.add(bulk);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("exporter [{}] failed to export monitoring data", e, exporter.name()); logger.error("exporter [{}] failed to open exporting bulk", e, exporter.name());
} }
} }
return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks); return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks);
@ -179,9 +179,9 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
/** /**
* Exports a collection of monitoring documents using the configured exporters * Exports a collection of monitoring documents using the configured exporters
*/ */
public synchronized void export(Collection<MonitoringDoc> docs) throws Exception { public synchronized void export(Collection<MonitoringDoc> docs) throws ExportException {
if (this.lifecycleState() != Lifecycle.State.STARTED) { if (this.lifecycleState() != Lifecycle.State.STARTED) {
throw new IllegalStateException("Export service is not started"); throw new ExportException("Export service is not started");
} }
if (docs != null && docs.size() > 0) { if (docs != null && docs.size() > 0) {
ExportBulk bulk = openBulk(); ExportBulk bulk = openBulk();
@ -191,7 +191,6 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
} }
try { try {
logger.debug("exporting [{}] monitoring documents", docs.size());
bulk.add(docs); bulk.add(docs);
} finally { } finally {
bulk.close(lifecycleState() == Lifecycle.State.STARTED); bulk.close(lifecycleState() == Lifecycle.State.STARTED);

View File

@ -22,8 +22,6 @@ import java.io.IOException;
*/ */
public class MonitoringDoc implements Writeable<MonitoringDoc> { public class MonitoringDoc implements Writeable<MonitoringDoc> {
private static final MonitoringDoc PROTO = new MonitoringDoc();
private final String monitoringId; private final String monitoringId;
private final String monitoringVersion; private final String monitoringVersion;
@ -31,16 +29,18 @@ public class MonitoringDoc implements Writeable<MonitoringDoc> {
private long timestamp; private long timestamp;
private Node sourceNode; private Node sourceNode;
// Used by {@link #PROTO} instance and tests
MonitoringDoc() {
this(null, null);
}
public MonitoringDoc(String monitoringId, String monitoringVersion) { public MonitoringDoc(String monitoringId, String monitoringVersion) {
this.monitoringId = monitoringId; this.monitoringId = monitoringId;
this.monitoringVersion = monitoringVersion; this.monitoringVersion = monitoringVersion;
} }
public MonitoringDoc(StreamInput in) throws IOException {
this(in.readOptionalString(), in.readOptionalString());
clusterUUID = in.readOptionalString();
timestamp = in.readVLong();
sourceNode = in.readOptionalWritable(Node::new);
}
public String getClusterUUID() { public String getClusterUUID() {
return clusterUUID; return clusterUUID;
} }
@ -80,7 +80,7 @@ public class MonitoringDoc implements Writeable<MonitoringDoc> {
@Override @Override
public String toString() { public String toString() {
return "marvel document [class=" + getClass().getName() + return "monitoring document [class=" + getClass().getSimpleName() +
", monitoring id=" + getMonitoringId() + ", monitoring id=" + getMonitoringId() +
", monitoring version=" + getMonitoringVersion() + ", monitoring version=" + getMonitoringVersion() +
"]"; "]";
@ -92,33 +92,16 @@ public class MonitoringDoc implements Writeable<MonitoringDoc> {
out.writeOptionalString(getMonitoringVersion()); out.writeOptionalString(getMonitoringVersion());
out.writeOptionalString(getClusterUUID()); out.writeOptionalString(getClusterUUID());
out.writeVLong(getTimestamp()); out.writeVLong(getTimestamp());
if (getSourceNode() != null) { out.writeOptionalWriteable(getSourceNode());
out.writeBoolean(true);
getSourceNode().writeTo(out);
} else {
out.writeBoolean(false);
}
} }
@Override @Override
public MonitoringDoc readFrom(StreamInput in) throws IOException { public MonitoringDoc readFrom(StreamInput in) throws IOException {
MonitoringDoc doc = new MonitoringDoc(in.readOptionalString(), in.readOptionalString()); return new MonitoringDoc(in);
doc.setClusterUUID(in.readOptionalString());
doc.setTimestamp(in.readVLong());
if (in.readBoolean()) {
doc.setSourceNode(Node.PROTO.readFrom(in));
}
return doc;
}
public static MonitoringDoc readMonitoringDoc(StreamInput in) throws IOException {
return PROTO.readFrom(in);
} }
public static class Node implements Writeable<Node>, ToXContent { public static class Node implements Writeable<Node>, ToXContent {
public static final Node PROTO = new Node();
private String uuid; private String uuid;
private String host; private String host;
private String transportAddress; private String transportAddress;
@ -126,10 +109,6 @@ public class MonitoringDoc implements Writeable<MonitoringDoc> {
private String name; private String name;
private ImmutableOpenMap<String, String> attributes; private ImmutableOpenMap<String, String> attributes;
// Used by the {@link #PROTO} instance
Node() {
}
public Node(String uuid, String host, String transportAddress, String ip, String name, public Node(String uuid, String host, String transportAddress, String ip, String name,
ImmutableOpenMap<String, String> attributes) { ImmutableOpenMap<String, String> attributes) {
this.uuid = uuid; this.uuid = uuid;
@ -147,6 +126,20 @@ public class MonitoringDoc implements Writeable<MonitoringDoc> {
this.attributes = builder.build(); this.attributes = builder.build();
} }
public Node(StreamInput in) throws IOException {
uuid = in.readOptionalString();
host = in.readOptionalString();
transportAddress = in.readOptionalString();
ip = in.readOptionalString();
name = in.readOptionalString();
int size = in.readVInt();
ImmutableOpenMap.Builder<String, String> attributes = ImmutableOpenMap.builder(size);
for (int i = 0; i < size; i++) {
attributes.put(in.readOptionalString(), in.readOptionalString());
}
this.attributes = attributes.build();
}
public String getUUID() { public String getUUID() {
return uuid; return uuid;
} }
@ -208,19 +201,7 @@ public class MonitoringDoc implements Writeable<MonitoringDoc> {
@Override @Override
public Node readFrom(StreamInput in) throws IOException { public Node readFrom(StreamInput in) throws IOException {
Node node = new Node(); return new Node(in);
node.uuid = in.readOptionalString();
node.host = in.readOptionalString();
node.transportAddress = in.readOptionalString();
node.ip = in.readOptionalString();
node.name = in.readOptionalString();
int size = in.readVInt();
ImmutableOpenMap.Builder<String, String> attributes = ImmutableOpenMap.builder(size);
for (int i = 0; i < size; i++) {
attributes.put(in.readOptionalString(), in.readOptionalString());
}
node.attributes = attributes.build();
return node;
} }
@Override @Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
@ -79,7 +80,9 @@ public class HttpExporter extends Exporter {
public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = "truststore.algorithm"; public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = "truststore.algorithm";
public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification"; public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification";
/** Minimum supported version of the remote monitoring cluster **/ /**
* Minimum supported version of the remote monitoring cluster
**/
public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2; public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2;
private static final XContentType CONTENT_TYPE = XContentType.JSON; private static final XContentType CONTENT_TYPE = XContentType.JSON;
@ -89,19 +92,24 @@ public class HttpExporter extends Exporter {
final TimeValue connectionReadTimeout; final TimeValue connectionReadTimeout;
final BasicAuth auth; final BasicAuth auth;
/** https support * */ /**
* https support *
*/
final SSLSocketFactory sslSocketFactory; final SSLSocketFactory sslSocketFactory;
final boolean hostnameVerification; final boolean hostnameVerification;
final Environment env; final Environment env;
final ResolversRegistry resolvers; final ResolversRegistry resolvers;
final @Nullable TimeValue templateCheckTimeout; @Nullable
final TimeValue templateCheckTimeout;
volatile boolean checkedAndUploadedIndexTemplate = false; volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean supportedClusterVersion = false; volatile boolean supportedClusterVersion = false;
/** Version number of built-in templates **/ /**
* Version number of built-in templates
**/
private final Integer templateVersion; private final Integer templateVersion;
boolean keepAlive; boolean keepAlive;
@ -218,9 +226,9 @@ public class HttpExporter extends Exporter {
logger.trace("http exporter [{}] - added index request [index={}, type={}, id={}]", logger.trace("http exporter [{}] - added index request [index={}, type={}, id={}]",
name(), index, type, id); name(), index, type, id);
} }
} else { } else if (logger.isTraceEnabled()) {
logger.warn("http exporter [{}] - unable to render monitoring document of type [{}]: no renderer found in registry", logger.trace("http exporter [{}] - no resolver found for monitoring document [class={}, id={}, version={}]",
name(), doc); name(), doc.getClass().getName(), doc.getMonitoringId(), doc.getMonitoringVersion());
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("http exporter [{}] - failed to render document [{}], skipping it", e, name(), doc); logger.warn("http exporter [{}] - failed to render document [{}], skipping it", e, name(), doc);
@ -318,7 +326,9 @@ public class HttpExporter extends Exporter {
return null; return null;
} }
/** open a connection to the given hosts, returning null when not successful * */ /**
* open a connection to the given hosts, returning null when not successful *
*/
private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) { private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) {
try { try {
final URL url = HttpExporterUtils.parseHostWithPath(host, path); final URL url = HttpExporterUtils.parseHostWithPath(host, path);
@ -450,7 +460,7 @@ public class HttpExporter extends Exporter {
// 200 means that the template has been found, 404 otherwise // 200 means that the template has been found, 404 otherwise
if (connection.getResponseCode() == 200) { if (connection.getResponseCode() == 200) {
logger.debug("monitoring template [{}] found",templateName); logger.debug("monitoring template [{}] found", templateName);
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {
@ -543,7 +553,9 @@ public class HttpExporter extends Exporter {
} }
} }
/** SSL Initialization * */ /**
* SSL Initialization *
*/
public SSLSocketFactory createSSLSocketFactory(Settings settings) { public SSLSocketFactory createSSLSocketFactory(Settings settings) {
if (settings.names().isEmpty()) { if (settings.names().isEmpty()) {
logger.trace("no ssl context configured"); logger.trace("no ssl context configured");
@ -693,11 +705,15 @@ public class HttpExporter extends Exporter {
} }
@Override @Override
public Bulk add(Collection<MonitoringDoc> docs) throws Exception { public Bulk add(Collection<MonitoringDoc> docs) throws ExportException {
try {
if ((docs != null) && (!docs.isEmpty())) {
if (connection == null) { if (connection == null) {
connection = openExportingConnection(); connection = openExportingConnection();
if (connection == null) {
throw new IllegalStateException("No connection available to export documents");
}
} }
if ((docs != null) && (!docs.isEmpty())) {
if (out == null) { if (out == null) {
out = connection.getOutputStream(); out = connection.getOutputStream();
} }
@ -716,24 +732,27 @@ public class HttpExporter extends Exporter {
} }
} }
} }
} catch (Exception e) {
throw new ExportException("failed to add documents to export bulk [{}]", name);
}
return this; return this;
} }
@Override @Override
public void flush() throws IOException { public void flush() throws ExportException {
if (connection != null) { if (connection != null) {
try {
flush(connection); flush(connection);
} catch (Exception e) {
throw new ExportException("failed to flush export bulk [{}]", e, name);
} finally {
connection = null; connection = null;
} }
} }
}
private void flush(HttpURLConnection connection) throws IOException { private void flush(HttpURLConnection connection) throws IOException {
try {
sendCloseExportingConnection(connection); sendCloseExportingConnection(connection);
} catch (IOException e) {
logger.error("failed sending data to [{}]: {}", connection.getURL(), ExceptionsHelper.detailedMessage(e));
throw e;
}
} }
} }

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.marvel.agent.exporter.local; package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
@ -13,12 +12,13 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver; import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.marvel.agent.resolver.ResolversRegistry; import org.elasticsearch.marvel.agent.resolver.ResolversRegistry;
import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy; import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import java.io.IOException; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -43,7 +43,9 @@ public class LocalBulk extends ExportBulk {
} }
@Override @Override
public synchronized ExportBulk add(Collection<MonitoringDoc> docs) throws Exception { public synchronized ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException {
ExportException exception = null;
for (MonitoringDoc doc : docs) { for (MonitoringDoc doc : docs) {
if (state.get() != State.ACTIVE) { if (state.get() != State.ACTIVE) {
return this; return this;
@ -54,7 +56,6 @@ public class LocalBulk extends ExportBulk {
try { try {
MonitoringIndexNameResolver<MonitoringDoc> resolver = resolvers.getResolver(doc); MonitoringIndexNameResolver<MonitoringDoc> resolver = resolvers.getResolver(doc);
if (resolver != null) {
IndexRequest request = new IndexRequest(resolver.index(doc), resolver.type(doc), resolver.id(doc)); IndexRequest request = new IndexRequest(resolver.index(doc), resolver.type(doc), resolver.id(doc));
request.source(resolver.source(doc, XContentType.SMILE)); request.source(resolver.source(doc, XContentType.SMILE));
requestBuilder.add(request); requestBuilder.add(request);
@ -63,33 +64,53 @@ public class LocalBulk extends ExportBulk {
logger.trace("local exporter [{}] - added index request [index={}, type={}, id={}]", logger.trace("local exporter [{}] - added index request [index={}, type={}, id={}]",
name, request.index(), request.type(), request.id()); name, request.index(), request.type(), request.id());
} }
} else {
logger.warn("local exporter [{}] - unable to render monitoring document of type [{}]: no renderer found in registry",
name, doc);
}
} catch (Exception e) { } catch (Exception e) {
logger.warn("local exporter [{}] - failed to add document [{}], skipping it", e, name, doc); if (exception == null) {
exception = new ExportException("failed to add documents to export bulk [{}]", name);
}
exception.addExportException(new ExportException("failed to add document [{}]", e, doc, name));
} }
} }
if (exception != null) {
throw exception;
}
return this; return this;
} }
@Override @Override
public void flush() throws IOException { public void flush() throws ExportException {
if (state.get() != State.ACTIVE || requestBuilder == null) { if (state.get() != State.ACTIVE || requestBuilder == null || requestBuilder.numberOfActions() == 0) {
return; return;
} }
try { try {
logger.trace("exporter [{}] - exporting {} documents", name, requestBuilder.numberOfActions()); logger.trace("exporter [{}] - exporting {} documents", name, requestBuilder.numberOfActions());
BulkResponse bulkResponse = requestBuilder.get(); BulkResponse bulkResponse = requestBuilder.get();
if (bulkResponse.hasFailures()) { if (bulkResponse.hasFailures()) {
throw new ElasticsearchException(buildFailureMessage(bulkResponse)); throwExportException(bulkResponse.getItems());
} }
} catch (Exception e) {
throw new ExportException("failed to flush export bulk [{}]", e, name);
} finally { } finally {
requestBuilder = null; requestBuilder = null;
} }
} }
void throwExportException(BulkItemResponse[] bulkItemResponses) {
ExportException exception = new ExportException("bulk [{}] reports failures when exporting documents", name);
Arrays.stream(bulkItemResponses)
.filter(BulkItemResponse::isFailed)
.map(item -> new ExportException(item.getFailure().getCause()))
.forEach(exception::addExportException);
if (exception.hasExportExceptions()) {
throw exception;
}
}
void terminate() { void terminate() {
state.set(State.TERMINATING); state.set(State.TERMINATING);
synchronized (this) { synchronized (this) {
@ -98,31 +119,6 @@ public class LocalBulk extends ExportBulk {
} }
} }
/**
* In case of something goes wrong and there's a lot of shards/indices,
* we limit the number of failures displayed in log.
*/
private String buildFailureMessage(BulkResponse bulkResponse) {
BulkItemResponse[] items = bulkResponse.getItems();
if (logger.isDebugEnabled() || (items.length < 100)) {
return bulkResponse.buildFailureMessage();
}
StringBuilder sb = new StringBuilder();
sb.append("failure in bulk execution, only the first 100 failures are printed:");
for (int i = 0; i < items.length && i < 100; i++) {
BulkItemResponse item = items[i];
if (item.isFailed()) {
sb.append("\n[").append(i)
.append("]: index [").append(item.getIndex()).append("], type [").append(item.getType())
.append("], id [").append(item.getId()).append("], message [").append(item.getFailureMessage())
.append("]");
}
}
return sb.toString();
}
enum State { enum State {
ACTIVE, ACTIVE,
TERMINATING, TERMINATING,

View File

@ -282,7 +282,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
.distinct() .distinct()
.toArray(String[]::new); .toArray(String[]::new);
MonitoringDoc monitoringDoc = new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString()); MonitoringDoc monitoringDoc = new MonitoringDoc(null, null);
monitoringDoc.setTimestamp(System.currentTimeMillis()); monitoringDoc.setTimestamp(System.currentTimeMillis());
// Get the names of the current monitoring indices // Get the names of the current monitoring indices

View File

@ -5,8 +5,10 @@
*/ */
package org.elasticsearch.marvel.agent.resolver; package org.elasticsearch.marvel.agent.resolver;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.MonitoredSystem; import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateNodeMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateNodeMonitoringDoc;
@ -19,6 +21,7 @@ import org.elasticsearch.marvel.agent.collector.node.NodeStatsMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.shards.ShardMonitoringDoc; import org.elasticsearch.marvel.agent.collector.shards.ShardMonitoringDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver;
import org.elasticsearch.marvel.agent.resolver.cluster.ClusterInfoResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterInfoResolver;
import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateNodeResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateNodeResolver;
import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateResolver;
@ -46,8 +49,8 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
// register built-in defaults resolvers // register built-in defaults resolvers
registerBuiltIn(ES, MarvelTemplateUtils.TEMPLATE_VERSION, settings); registerBuiltIn(ES, MarvelTemplateUtils.TEMPLATE_VERSION, settings);
// register resolvers for external applications, something like: // register resolvers for external applications
//registrations.add(resolveByIdVersion(MonitoringIds.KIBANA, "4.4.1", new KibanaDocResolver(KIBANA, 0, settings))); registerKibana(settings);
} }
/** /**
@ -66,6 +69,14 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
registrations.add(resolveByClass(ShardMonitoringDoc.class, new ShardsResolver(id, version, settings))); registrations.add(resolveByClass(ShardMonitoringDoc.class, new ShardsResolver(id, version, settings)));
} }
/**
* Registers resolvers for Kibana
*/
private void registerKibana(Settings settings) {
final MonitoringBulkResolver kibana = new MonitoringBulkResolver(MonitoredSystem.KIBANA, 0, settings);
registrations.add(resolveByClassSystemVersion(MonitoringBulkDoc.class, MonitoredSystem.KIBANA, Version.CURRENT, kibana));
}
/** /**
* @return a Resolver that is able to resolver the given monitoring document * @return a Resolver that is able to resolver the given monitoring document
*/ */
@ -75,8 +86,7 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
return registration.resolver(); return registration.resolver();
} }
} }
throw new IllegalArgumentException("No resolver found for monitoring document [class=" + document.getClass().getName() throw new IllegalArgumentException("No resolver found for monitoring document");
+ ", id=" + document.getMonitoringId() + ", version=" + document.getMonitoringVersion() + "]");
} }
@Override @Override
@ -88,6 +98,23 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
return new Registration(resolver, type::isInstance); return new Registration(resolver, type::isInstance);
} }
static Registration resolveByClassSystemVersion(Class<? extends MonitoringDoc> type, MonitoredSystem system, Version version,
MonitoringIndexNameResolver resolver) {
return new Registration(resolver, doc -> {
try {
if (type.isInstance(doc) == false) {
return false;
}
if (system != MonitoredSystem.fromSystem(doc.getMonitoringId())) {
return false;
}
return version == Version.fromString(doc.getMonitoringVersion());
} catch (Exception e) {
return false;
}
});
}
static class Registration { static class Registration {
private final MonitoringIndexNameResolver resolver; private final MonitoringIndexNameResolver resolver;
@ -106,5 +133,4 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
return resolver; return resolver;
} }
} }
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.resolver.bulk;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver;
import java.io.IOException;
public class MonitoringBulkResolver extends MonitoringIndexNameResolver.Timestamped<MonitoringBulkDoc> {
public MonitoringBulkResolver(MonitoredSystem id, int version, Settings settings) {
super(id, version, settings);
}
@Override
public String type(MonitoringBulkDoc document) {
return document.getType();
}
@Override
protected void buildXContent(MonitoringBulkDoc document, XContentBuilder builder, ToXContent.Params params) throws IOException {
BytesReference source = document.getSource();
if (source != null && source.length() > 0) {
builder.rawField(type(document), source);
}
}
}

View File

@ -5,8 +5,16 @@
*/ */
package org.elasticsearch.marvel.client; package org.elasticsearch.marvel.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.marvel.action.MonitoringBulkAction;
import org.elasticsearch.marvel.action.MonitoringBulkRequest;
import org.elasticsearch.marvel.action.MonitoringBulkRequestBuilder;
import org.elasticsearch.marvel.action.MonitoringBulkResponse;
import java.util.Map;
public class MonitoringClient { public class MonitoringClient {
@ -17,5 +25,36 @@ public class MonitoringClient {
this.client = client; this.client = client;
} }
// to be implemented: specific API for monitoring
/**
* Creates a request builder that bulk index monitoring documents.
*
* @return The request builder
*/
public MonitoringBulkRequestBuilder prepareMonitoringBulk() {
return new MonitoringBulkRequestBuilder(client);
}
/**
* Executes a bulk of index operations that concern monitoring documents.
*
* @param request The monitoring bulk request
* @param listener A listener to be notified with a result
*/
public void bulk(MonitoringBulkRequest request, ActionListener<MonitoringBulkResponse> listener) {
client.execute(MonitoringBulkAction.INSTANCE, request, listener);
}
/**
* Executes a bulk of index operations that concern monitoring documents.
*
* @param request The monitoring bulk request
*/
public ActionFuture<MonitoringBulkResponse> bulk(MonitoringBulkRequest request) {
return client.execute(MonitoringBulkAction.INSTANCE, request);
}
public MonitoringClient filterWithHeader(Map<String, String> headers) {
return new MonitoringClient(client.filterWithHeader(headers));
}
} }

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.client.MonitoringClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.XPackPlugin;
import java.util.Locale;
public abstract class MonitoringRestHandler extends BaseRestHandler {
protected static String URI_BASE = String.format(Locale.ROOT, "/_%s/monitoring", XPackPlugin.NAME);
public MonitoringRestHandler(Settings settings, Client client) {
super(settings, client);
}
@Override
protected final void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
handleRequest(request, channel, new MonitoringClient(client));
}
protected abstract void handleRequest(RestRequest request, RestChannel channel, MonitoringClient client) throws Exception;
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.rest.action;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.marvel.action.MonitoringBulkRequestBuilder;
import org.elasticsearch.marvel.action.MonitoringBulkResponse;
import org.elasticsearch.marvel.client.MonitoringClient;
import org.elasticsearch.marvel.rest.MonitoringRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener;
public class RestMonitoringBulkAction extends MonitoringRestHandler {
public static final String MONITORING_ID = "system_id";
public static final String MONITORING_VERSION = "system_version";
@Inject
public RestMonitoringBulkAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/_bulk", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/_bulk", this);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/{index}/_bulk", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/{index}/_bulk", this);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/{index}/{type}/_bulk", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/{index}/{type}/_bulk", this);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, MonitoringClient client) throws Exception {
String defaultIndex = request.param("index");
String defaultType = request.param("type");
String id = request.param(MONITORING_ID);
if (Strings.hasLength(id) == false) {
throw new IllegalArgumentException("no monitoring id for monitoring bulk request");
}
String version = request.param(MONITORING_VERSION);
if (Strings.hasLength(version) == false) {
throw new IllegalArgumentException("no monitoring version for monitoring bulk request");
}
if (!RestActions.hasBodyContent(request)) {
throw new ElasticsearchParseException("no body content for monitoring bulk request");
}
MonitoringBulkRequestBuilder requestBuilder = client.prepareMonitoringBulk();
requestBuilder.add(request.content(), id, version, defaultIndex, defaultType);
requestBuilder.execute(new RestBuilderListener<MonitoringBulkResponse>(channel) {
@Override
public RestResponse buildResponse(MonitoringBulkResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field(Fields.TOOK, response.getTookInMillis());
MonitoringBulkResponse.Error error = response.getError();
builder.field(Fields.ERRORS, error != null);
if (error != null) {
builder.field(Fields.ERROR, response.getError());
}
builder.endObject();
return new BytesRestResponse(response.status(), builder);
}
});
}
static final class Fields {
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString ERRORS = new XContentBuilderString("errors");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
}
}

View File

@ -6,16 +6,11 @@
package org.elasticsearch.marvel.support.init.proxy; package org.elasticsearch.marvel.support.init.proxy;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.shield.InternalClient; import org.elasticsearch.shield.InternalClient;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy; import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
public class MonitoringClientProxy extends ClientProxy { public class MonitoringClientProxy extends ClientProxy {
@Inject
public MonitoringClientProxy() {
}
/** /**
* Creates a proxy to the given internal client (can be used for testing) * Creates a proxy to the given internal client (can be used for testing)
*/ */

View File

@ -1,7 +1,6 @@
{ {
"template": ".monitoring-data-${monitoring.template.version}", "template": ".monitoring-data-${monitoring.template.version}",
"settings": { "settings": {
"index.xpack.version": "${project.version}",
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.codec": "best_compression", "index.codec": "best_compression",
@ -9,7 +8,10 @@
}, },
"mappings": { "mappings": {
"cluster_info": { "cluster_info": {
"enabled": false "enabled": false,
"_meta": {
"xpack.version": "${project.version}"
}
}, },
"node": { "node": {
"enabled": false "enabled": false

View File

@ -1,7 +1,6 @@
{ {
"template": ".monitoring-es-${monitoring.template.version}-*", "template": ".monitoring-es-${monitoring.template.version}-*",
"settings": { "settings": {
"index.xpack.version": "${project.version}",
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.codec": "best_compression", "index.codec": "best_compression",

View File

@ -22,7 +22,8 @@ public class MarvelPluginClientTests extends ESTestCase {
.build(); .build();
Marvel plugin = new Marvel(settings); Marvel plugin = new Marvel(settings);
assertThat(plugin.isEnabled(), is(false)); assertThat(plugin.isEnabled(), is(true));
assertThat(plugin.isTransportClient(), is(true));
Collection<Module> modules = plugin.nodeModules(); Collection<Module> modules = plugin.nodeModules();
assertThat(modules.size(), is(0)); assertThat(modules.size(), is(0));
} }
@ -34,6 +35,7 @@ public class MarvelPluginClientTests extends ESTestCase {
.build(); .build();
Marvel plugin = new Marvel(settings); Marvel plugin = new Marvel(settings);
assertThat(plugin.isEnabled(), is(true)); assertThat(plugin.isEnabled(), is(true));
assertThat(plugin.isTransportClient(), is(false));
Collection<Module> modules = plugin.nodeModules(); Collection<Module> modules = plugin.nodeModules();
assertThat(modules.size(), is(5)); assertThat(modules.size(), is(5));
} }

View File

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo;
public class MonitoringBulkDocTests extends ESTestCase {
public void testSerialization() throws IOException {
int iterations = randomIntBetween(5, 50);
for (int i = 0; i < iterations; i++) {
MonitoringBulkDoc doc = newRandomMonitoringBulkDoc();
boolean hasSourceNode = randomBoolean();
if (hasSourceNode) {
doc.setSourceNode(newRandomSourceNode());
}
BytesStreamOutput output = new BytesStreamOutput();
Version outputVersion = randomVersion(random());
output.setVersion(outputVersion);
doc.writeTo(output);
StreamInput streamInput = StreamInput.wrap(output.bytes());
streamInput.setVersion(randomVersion(random()));
MonitoringBulkDoc doc2 = new MonitoringBulkDoc(streamInput);
assertThat(doc2.getMonitoringId(), equalTo(doc.getMonitoringId()));
assertThat(doc2.getMonitoringVersion(), equalTo(doc.getMonitoringVersion()));
assertThat(doc2.getClusterUUID(), equalTo(doc.getClusterUUID()));
assertThat(doc2.getTimestamp(), equalTo(doc.getTimestamp()));
assertThat(doc2.getSourceNode(), equalTo(doc.getSourceNode()));
assertThat(doc2.getIndex(), equalTo(doc.getIndex()));
assertThat(doc2.getType(), equalTo(doc.getType()));
assertThat(doc2.getId(), equalTo(doc.getId()));
if (doc.getSource() == null) {
assertThat(doc2.getSource(), equalTo(BytesArray.EMPTY));
} else {
assertThat(doc2.getSource(), equalTo(doc.getSource()));
}
}
}
private MonitoringBulkDoc newRandomMonitoringBulkDoc() {
MonitoringBulkDoc doc = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2));
if (frequently()) {
doc.setClusterUUID(randomAsciiOfLength(5));
doc.setType(randomAsciiOfLength(5));
}
if (randomBoolean()) {
doc.setTimestamp(System.currentTimeMillis());
doc.setSource(new BytesArray("{\"key\" : \"value\"}"));
}
if (rarely()) {
doc.setIndex(randomAsciiOfLength(5));
doc.setId(randomAsciiOfLength(2));
}
return doc;
}
private MonitoringDoc.Node newRandomSourceNode() {
String uuid = null;
String name = null;
String ip = null;
String transportAddress = null;
String host = null;
ImmutableOpenMap<String, String> attributes = null;
if (frequently()) {
uuid = randomAsciiOfLength(5);
name = randomAsciiOfLength(5);
}
if (randomBoolean()) {
ip = randomAsciiOfLength(5);
transportAddress = randomAsciiOfLength(5);
host = randomAsciiOfLength(3);
}
if (rarely()) {
int nbAttributes = randomIntBetween(0, 5);
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.builder();
for (int i = 0; i < nbAttributes; i++) {
builder.put("key#" + i, String.valueOf(i));
}
attributes = builder.build();
}
return new MonitoringDoc.Node(uuid, host, transportAddress, ip, name, attributes);
}
}

View File

@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import java.io.IOException;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;
public class MonitoringBulkRequestTests extends ESTestCase {
private static final BytesArray SOURCE = new BytesArray("{\"key\" : \"value\"}");
public void testValidateRequestNoDocs() {
assertValidationErrors(new MonitoringBulkRequest(), hasItems("no monitoring documents added"));
}
public void testValidateRequestSingleDoc() {
MonitoringBulkDoc doc = new MonitoringBulkDoc(null, null);
assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("monitored system id is missing for monitoring document [0]",
"monitored system version is missing for monitoring document [0]",
"type is missing for monitoring document [0]",
"source is missing for monitoring document [0]"));
doc = new MonitoringBulkDoc("id", null);
assertValidationErrors(new MonitoringBulkRequest().add(doc),
hasItems("monitored system version is missing for monitoring document [0]",
"type is missing for monitoring document [0]",
"source is missing for monitoring document [0]"));
doc = new MonitoringBulkDoc("id", "version");
assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("type is missing for monitoring document [0]",
"source is missing for monitoring document [0]"));
doc.setType("type");
assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("source is missing for monitoring document [0]"));
doc.setSource(SOURCE);
assertValidationErrors(new MonitoringBulkRequest().add(doc), nullValue());
}
public void testValidateRequestMultiDocs() {
MonitoringBulkRequest request = new MonitoringBulkRequest();
// Doc0 is complete
MonitoringBulkDoc doc0 = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2));
doc0.setType(randomAsciiOfLength(5));
doc0.setSource(SOURCE);
request.add(doc0);
// Doc1 has no type
MonitoringBulkDoc doc1 = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2));
doc1.setSource(SOURCE);
request.add(doc1);
// Doc2 has no source
MonitoringBulkDoc doc2 = new MonitoringBulkDoc(randomAsciiOfLength(2), randomAsciiOfLength(2));
doc2.setType(randomAsciiOfLength(5));
doc2.setSource(BytesArray.EMPTY);
request.add(doc2);
// Doc3 has no version
MonitoringBulkDoc doc3 = new MonitoringBulkDoc(randomAsciiOfLength(2), null);
doc3.setType(randomAsciiOfLength(5));
doc3.setSource(SOURCE);
request.add(doc3);
// Doc4 has no id
MonitoringBulkDoc doc4 = new MonitoringBulkDoc(null, randomAsciiOfLength(2));
doc4.setType(randomAsciiOfLength(5));
doc4.setSource(SOURCE);
request.add(doc4);
assertValidationErrors(request, hasItems("type is missing for monitoring document [1]",
"source is missing for monitoring document [2]",
"monitored system version is missing for monitoring document [3]",
"monitored system id is missing for monitoring document [4]"));
}
public void testAddSingleDoc() {
MonitoringBulkRequest request = new MonitoringBulkRequest();
final int nbDocs = randomIntBetween(1, 20);
for (int i = 0; i < nbDocs; i++) {
request.add(new MonitoringBulkDoc(String.valueOf(i), String.valueOf(i)));
}
assertThat(request.getDocs(), hasSize(nbDocs));
}
public void testAddMultipleDocs() throws Exception {
final int nbDocs = randomIntBetween(3, 20);
final XContentType xContentType = XContentType.JSON;
try (BytesStreamOutput content = new BytesStreamOutput()) {
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, content)) {
for (int i = 0; i < nbDocs; i++) {
builder.startObject().startObject("index").endObject().endObject().flush();
content.write(xContentType.xContent().streamSeparator());
builder.startObject().field("foo").value(i).endObject().flush();
content.write(xContentType.xContent().streamSeparator());
}
}
String defaultMonitoringId = randomBoolean() ? randomAsciiOfLength(2) : null;
String defaultMonitoringVersion = randomBoolean() ? randomAsciiOfLength(3) : null;
String defaultIndex = randomBoolean() ? randomAsciiOfLength(5) : null;
String defaultType = randomBoolean() ? randomAsciiOfLength(4) : null;
MonitoringBulkRequest request = new MonitoringBulkRequest();
request.add(content.bytes(), defaultMonitoringId, defaultMonitoringVersion, defaultIndex, defaultType);
assertThat(request.getDocs(), hasSize(nbDocs));
for (MonitoringBulkDoc doc : request.getDocs()) {
assertThat(doc.getMonitoringId(), equalTo(defaultMonitoringId));
assertThat(doc.getMonitoringVersion(), equalTo(defaultMonitoringVersion));
assertThat(doc.getIndex(), equalTo(defaultIndex));
assertThat(doc.getType(), equalTo(defaultType));
}
}
}
public void testSerialization() throws IOException {
MonitoringBulkRequest request = new MonitoringBulkRequest();
int numDocs = iterations(10, 30);
for (int i = 0; i < numDocs; i++) {
MonitoringBulkDoc doc = new MonitoringBulkDoc(randomAsciiOfLength(2), randomVersion(random()).toString());
doc.setType(randomFrom("type1", "type2", "type3"));
doc.setSource(SOURCE);
if (randomBoolean()) {
doc.setIndex("index");
}
if (randomBoolean()) {
doc.setId(randomAsciiOfLength(3));
}
if (rarely()) {
doc.setClusterUUID(randomAsciiOfLength(5));
}
request.add(doc);
}
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(randomVersion(random()));
request.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
in.setVersion(out.getVersion());
MonitoringBulkRequest request2 = new MonitoringBulkRequest();
request2.readFrom(in);
assertThat(request2.docs.size(), CoreMatchers.equalTo(request.docs.size()));
for (int i = 0; i < request2.docs.size(); i++) {
MonitoringBulkDoc doc = request.docs.get(i);
MonitoringBulkDoc doc2 = request2.docs.get(i);
assertThat(doc2.getMonitoringId(), equalTo(doc.getMonitoringId()));
assertThat(doc2.getMonitoringVersion(), equalTo(doc.getMonitoringVersion()));
assertThat(doc2.getClusterUUID(), equalTo(doc.getClusterUUID()));
assertThat(doc2.getIndex(), equalTo(doc.getIndex()));
assertThat(doc2.getType(), equalTo(doc.getType()));
assertThat(doc2.getId(), equalTo(doc.getId()));
assertThat(doc2.getSource(), equalTo(doc.getSource()));
}
}
@SuppressWarnings("unchecked")
private static <T> void assertValidationErrors(MonitoringBulkRequest request, Matcher<? super T> matcher) {
ActionRequestValidationException validation = request.validate();
if (validation != null) {
assertThat((T) validation.validationErrors(), matcher);
} else {
assertThat(null, matcher);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class MonitoringBulkResponseTests extends ESTestCase {
public void testResponseStatus() {
final long took = Math.abs(randomLong());
MonitoringBulkResponse response = new MonitoringBulkResponse(took);
assertThat(response.getTookInMillis(), equalTo(took));
assertThat(response.getError(), is(nullValue()));
assertThat(response.status(), equalTo(RestStatus.OK));
ExportException exception = new ExportException(randomAsciiOfLength(10));
response = new MonitoringBulkResponse(took, new MonitoringBulkResponse.Error(exception));
assertThat(response.getTookInMillis(), equalTo(took));
assertThat(response.getError(), is(notNullValue()));
assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
}
public void testSerialization() throws IOException {
int iterations = randomIntBetween(5, 50);
for (int i = 0; i < iterations; i++) {
MonitoringBulkResponse response;
if (randomBoolean()) {
response = new MonitoringBulkResponse(Math.abs(randomLong()));
} else {
Exception exception = randomFrom(
new ExportException(randomAsciiOfLength(5), new IllegalStateException(randomAsciiOfLength(5))),
new IllegalStateException(randomAsciiOfLength(5)),
new IllegalArgumentException(randomAsciiOfLength(5)));
response = new MonitoringBulkResponse(Math.abs(randomLong()), new MonitoringBulkResponse.Error(exception));
}
BytesStreamOutput output = new BytesStreamOutput();
Version outputVersion = randomVersion(random());
output.setVersion(outputVersion);
response.writeTo(output);
StreamInput streamInput = StreamInput.wrap(output.bytes());
streamInput.setVersion(randomVersion(random()));
MonitoringBulkResponse response2 = new MonitoringBulkResponse();
response2.readFrom(streamInput);
assertThat(response2.getTookInMillis(), equalTo(response.getTookInMillis()));
if (response.getError() == null) {
assertThat(response2.getError(), is(nullValue()));
} else {
assertThat(response2.getError(), is(notNullValue()));
}
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class MonitoringBulkTests extends MarvelIntegTestCase {
@Override
protected Settings transportClientSettings() {
return super.transportClientSettings();
}
public void testMonitoringBulkIndexing() throws Exception {
MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk();
String[] types = {"type1", "type2", "type3"};
int numDocs = scaledRandomIntBetween(100, 5000);
for (int i = 0; i < numDocs; i++) {
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
doc.setType(randomFrom(types));
doc.setSource(jsonBuilder().startObject().field("num", numDocs).endObject().bytes());
requestBuilder.add(doc);
}
MonitoringBulkResponse response = requestBuilder.get();
assertThat(response.getError(), is(nullValue()));
refresh();
SearchResponse searchResponse = client().prepareSearch().setTypes(types).setSize(numDocs).get();
assertHitCount(searchResponse, numDocs);
for (SearchHit searchHit : searchResponse.getHits()) {
Map<String, Object> source = searchHit.sourceAsMap();
assertNotNull(source.get(MonitoringBulkResolver.Fields.CLUSTER_UUID.underscore().toString()));
assertNotNull(source.get(MonitoringBulkResolver.Fields.TIMESTAMP.underscore().toString()));
assertNotNull(source.get(MonitoringBulkResolver.Fields.SOURCE_NODE.underscore().toString()));
}
}
/**
* This test creates N threads that execute a random number of monitoring bulk requests.
*/
public void testConcurrentRequests() throws Exception {
final Thread[] threads = new Thread[3 + randomInt(7)];
final List<Throwable> exceptions = new CopyOnWriteArrayList<>();
AtomicInteger total = new AtomicInteger(0);
logger.info("--> using {} concurrent clients to execute requests", threads.length);
for (int i = 0; i < threads.length; i++) {
final int nbRequests = randomIntBetween(3, 10);
threads[i] = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("unexpected error in exporting thread", t);
exceptions.add(t);
}
@Override
protected void doRun() throws Exception {
for (int j = 0; j < nbRequests; j++) {
MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk();
int numDocs = scaledRandomIntBetween(10, 1000);
for (int k = 0; k < numDocs; k++) {
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
doc.setType("concurrent");
doc.setSource(jsonBuilder().startObject().field("num", k).endObject().bytes());
requestBuilder.add(doc);
}
total.addAndGet(numDocs);
MonitoringBulkResponse response = requestBuilder.get();
assertThat(response.getError(), is(nullValue()));
}
}
}, "export_thread_" + i);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
assertThat(exceptions, empty());
refresh();
SearchResponse countResponse = client().prepareSearch().setTypes("concurrent").setSize(0).get();
assertHitCount(countResponse, total.get());
}
public void testUnsupportedSystem() throws Exception {
MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk();
String[] types = {"type1", "type2", "type3"};
int totalDocs = randomIntBetween(10, 1000);
int unsupportedDocs = 0;
for (int i = 0; i < totalDocs; i++) {
MonitoringBulkDoc doc;
if (randomBoolean()) {
doc = new MonitoringBulkDoc("unknown", Version.CURRENT.toString());
unsupportedDocs++;
} else {
doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
}
doc.setType(randomFrom(types));
doc.setSource(jsonBuilder().startObject().field("num", i).endObject().bytes());
requestBuilder.add(doc);
}
MonitoringBulkResponse response = requestBuilder.get();
if (unsupportedDocs == 0) {
assertThat(response.getError(), is(nullValue()));
} else {
assertThat(response.getError(), is(notNullValue()));
}
refresh();
SearchResponse countResponse = client().prepareSearch().setTypes(types).setSize(0).get();
assertHitCount(countResponse, totalDocs - unsupportedDocs);
}
}

View File

@ -0,0 +1,293 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsEqual.equalTo;
public class TransportMonitoringBulkActionTests extends ESTestCase {
private static ThreadPool threadPool;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private ClusterService clusterService;
private TransportService transportService;
private CapturingExporters exportService;
private TransportMonitoringBulkAction action;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool(TransportMonitoringBulkActionTests.class.getSimpleName());
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
@Before
public void setUp() throws Exception {
super.setUp();
CapturingTransport transport = new CapturingTransport();
clusterService = new ClusterService(Settings.EMPTY, null, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool,
new ClusterName(TransportMonitoringBulkActionTests.class.getName()));
clusterService.setLocalNode(new DiscoveryNode("node", DummyTransportAddress.INSTANCE, Version.CURRENT));
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToAddedNodes(ClusterChangedEvent event) {
// skip
}
@Override
public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
// skip
}
});
clusterService.setClusterStatePublisher((event, ackListener) -> {});
clusterService.start();
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
exportService = new CapturingExporters();
action = new TransportMonitoringBulkAction(
Settings.EMPTY,
threadPool,
clusterService,
transportService,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY),
exportService
);
}
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
transportService.close();
}
public void testGlobalBlock() throws Exception {
expectedException.expect(ExecutionException.class);
expectedException.expect(hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]")));
final ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL);
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("add blocks to cluster state", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// make sure we increment versions as listener may depend on it for change
return ClusterState.builder(currentState).blocks(block).version(currentState.version() + 1).build();
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail("unexpected exception: " + t);
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchException("unexpected interruption", e);
}
MonitoringBulkRequest request = randomRequest();
action.execute(request).get();
}
public void testEmptyRequest() throws Exception {
expectedException.expect(ExecutionException.class);
expectedException.expect(hasToString(containsString("no monitoring documents added")));
MonitoringBulkRequest request = randomRequest(0);
action.execute(request).get();
assertThat(exportService.getExported(), hasSize(0));
}
public void testBasicRequest() throws Exception {
MonitoringBulkRequest request = randomRequest();
action.execute(request).get();
assertThat(exportService.getExported(), hasSize(request.getDocs().size()));
}
public void testAsyncActionPrepareDocs() throws Exception {
final PlainActionFuture<MonitoringBulkResponse> listener = new PlainActionFuture<>();
final MonitoringBulkRequest request = randomRequest();
Collection<MonitoringDoc> results = action.new AsyncAction(request, listener, exportService, clusterService)
.prepareForExport(request.getDocs());
assertThat(results, hasSize(request.getDocs().size()));
for (MonitoringDoc exported : results) {
assertThat(exported.getClusterUUID(), equalTo(clusterService.state().metaData().clusterUUID()));
assertThat(exported.getTimestamp(), greaterThan(0L));
assertThat(exported.getSourceNode(), notNullValue());
assertThat(exported.getSourceNode().getUUID(), equalTo(clusterService.localNode().getId()));
assertThat(exported.getSourceNode().getName(), equalTo(clusterService.localNode().getName()));
}
}
public void testAsyncActionExecuteExport() throws Exception {
final PlainActionFuture<MonitoringBulkResponse> listener = new PlainActionFuture<>();
final MonitoringBulkRequest request = randomRequest();
final Collection<MonitoringDoc> docs = Collections.unmodifiableCollection(request.getDocs());
action.new AsyncAction(request, listener, exportService, clusterService).executeExport(docs, 0L, listener);
assertThat(listener.get().getError(), nullValue());
Collection<MonitoringDoc> exported = exportService.getExported();
assertThat(exported, hasSize(request.getDocs().size()));
}
public void testAsyncActionExportThrowsException() throws Exception {
final PlainActionFuture<MonitoringBulkResponse> listener = new PlainActionFuture<>();
final MonitoringBulkRequest request = randomRequest();
final Exporters exporters = new ConsumingExporters(docs -> {
throw new IllegalStateException();
});
action.new AsyncAction(request, listener, exporters, clusterService).start();
assertThat(listener.get().getError(), notNullValue());
assertThat(listener.get().getError().getCause(), instanceOf(IllegalStateException.class));
}
/**
* @return a new MonitoringBulkRequest instance with random number of documents
*/
private static MonitoringBulkRequest randomRequest() throws IOException {
return randomRequest(scaledRandomIntBetween(1, 100));
}
/**
* @return a new MonitoringBulkRequest instance with given number of documents
*/
private static MonitoringBulkRequest randomRequest(final int numDocs) throws IOException {
MonitoringBulkRequest request = new MonitoringBulkRequest();
for (int i = 0; i < numDocs; i++) {
MonitoringBulkDoc doc = new MonitoringBulkDoc(randomFrom(MonitoredSystem.values()).getSystem(),
randomVersion(random()).toString());
doc.setType(randomFrom("type1", "type2"));
doc.setSource(jsonBuilder().startObject().field("num", i).endObject().bytes());
request.add(doc);
}
return request;
}
/**
* A Exporters implementation that captures the documents to export
*/
class CapturingExporters extends Exporters {
private final Collection<MonitoringDoc> exported = ConcurrentCollections.newConcurrentSet();
public CapturingExporters() {
super(Settings.EMPTY, Collections.emptyMap(), clusterService,
new ClusterSettings(Settings.EMPTY, Collections.singleton(MarvelSettings.EXPORTERS_SETTINGS)));
}
@Override
public synchronized void export(Collection<MonitoringDoc> docs) throws ExportException {
exported.addAll(docs);
}
public Collection<MonitoringDoc> getExported() {
return exported;
}
}
/**
* A Exporters implementation that applies a Consumer when exporting documents
*/
class ConsumingExporters extends Exporters {
private final Consumer<Collection<? extends MonitoringDoc>> consumer;
public ConsumingExporters(Consumer<Collection<? extends MonitoringDoc>> consumer) {
super(Settings.EMPTY, Collections.emptyMap(), clusterService,
new ClusterSettings(Settings.EMPTY, Collections.singleton(MarvelSettings.EXPORTERS_SETTINGS)));
this.consumer = consumer;
}
@Override
public synchronized void export(Collection<MonitoringDoc> docs) throws ExportException {
consumer.accept(docs);
}
}
public static void setState(ClusterService clusterService, ClusterState clusterState) {
}
}

View File

@ -327,7 +327,6 @@ public class ExportersTests extends ESTestCase {
} }
} }
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> { static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
public TestFactory(String type, boolean singleton) { public TestFactory(String type, boolean singleton) {
super(type, singleton); super(type, singleton);
@ -424,13 +423,13 @@ public class ExportersTests extends ESTestCase {
} }
@Override @Override
public ExportBulk add(Collection<MonitoringDoc> docs) throws Exception { public ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException {
count.addAndGet(docs.size()); count.addAndGet(docs.size());
return this; return this;
} }
@Override @Override
public void flush() throws Exception { public void flush() throws ExportException {
} }
AtomicInteger getCount() { AtomicInteger getCount() {

View File

@ -43,7 +43,7 @@ public class MonitoringDocTests extends ESTestCase {
StreamInput streamInput = StreamInput.wrap(output.bytes()); StreamInput streamInput = StreamInput.wrap(output.bytes());
streamInput.setVersion(randomVersion(random())); streamInput.setVersion(randomVersion(random()));
MonitoringDoc monitoringDoc2 = MonitoringDoc.readMonitoringDoc(streamInput); MonitoringDoc monitoringDoc2 = new MonitoringDoc(streamInput);
assertThat(monitoringDoc2.getMonitoringId(), equalTo(monitoringDoc.getMonitoringId())); assertThat(monitoringDoc2.getMonitoringId(), equalTo(monitoringDoc.getMonitoringId()));
assertThat(monitoringDoc2.getMonitoringVersion(), equalTo(monitoringDoc.getMonitoringVersion())); assertThat(monitoringDoc2.getMonitoringVersion(), equalTo(monitoringDoc.getMonitoringVersion()));
@ -64,7 +64,7 @@ public class MonitoringDocTests extends ESTestCase {
public void testSetSourceNode() { public void testSetSourceNode() {
int iterations = randomIntBetween(5, 50); int iterations = randomIntBetween(5, 50);
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
MonitoringDoc monitoringDoc = new MonitoringDoc(); MonitoringDoc monitoringDoc = new MonitoringDoc(null, null);
if (randomBoolean()) { if (randomBoolean()) {
DiscoveryNode discoveryNode = newRandomDiscoveryNode(); DiscoveryNode discoveryNode = newRandomDiscoveryNode();

View File

@ -18,6 +18,7 @@ import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.MonitoredSystem; import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMonitoringDoc; import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMonitoringDoc;
import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
@ -39,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName; import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName; import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
@ -167,7 +167,14 @@ public class LocalExporterTests extends MarvelIntegTestCase {
logger.debug("--> exporting a second monitoring doc"); logger.debug("--> exporting a second monitoring doc");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
} catch (ElasticsearchException e) { } catch (ElasticsearchException e) {
assertThat(e.getMessage(), allOf(containsString("failure in bulk execution"), containsString("IndexClosedException[closed]"))); assertThat(e.getMessage(), containsString("failed to flush export bulk [_local]"));
assertThat(e.getCause(), instanceOf(ExportException.class));
ExportException cause = (ExportException) e.getCause();
assertTrue(cause.hasExportExceptions());
for (ExportException c : cause) {
assertThat(c.getMessage(), containsString("IndexClosedException[closed]"));
}
assertNull(exporter.getBulk().requestBuilder); assertNull(exporter.getBulk().requestBuilder);
} }
} }

View File

@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.startsWith;
public abstract class MonitoringIndexNameResolverTestCase<M extends MonitoringDoc, R extends MonitoringIndexNameResolver<M>> public abstract class MonitoringIndexNameResolverTestCase<M extends MonitoringDoc, R extends MonitoringIndexNameResolver<M>>
extends ESTestCase { extends ESTestCase {
private final ResolversRegistry resolversRegistry = new ResolversRegistry(Settings.EMPTY); protected final ResolversRegistry resolversRegistry = new ResolversRegistry(Settings.EMPTY);
/** /**
* @return the {@link MonitoringIndexNameResolver} to test * @return the {@link MonitoringIndexNameResolver} to test

View File

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.resolver.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolverTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class MonitoringBulkResolverTests extends MonitoringIndexNameResolverTestCase<MonitoringBulkDoc, MonitoringBulkResolver> {
@Override
protected MonitoringBulkDoc newMarvelDoc() {
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
doc.setClusterUUID(randomAsciiOfLength(5));
doc.setTimestamp(Math.abs(randomLong()));
doc.setSourceNode(new DiscoveryNode("id", DummyTransportAddress.INSTANCE, Version.CURRENT));
doc.setType("kibana_stats");
doc.setSource(new BytesArray("{\"field1\" : \"value1\"}"));
return doc;
}
@Override
protected boolean checkResolvedId() {
return false;
}
@Override
protected boolean checkFilters() {
return false;
}
public void testMonitoringBulkResolver() throws Exception {
MonitoringBulkDoc doc = newMarvelDoc();
doc.setTimestamp(1437580442979L);
if (randomBoolean()) {
doc.setIndex(randomAsciiOfLength(5));
}
if (randomBoolean()) {
doc.setId(randomAsciiOfLength(35));
}
if (randomBoolean()) {
doc.setClusterUUID(randomAsciiOfLength(5));
}
MonitoringBulkResolver resolver = newResolver();
assertThat(resolver.index(doc), equalTo(".monitoring-kibana-0-2015.07.22"));
assertThat(resolver.type(doc), equalTo(doc.getType()));
assertThat(resolver.id(doc), nullValue());
assertSource(resolver.source(doc, XContentType.JSON),
"cluster_uuid",
"timestamp",
"source_node",
"kibana_stats",
"kibana_stats.field1");
}
}

View File

@ -444,7 +444,7 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
" cluster: [ 'cluster:monitor/nodes/info', 'cluster:monitor/state', 'cluster:monitor/health', 'cluster:monitor/stats'," + " cluster: [ 'cluster:monitor/nodes/info', 'cluster:monitor/state', 'cluster:monitor/health', 'cluster:monitor/stats'," +
" 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," + " 'cluster:admin/settings/update', 'cluster:admin/repository/delete', 'cluster:monitor/nodes/liveness'," +
" 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," + " 'indices:admin/template/get', 'indices:admin/template/put', 'indices:admin/template/delete'," +
" 'cluster:monitor/task']\n" + " 'cluster:monitor/task', 'cluster:admin/xpack/monitoring/bulk' ]\n" +
" indices:\n" + " indices:\n" +
" - names: '*'\n" + " - names: '*'\n" +
" privileges: [ all ]\n" + " privileges: [ all ]\n" +

View File

@ -163,6 +163,7 @@ public class XPackPlugin extends Plugin {
public void onModule(NetworkModule module) { public void onModule(NetworkModule module) {
licensing.onModule(module); licensing.onModule(module);
marvel.onModule(module);
shield.onModule(module); shield.onModule(module);
watcher.onModule(module); watcher.onModule(module);
graph.onModule(module); graph.onModule(module);
@ -170,6 +171,7 @@ public class XPackPlugin extends Plugin {
public void onModule(ActionModule module) { public void onModule(ActionModule module) {
licensing.onModule(module); licensing.onModule(module);
marvel.onModule(module);
shield.onModule(module); shield.onModule(module);
watcher.onModule(module); watcher.onModule(module);
graph.onModule(module); graph.onModule(module);