Introduced `ExportBulk`

- bulk abstraction on top of exporter
- removed queuing and bulk processor from local exporter
- started to adjust tests to shield (require a lot of helper methods in `MarvelInegTestCase`)
- moved index name resolution to `Exporter` (functionality shared between all exporters)

Original commit: elastic/x-pack-elasticsearch@86b495622c
This commit is contained in:
uboness 2015-09-24 14:40:00 +02:00
parent 3162097ce8
commit a86bb7b140
25 changed files with 632 additions and 500 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
@ -106,7 +107,6 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
} catch (InterruptedException e) { } catch (InterruptedException e) {
// we don't care... // we don't care...
} }
} }
for (Collector collector : collectors) { for (Collector collector : collectors) {
@ -155,21 +155,26 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
continue; continue;
} }
ExportBulk bulk = exporters.openBulk();
if (bulk == null) { // exporters are either not ready or faulty
continue;
}
try {
for (Collector collector : collectors) { for (Collector collector : collectors) {
logger.trace("collecting {}", collector.name()); logger.trace("collecting [{}]", collector.name());
Collection<MarvelDoc> results = collector.collect(); Collection<MarvelDoc> docs = collector.collect();
if (docs != null) {
if (results != null && !results.isEmpty()) { bulk.add(docs);
for (Exporter exporter : exporters) {
exporter.export(results);
} }
}
if (closed) { if (closed) {
// Stop collecting if the worker is marked as closed // Stop collecting if the worker is marked as closed
break; break;
} }
} }
} finally {
bulk.close(!closed);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.ElasticsearchException;
import java.util.Arrays;
import java.util.Collection;
/**
*
*/
public abstract class ExportBulk {
private final String name;
public ExportBulk(String name) {
this.name = name;
}
public ExportBulk add(MarvelDoc... docs) throws Exception {
return add(Arrays.asList(docs));
}
public abstract ExportBulk add(Collection<MarvelDoc> docs) throws Exception;
public abstract void flush() throws Exception;
public final void close(boolean flush) throws Exception {
Exception exception = null;
if (flush) {
try {
flush();
} catch (Exception e) {
exception = e;
}
}
// now closing
try {
onClose();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
throw exception;
}
}
protected void onClose() throws Exception {
}
public static class Compound extends ExportBulk {
private final Collection<ExportBulk> bulks;
public Compound(Collection<ExportBulk> bulks) {
super("all");
this.bulks = bulks;
}
@Override
public ExportBulk add(Collection<MarvelDoc> docs) throws Exception {
for (ExportBulk bulk : bulks) {
bulk.add(docs);
}
return this;
}
@Override
public void flush() throws Exception {
Exception exception = null;
for (ExportBulk bulk : bulks) {
try {
bulk.flush();
} catch (Exception e) {
if (exception == null) {
exception = new ElasticsearchException("failed to flush exporter bulks");
}
exception.addSuppressed(new ElasticsearchException("failed to flush [{}] exporter bulk", e, bulk.name));
}
}
if (exception != null) {
throw exception;
}
}
}
}

View File

@ -8,20 +8,29 @@ package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.Collection; import java.util.Collection;
public abstract class Exporter { public abstract class Exporter {
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
protected final String type; protected final String type;
protected final Config config; protected final Config config;
protected final ESLogger logger; protected final ESLogger logger;
protected final IndexNameResolver indexNameResolver;
public Exporter(String type, Config config) { public Exporter(String type, Config config) {
this.type = type; this.type = type;
this.config = config; this.config = config;
this.logger = config.logger(getClass()); this.logger = config.logger(getClass());
this.indexNameResolver = new DefaultIndexNameResolver(config.settings);
} }
public String type() { public String type() {
@ -32,11 +41,26 @@ public abstract class Exporter {
return config.name; return config.name;
} }
public IndexNameResolver indexNameResolver() {
return indexNameResolver;
}
public boolean masterOnly() { public boolean masterOnly() {
return false; return false;
} }
public abstract void export(Collection<MarvelDoc> marvelDocs) throws Exception; /**
* Opens up a new export bulk. May return {@code null} indicating this exporter is not ready
* yet to export the docs
*/
public abstract ExportBulk openBulk();
public void export(Collection<MarvelDoc> marvelDocs) throws Exception {
ExportBulk bulk = openBulk();
if (bulk != null) {
bulk.add(marvelDocs).flush();
}
}
public abstract void close(); public abstract void close();
@ -99,4 +123,38 @@ public abstract class Exporter {
public abstract E create(Config config); public abstract E create(Config config);
} }
/**
*
*/
public class DefaultIndexNameResolver implements IndexNameResolver {
private final DateTimeFormatter indexTimeFormatter;
public DefaultIndexNameResolver(Settings settings) {
String indexTimeFormat = settings.get(INDEX_NAME_TIME_FORMAT_SETTING, DEFAULT_INDEX_NAME_TIME_FORMAT);
try {
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
} catch (IllegalArgumentException e) {
throw new SettingsException("invalid marvel index name time format [" + indexTimeFormat + "] set for [" + settingFQN(INDEX_NAME_TIME_FORMAT_SETTING) + "]", e);
}
}
@Override
public String resolve(MarvelDoc doc) {
if (doc.index() != null) {
return doc.index();
}
return resolve(doc.timestamp());
}
@Override
public String resolve(long timestamp) {
return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(timestamp);
}
@Override
public String toString() {
return indexTimeFormatter.toString();
}
}
} }

View File

@ -82,7 +82,8 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
return exporters.iterator(); return exporters.iterator();
} }
public void export(Collection<MarvelDoc> marvelDocs) { public ExportBulk openBulk() {
List<ExportBulk> bulks = new ArrayList<>();
for (Exporter exporter : exporters) { for (Exporter exporter : exporters) {
if (exporter.masterOnly() && !clusterService.localNode().masterNode()) { if (exporter.masterOnly() && !clusterService.localNode().masterNode()) {
// the exporter is supposed to only run on the master node, but we're not // the exporter is supposed to only run on the master node, but we're not
@ -90,11 +91,17 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
continue; continue;
} }
try { try {
exporter.export(marvelDocs); ExportBulk bulk = exporter.openBulk();
if (bulk == null) {
logger.info("skipping exporter [{}] as it isn't ready yet", exporter.name());
} else {
bulks.add(bulk);
}
} catch (Exception e) { } catch (Exception e) {
logger.error("exporter [{}] failed to export marvel data", e, exporter.name()); logger.error("exporter [{}] failed to export marvel data", e, exporter.name());
} }
} }
return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks);
} }
@Override @Override

View File

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
/**
*
*/
public interface IndexNameResolver {
String resolve(MarvelDoc doc);
String resolve(long timestamp);
}

View File

@ -21,14 +21,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc; import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.renderer.Renderer; import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter; import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import javax.net.ssl.*; import javax.net.ssl.*;
import java.io.*; import java.io.*;
@ -51,7 +50,6 @@ public class HttpExporter extends Exporter {
public static final String TYPE = "http"; public static final String TYPE = "http";
public static final String HOST_SETTING = "host"; public static final String HOST_SETTING = "host";
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String CONNECTION_TIMEOUT_SETTING = "connection.timeout"; public static final String CONNECTION_TIMEOUT_SETTING = "connection.timeout";
public static final String CONNECTION_READ_TIMEOUT_SETTING = "connection.read_timeout"; public static final String CONNECTION_READ_TIMEOUT_SETTING = "connection.read_timeout";
public static final String AUTH_USERNAME_SETTING = "auth.username"; public static final String AUTH_USERNAME_SETTING = "auth.username";
@ -70,8 +68,6 @@ public class HttpExporter extends Exporter {
public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = SSL_SETTING + ".truststore.algorithm"; public static final String SSL_TRUSTSTORE_ALGORITHM_SETTING = SSL_SETTING + ".truststore.algorithm";
public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification"; public static final String SSL_HOSTNAME_VERIFICATION_SETTING = SSL_SETTING + ".hostname_verification";
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
/** Minimum supported version of the remote template **/ /** Minimum supported version of the remote template **/
public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2; public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2;
@ -83,8 +79,6 @@ public class HttpExporter extends Exporter {
final TimeValue connectionReadTimeout; final TimeValue connectionReadTimeout;
final BasicAuth auth; final BasicAuth auth;
final DateTimeFormatter indexTimeFormatter;
/** https support * */ /** https support * */
final SSLSocketFactory sslSocketFactory; final SSLSocketFactory sslSocketFactory;
final boolean hostnameVerification; final boolean hostnameVerification;
@ -99,7 +93,6 @@ public class HttpExporter extends Exporter {
volatile boolean supportedClusterVersion = false; volatile boolean supportedClusterVersion = false;
/** Version of the built-in template **/ /** Version of the built-in template **/
final Version templateVersion; final Version templateVersion;
@ -120,13 +113,6 @@ public class HttpExporter extends Exporter {
auth = resolveAuth(config.settings()); auth = resolveAuth(config.settings());
String indexTimeFormat = config.settings().get(INDEX_NAME_TIME_FORMAT_SETTING, DEFAULT_INDEX_NAME_TIME_FORMAT);
try {
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("invalid marvel index name time format [" + indexTimeFormat + "] set for [" + settingFQN(INDEX_NAME_TIME_FORMAT_SETTING) + "]", e);
}
connectionTimeout = config.settings().getAsTime(CONNECTION_TIMEOUT_SETTING, TimeValue.timeValueMillis(6000)); connectionTimeout = config.settings().getAsTime(CONNECTION_TIMEOUT_SETTING, TimeValue.timeValueMillis(6000));
connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING, TimeValue.timeValueMillis(connectionTimeout.millis() * 10)); connectionReadTimeout = config.settings().getAsTime(CONNECTION_READ_TIMEOUT_SETTING, TimeValue.timeValueMillis(connectionTimeout.millis() * 10));
@ -148,40 +134,15 @@ public class HttpExporter extends Exporter {
throw new IllegalStateException("unable to find built-in template version"); throw new IllegalStateException("unable to find built-in template version");
} }
logger.debug("initialized with hosts [{}], index prefix [{}], index time format [{}], template version [{}]", logger.debug("initialized with hosts [{}], index prefix [{}], index resolver [{}], template version [{}]",
Strings.arrayToCommaDelimitedString(hosts), Strings.arrayToCommaDelimitedString(hosts),
MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat, templateVersion); MarvelSettings.MARVEL_INDICES_PREFIX, indexNameResolver, templateVersion);
} }
@Override @Override
public void export(Collection<MarvelDoc> marvelDocs) throws Exception { public ExportBulk openBulk() {
HttpURLConnection connection = openExportingConnection(); HttpURLConnection connection = openExportingConnection();
if (connection == null) { return connection != null ? new Bulk(connection) : null;
return;
}
if ((marvelDocs != null) && (!marvelDocs.isEmpty())) {
OutputStream os = connection.getOutputStream();
// We need to use a buffer to render each Marvel document
// because the renderer might close the outputstream (ex: XContentBuilder)
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
for (MarvelDoc marvelDoc : marvelDocs) {
render(marvelDoc, buffer);
// write the result to the connection
os.write(buffer.bytes().toBytes());
buffer.reset();
}
} finally {
try {
sendCloseExportingConnection(connection);
} catch (IOException e) {
logger.error("failed sending data to [{}]: {}", connection.getURL(), ExceptionsHelper.detailedMessage(e));
throw e;
}
}
}
} }
@Override @Override
@ -203,7 +164,7 @@ public class HttpExporter extends Exporter {
if (bulkTimeout != null) { if (bulkTimeout != null) {
queryString = "?master_timeout=" + bulkTimeout; queryString = "?master_timeout=" + bulkTimeout;
} }
HttpURLConnection conn = openAndValidateConnection("POST", getIndexName() + "/_bulk" + queryString, XContentType.SMILE.restContentType()); HttpURLConnection conn = openAndValidateConnection("POST", "/_bulk" + queryString, XContentType.SMILE.restContentType());
if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) { if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) {
// start keep alive upon successful connection if not there. // start keep alive upon successful connection if not there.
initKeepAliveThread(); initKeepAliveThread();
@ -226,9 +187,10 @@ public class HttpExporter extends Exporter {
// Builds the bulk action metadata line // Builds the bulk action metadata line
builder.startObject(); builder.startObject();
builder.startObject("index"); builder.startObject("index");
if (marvelDoc.index() != null) {
builder.field("_index", marvelDoc.index()); // we need the index to be based on the document timestamp
} builder.field("_index", indexNameResolver.resolve(marvelDoc));
if (marvelDoc.type() != null) { if (marvelDoc.type() != null) {
builder.field("_type", marvelDoc.type()); builder.field("_type", marvelDoc.type());
} }
@ -279,11 +241,6 @@ public class HttpExporter extends Exporter {
} }
} }
String getIndexName() {
return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(System.currentTimeMillis());
}
/** /**
* open a connection to any host, validating it has the template installed if needed * open a connection to any host, validating it has the template installed if needed
* *
@ -717,6 +674,58 @@ public class HttpExporter extends Exporter {
} }
} }
class Bulk extends ExportBulk {
private HttpURLConnection connection;
private OutputStream out;
public Bulk(HttpURLConnection connection) {
super(name());
this.connection = connection;
}
@Override
public Bulk add(Collection<MarvelDoc> docs) throws Exception {
if (connection == null) {
connection = openExportingConnection();
}
if ((docs != null) && (!docs.isEmpty())) {
if (out == null) {
out = connection.getOutputStream();
}
// We need to use a buffer to render each Marvel document
// because the renderer might close the outputstream (ex: XContentBuilder)
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
for (MarvelDoc marvelDoc : docs) {
render(marvelDoc, buffer);
// write the result to the connection
out.write(buffer.bytes().toBytes());
}
}
}
return this;
}
@Override
public void flush() throws IOException {
if (connection != null) {
flush(connection);
connection = null;
}
}
private void flush(HttpURLConnection connection) throws IOException {
try {
sendCloseExportingConnection(connection);
} catch (IOException e) {
logger.error("failed sending data to [{}]: {}", connection.getURL(), ExceptionsHelper.detailedMessage(e));
throw e;
}
}
}
public static class Factory extends Exporter.Factory<HttpExporter> { public static class Factory extends Exporter.Factory<HttpExporter> {
private final Environment env; private final Environment env;

View File

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.IndexNameResolver;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import java.io.IOException;
import java.util.Collection;
/**
*
*/
public class LocalBulk extends ExportBulk {
private final Client client;
private final IndexNameResolver indexNameResolver;
private final RendererRegistry renderers;
private BytesStreamOutput buffer = null;
private BulkRequestBuilder requestBuilder;
public LocalBulk(String name, Client client, IndexNameResolver indexNameResolver, RendererRegistry renderers) {
super(name);
this.client = client;
this.indexNameResolver = indexNameResolver;
this.renderers = renderers;
}
@Override
public ExportBulk add(Collection<MarvelDoc> docs) throws Exception {
for (MarvelDoc marvelDoc : docs) {
if (requestBuilder == null) {
requestBuilder = client.prepareBulk();
}
IndexRequestBuilder request = client.prepareIndex();
if (marvelDoc.index() != null) {
request.setIndex(marvelDoc.index());
} else {
request.setIndex(indexNameResolver.resolve(marvelDoc));
}
if (marvelDoc.type() != null) {
request.setType(marvelDoc.type());
}
if (marvelDoc.id() != null) {
request.setId(marvelDoc.id());
}
// Get the appropriate renderer in order to render the MarvelDoc
Renderer renderer = renderers.renderer(marvelDoc.type());
assert renderer != null : "unable to render marvel document of type [" + marvelDoc.type() + "]. no renderer found in registry";
if (buffer == null) {
buffer = new BytesStreamOutput();
} else {
buffer.reset();
}
renderer.render(marvelDoc, XContentType.SMILE, buffer);
request.setSource(buffer.bytes().toBytes());
requestBuilder.add(request);
}
return this;
}
@Override
public void flush() throws IOException {
if (requestBuilder == null) {
return;
}
BulkResponse bulkResponse = requestBuilder.get();
if (bulkResponse.hasFailures()) {
throw new ElasticsearchException(bulkResponse.buildFailureMessage());
}
}
}

View File

@ -8,40 +8,25 @@ package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils; import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils;
import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry; import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.SecuredClient; import org.elasticsearch.marvel.shield.SecuredClient;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_CLUSTER_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_CLUSTER_VERSION;
@ -57,121 +42,45 @@ public class LocalExporter extends Exporter {
public static final String INDEX_TEMPLATE_NAME = "marvel"; public static final String INDEX_TEMPLATE_NAME = "marvel";
public static final String QUEUE_SIZE_SETTING = "queue_max_size"; public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
public static final String BULK_SIZE_SETTING = "bulk_size";
public static final String BULK_FLUSH_INTERVAL_SETTING = "bulk_flush_interval";
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
public static final int DEFAULT_BULK_SIZE = 1000;
public static final int MAX_BULK_SIZE = 10000;
public static final TimeValue DEFAULT_BULK_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1);
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final RendererRegistry registry; private final RendererRegistry renderers;
private final QueueConsumer queueConsumer;
private final DateTimeFormatter indexTimeFormatter;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED); final @Nullable TimeValue bulkTimeout;
private final LinkedBlockingQueue<IndexRequest> queue;
private final AtomicReference<State> state = new AtomicReference<>();
/** /**
* Version of the built-in template * Version of the built-in template
**/ **/
private final Version builtInTemplateVersion; private final Version builtInTemplateVersion;
public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry registry) { public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry renderers) {
super(TYPE, config); super(TYPE, config);
this.client = client; this.client = client;
this.clusterService = clusterService; this.clusterService = clusterService;
this.registry = registry; this.renderers = renderers;
this.queueConsumer = new QueueConsumer(EsExecutors.threadName(config.settings(), "marvel-queue-consumer-" + config.name()));
int maxQueueSize = config.settings().getAsInt(QUEUE_SIZE_SETTING, DEFAULT_MAX_QUEUE_SIZE);
if (maxQueueSize <= 0) {
logger.warn("invalid value [{}] for setting [{}]. using default value [{}]", maxQueueSize, QUEUE_SIZE_SETTING, DEFAULT_MAX_QUEUE_SIZE);
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
}
this.queue = new LinkedBlockingQueue<>(maxQueueSize);
String indexTimeFormat = config.settings().get(INDEX_NAME_TIME_FORMAT_SETTING, DEFAULT_INDEX_NAME_TIME_FORMAT);
try {
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
} catch (IllegalArgumentException e) {
throw new SettingsException("invalid marvel index name time format [" + indexTimeFormat + "] set for [" + settingFQN(INDEX_NAME_TIME_FORMAT_SETTING) + "]", e);
}
// Checks that the built-in template is versioned // Checks that the built-in template is versioned
builtInTemplateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate()); builtInTemplateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate());
if (builtInTemplateVersion == null) { if (builtInTemplateVersion == null) {
throw new IllegalStateException("unable to find built-in template version"); throw new IllegalStateException("unable to find built-in template version");
} }
state.set(State.STARTING);
}
public void stop() { bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
if (state.compareAndSet(State.STARTED, State.STOPPING) || state.compareAndSet(State.STARTING, State.STOPPING)) {
try { state.set(State.STARTING);
queueConsumer.interrupt();
} finally {
state.set(State.STOPPED);
}
}
} }
@Override @Override
public void close() { public void close() {
if (state.get() != State.STOPPED) { if (state.compareAndSet(State.STARTING, State.STOPPING) || state.compareAndSet(State.STARTED, State.STOPPING)) {
stop(); state.set(State.STOPPED);
} }
} }
private boolean canExport() {
if (state.get() == State.STARTED) {
return true;
}
if (state.get() != State.STARTING) {
return false;
}
ClusterState clusterState = clusterState();
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es-
// indices but they may not have been restored from the cluster state on disk
logger.debug("exporter [{}] waiting until gateway has recovered from disk", name());
return false;
}
Version clusterVersion = clusterVersion();
if ((clusterVersion == null) || clusterVersion.before(MIN_SUPPORTED_CLUSTER_VERSION)) {
logger.error("cluster version [" + clusterVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]");
state.set(State.FAILED);
return false;
}
Version templateVersion = templateVersion();
if (clusterService.state().nodes().localNodeMaster() == false) {
if (templateVersion == null) {
logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
return false;
}
if (clusterState.routingTable().index(indexName()).allPrimaryShardsActive() == false) {
logger.debug("marvel index [{}] has some primary shards not yet started, so service cannot start", indexName());
return false;
}
} else if (shouldUpdateTemplate(templateVersion, builtInTemplateVersion)) {
putTemplate(config.settings().getAsSettings("template.settings"));
}
logger.debug("exporter [{}] can now export marvel data", name());
queueConsumer.start();
state.set(State.STARTED);
return true;
}
ClusterState clusterState() { ClusterState clusterState() {
return client.admin().cluster().prepareState().get().getState(); return client.admin().cluster().prepareState().get().getState();
} }
@ -203,7 +112,7 @@ public class LocalExporter extends Exporter {
} }
// Never update a very old template // Never update a very old template
if (current.before(MIN_SUPPORTED_TEMPLATE_VERSION)) { if (current.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}]: " logger.error("marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version" + "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)", + "and delete the current active marvel index (don't forget to back up it first if needed)",
current, MIN_SUPPORTED_TEMPLATE_VERSION); current, MIN_SUPPORTED_TEMPLATE_VERSION);
@ -249,128 +158,70 @@ public class LocalExporter extends Exporter {
} }
} }
@Override boolean canExport() {
public void export(Collection<MarvelDoc> marvelDocs) { if (state.get() == State.STARTED) {
if (marvelDocs == null) { return true;
logger.debug("no marvel documents to export");
return;
} }
if (canExport() == false) { if (state.get() != State.STARTING) {
logger.debug("exporter [{}] can not export data", name()); return false;
return;
} }
BytesStreamOutput buffer = null; ClusterState clusterState = clusterState();
for (MarvelDoc marvelDoc : marvelDocs) { if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
try { // wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es-
IndexRequestBuilder request = client.prepareIndex(); // indices but they may not have been restored from the cluster state on disk
if (marvelDoc.index() != null) { logger.debug("exporter [{}] waiting until gateway has recovered from disk", name());
request.setIndex(marvelDoc.index()); return false;
} else {
request.setIndex(indexName());
}
if (marvelDoc.type() != null) {
request.setType(marvelDoc.type());
}
if (marvelDoc.id() != null) {
request.setId(marvelDoc.id());
} }
// Get the appropriate renderer in order to render the MarvelDoc Version clusterVersion = clusterVersion();
Renderer renderer = registry.renderer(marvelDoc.type()); if ((clusterVersion == null) || clusterVersion.before(MIN_SUPPORTED_CLUSTER_VERSION)) {
if (renderer == null) { logger.error("cluster version [" + clusterVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]");
logger.warn("unable to render marvel document of type [{}]. no renderer found in registry", marvelDoc.type()); state.set(State.FAILED);
return; return false;
} }
if (buffer == null) { Version templateVersion = templateVersion();
buffer = new BytesStreamOutput(); if (!clusterService.state().nodes().localNodeMaster()) {
if (templateVersion == null) {
logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
return false;
} }
renderer.render(marvelDoc, XContentType.SMILE, buffer); // TODO why do we need this check? the marvel indices are anyway auto-created
request.setSource(buffer.bytes().toBytes()); // String indexName = indexNameResolver.resolve(System.currentTimeMillis());
// if (!clusterState.routingTable().index(indexName).allPrimaryShardsActive()) {
queue.add(request.request()); // logger.debug("marvel index [{}] has some primary shards not yet started, so service cannot start", indexName);
} catch (IOException e) { // return false;
logger.error("failed to export marvel data", e); // }
} finally {
if (buffer != null) {
buffer.reset();
}
}
}
} }
String indexName() { //TODO this is erroneous
return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(System.currentTimeMillis()); // the check may figure out that the existing version is too old and therefore
// it can't and won't update the template (prompting the user to delete the template).
// In this case, we shouldn't export data. But we do.. the "shouldUpdate" method
// needs to be "boolean ensureCompatibleTemplate". The boolean returned indicates whether
// the template is valid (either was valid or was updated to a valid one) or not. If
// not, the state of this exporter should not be set to STARTED.
if (shouldUpdateTemplate(templateVersion, builtInTemplateVersion)) {
putTemplate(config.settings().getAsSettings("template.settings"));
} }
class QueueConsumer extends Thread { logger.debug("exporter [{}] can now export marvel data", name());
state.set(State.STARTED);
private volatile boolean running = true; return true;
QueueConsumer(String name) {
super(name);
setDaemon(true);
} }
@Override @Override
public void run() { public ExportBulk openBulk() {
try (BulkProcessor bulkProcessor = createBulkProcessor(config)) { if (!canExport()) {
while (running) { return null;
try {
IndexRequest request = queue.take();
if (request != null) {
bulkProcessor.add(request);
}
} catch (InterruptedException e) {
logger.debug("marvel queue consumer interrupted, flushing bulk processor", e);
bulkProcessor.flush();
running = false;
Thread.currentThread().interrupt();
} catch (Exception e) {
// log the exception and keep going
logger.warn("failed to index marvel documents from queue", e);
}
}
}
}
private BulkProcessor createBulkProcessor(Config config) {
int bulkSize = Math.min(config.settings().getAsInt(BULK_SIZE_SETTING, DEFAULT_BULK_SIZE), MAX_BULK_SIZE);
bulkSize = (bulkSize < 1) ? DEFAULT_BULK_SIZE : bulkSize;
TimeValue interval = config.settings().getAsTime(BULK_FLUSH_INTERVAL_SETTING, DEFAULT_BULK_FLUSH_INTERVAL);
interval = (interval.millis() < 1) ? DEFAULT_BULK_FLUSH_INTERVAL : interval;
return BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.debug("executing [{}] bulk index requests", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.info("failed to bulk index marvel documents: [{}]", response.buildFailureMessage());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("failed to bulk index marvel documents: [{}]", failure, failure.getMessage());
}
}).setName("marvel-bulk-processor-" + config.name())
.setBulkActions(bulkSize)
.setFlushInterval(interval)
.setConcurrentRequests(1)
.build();
} }
return new LocalBulk(name(), client, indexNameResolver, renderers);
} }
public enum State { public enum State {
INITIALIZED,
STARTING, STARTING,
STARTED, STARTED,
STOPPING, STOPPING,

View File

@ -22,11 +22,12 @@ public class MarvelInternalUserHolder {
static final String[] ROLE_NAMES = new String[] { "__marvel_role" }; static final String[] ROLE_NAMES = new String[] { "__marvel_role" };
public static final Permission.Global.Role ROLE = Permission.Global.Role.builder(ROLE_NAMES[0]) public static final Permission.Global.Role ROLE = Permission.Global.Role.builder(ROLE_NAMES[0])
.cluster(Privilege.Cluster.action(PutIndexTemplateAction.NAME)) .cluster(Privilege.Cluster.get(new Privilege.Name(
.cluster(Privilege.Cluster.action(GetIndexTemplatesAction.NAME)) PutIndexTemplateAction.NAME + "*",
GetIndexTemplatesAction.NAME + "*",
Privilege.Cluster.MONITOR.name().toString())))
// we need all monitoring access // we need all monitoring access
.cluster(Privilege.Cluster.MONITOR)
.add(Privilege.Index.MONITOR, "*") .add(Privilege.Index.MONITOR, "*")
// and full access to .marvel-* and .marvel-data indices // and full access to .marvel-* and .marvel-data indices

View File

@ -347,7 +347,7 @@ public class SecuredClient implements Client {
} }
public IndexRequestBuilder prepareIndex(String index, String type, @Nullable String id) { public IndexRequestBuilder prepareIndex(String index, String type, @Nullable String id) {
return (this.prepareIndex().setIndex(index)).setType(type).setId(id); return this.prepareIndex().setIndex(index).setType(type).setId(id);
} }
public ActionFuture<UpdateResponse> update(UpdateRequest request) { public ActionFuture<UpdateResponse> update(UpdateRequest request) {
@ -379,7 +379,7 @@ public class SecuredClient implements Client {
} }
public DeleteRequestBuilder prepareDelete(String index, String type, String id) { public DeleteRequestBuilder prepareDelete(String index, String type, String id) {
return (this.prepareDelete().setIndex(index)).setType(type).setId(id); return this.prepareDelete().setIndex(index).setType(type).setId(id);
} }
public ActionFuture<BulkResponse> bulk(BulkRequest request) { public ActionFuture<BulkResponse> bulk(BulkRequest request) {
@ -407,7 +407,7 @@ public class SecuredClient implements Client {
} }
public GetRequestBuilder prepareGet(String index, String type, String id) { public GetRequestBuilder prepareGet(String index, String type, String id) {
return (this.prepareGet().setIndex(index)).setType(type).setId(id); return this.prepareGet().setIndex(index).setType(type).setId(id);
} }
public ActionFuture<GetIndexedScriptResponse> getIndexedScript(GetIndexedScriptRequest request) { public ActionFuture<GetIndexedScriptResponse> getIndexedScript(GetIndexedScriptRequest request) {
@ -479,7 +479,7 @@ public class SecuredClient implements Client {
} }
public SearchRequestBuilder prepareSearch(String... indices) { public SearchRequestBuilder prepareSearch(String... indices) {
return (new SearchRequestBuilder(this, SearchAction.INSTANCE)).setIndices(indices); return new SearchRequestBuilder(this, SearchAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<SearchResponse> searchScroll(SearchScrollRequest request) { public ActionFuture<SearchResponse> searchScroll(SearchScrollRequest request) {
@ -525,7 +525,7 @@ public class SecuredClient implements Client {
} }
public CountRequestBuilder prepareCount(String... indices) { public CountRequestBuilder prepareCount(String... indices) {
return (new CountRequestBuilder(this, CountAction.INSTANCE)).setIndices(indices); return new CountRequestBuilder(this, CountAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<ExistsResponse> exists(ExistsRequest request) { public ActionFuture<ExistsResponse> exists(ExistsRequest request) {
@ -537,7 +537,7 @@ public class SecuredClient implements Client {
} }
public ExistsRequestBuilder prepareExists(String... indices) { public ExistsRequestBuilder prepareExists(String... indices) {
return (new ExistsRequestBuilder(this, ExistsAction.INSTANCE)).setIndices(indices); return new ExistsRequestBuilder(this, ExistsAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<SuggestResponse> suggest(SuggestRequest request) { public ActionFuture<SuggestResponse> suggest(SuggestRequest request) {
@ -549,7 +549,7 @@ public class SecuredClient implements Client {
} }
public SuggestRequestBuilder prepareSuggest(String... indices) { public SuggestRequestBuilder prepareSuggest(String... indices) {
return (new SuggestRequestBuilder(this, SuggestAction.INSTANCE)).setIndices(indices); return new SuggestRequestBuilder(this, SuggestAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<TermVectorsResponse> termVectors(TermVectorsRequest request) { public ActionFuture<TermVectorsResponse> termVectors(TermVectorsRequest request) {
@ -772,7 +772,7 @@ public class SecuredClient implements Client {
} }
public ClearIndicesCacheRequestBuilder prepareClearCache(String... indices) { public ClearIndicesCacheRequestBuilder prepareClearCache(String... indices) {
return (new ClearIndicesCacheRequestBuilder(this, ClearIndicesCacheAction.INSTANCE)).setIndices(indices); return new ClearIndicesCacheRequestBuilder(this, ClearIndicesCacheAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<CreateIndexResponse> create(CreateIndexRequest request) { public ActionFuture<CreateIndexResponse> create(CreateIndexRequest request) {
@ -868,7 +868,7 @@ public class SecuredClient implements Client {
} }
public PutMappingRequestBuilder preparePutMapping(String... indices) { public PutMappingRequestBuilder preparePutMapping(String... indices) {
return (new PutMappingRequestBuilder(this, PutMappingAction.INSTANCE)).setIndices(indices); return new PutMappingRequestBuilder(this, PutMappingAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<OptimizeResponse> optimize(OptimizeRequest request) { public ActionFuture<OptimizeResponse> optimize(OptimizeRequest request) {
@ -892,7 +892,7 @@ public class SecuredClient implements Client {
} }
public UpgradeRequestBuilder prepareUpgrade(String... indices) { public UpgradeRequestBuilder prepareUpgrade(String... indices) {
return (new UpgradeRequestBuilder(this, UpgradeAction.INSTANCE)).setIndices(indices); return new UpgradeRequestBuilder(this, UpgradeAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<UpgradeStatusResponse> upgradeStatus(UpgradeStatusRequest request) { public ActionFuture<UpgradeStatusResponse> upgradeStatus(UpgradeStatusRequest request) {
@ -904,7 +904,7 @@ public class SecuredClient implements Client {
} }
public UpgradeStatusRequestBuilder prepareUpgradeStatus(String... indices) { public UpgradeStatusRequestBuilder prepareUpgradeStatus(String... indices) {
return (new UpgradeStatusRequestBuilder(this, UpgradeStatusAction.INSTANCE)).setIndices(indices); return new UpgradeStatusRequestBuilder(this, UpgradeStatusAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<RefreshResponse> refresh(RefreshRequest request) { public ActionFuture<RefreshResponse> refresh(RefreshRequest request) {
@ -916,7 +916,7 @@ public class SecuredClient implements Client {
} }
public RefreshRequestBuilder prepareRefresh(String... indices) { public RefreshRequestBuilder prepareRefresh(String... indices) {
return (new RefreshRequestBuilder(this, RefreshAction.INSTANCE)).setIndices(indices); return new RefreshRequestBuilder(this, RefreshAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<IndicesStatsResponse> stats(IndicesStatsRequest request) { public ActionFuture<IndicesStatsResponse> stats(IndicesStatsRequest request) {
@ -928,7 +928,7 @@ public class SecuredClient implements Client {
} }
public IndicesStatsRequestBuilder prepareStats(String... indices) { public IndicesStatsRequestBuilder prepareStats(String... indices) {
return (new IndicesStatsRequestBuilder(this, IndicesStatsAction.INSTANCE)).setIndices(indices); return new IndicesStatsRequestBuilder(this, IndicesStatsAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<RecoveryResponse> recoveries(RecoveryRequest request) { public ActionFuture<RecoveryResponse> recoveries(RecoveryRequest request) {
@ -940,7 +940,7 @@ public class SecuredClient implements Client {
} }
public RecoveryRequestBuilder prepareRecoveries(String... indices) { public RecoveryRequestBuilder prepareRecoveries(String... indices) {
return (new RecoveryRequestBuilder(this, RecoveryAction.INSTANCE)).setIndices(indices); return new RecoveryRequestBuilder(this, RecoveryAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<IndicesSegmentResponse> segments(IndicesSegmentsRequest request) { public ActionFuture<IndicesSegmentResponse> segments(IndicesSegmentsRequest request) {
@ -952,7 +952,7 @@ public class SecuredClient implements Client {
} }
public IndicesSegmentsRequestBuilder prepareSegments(String... indices) { public IndicesSegmentsRequestBuilder prepareSegments(String... indices) {
return (new IndicesSegmentsRequestBuilder(this, IndicesSegmentsAction.INSTANCE)).setIndices(indices); return new IndicesSegmentsRequestBuilder(this, IndicesSegmentsAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<IndicesShardStoresResponse> shardStores(IndicesShardStoresRequest request) { public ActionFuture<IndicesShardStoresResponse> shardStores(IndicesShardStoresRequest request) {
@ -976,7 +976,7 @@ public class SecuredClient implements Client {
} }
public UpdateSettingsRequestBuilder prepareUpdateSettings(String... indices) { public UpdateSettingsRequestBuilder prepareUpdateSettings(String... indices) {
return (new UpdateSettingsRequestBuilder(this, UpdateSettingsAction.INSTANCE, Strings.EMPTY_ARRAY)).setIndices(indices); return new UpdateSettingsRequestBuilder(this, UpdateSettingsAction.INSTANCE, Strings.EMPTY_ARRAY).setIndices(indices);
} }
public ActionFuture<AnalyzeResponse> analyze(AnalyzeRequest request) { public ActionFuture<AnalyzeResponse> analyze(AnalyzeRequest request) {
@ -1044,7 +1044,7 @@ public class SecuredClient implements Client {
} }
public ValidateQueryRequestBuilder prepareValidateQuery(String... indices) { public ValidateQueryRequestBuilder prepareValidateQuery(String... indices) {
return (new ValidateQueryRequestBuilder(this, ValidateQueryAction.INSTANCE)).setIndices(indices); return new ValidateQueryRequestBuilder(this, ValidateQueryAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<RenderSearchTemplateResponse> renderSearchTemplate(RenderSearchTemplateRequest request) { public ActionFuture<RenderSearchTemplateResponse> renderSearchTemplate(RenderSearchTemplateRequest request) {
@ -1144,7 +1144,7 @@ public class SecuredClient implements Client {
} }
public ClusterHealthRequestBuilder prepareHealth(String... indices) { public ClusterHealthRequestBuilder prepareHealth(String... indices) {
return (new ClusterHealthRequestBuilder(this, ClusterHealthAction.INSTANCE)).setIndices(indices); return new ClusterHealthRequestBuilder(this, ClusterHealthAction.INSTANCE).setIndices(indices);
} }
public ActionFuture<ClusterStateResponse> state(ClusterStateRequest request) { public ActionFuture<ClusterStateResponse> state(ClusterStateRequest request) {
@ -1192,7 +1192,7 @@ public class SecuredClient implements Client {
} }
public NodesInfoRequestBuilder prepareNodesInfo(String... nodesIds) { public NodesInfoRequestBuilder prepareNodesInfo(String... nodesIds) {
return (new NodesInfoRequestBuilder(this, NodesInfoAction.INSTANCE)).setNodesIds(nodesIds); return new NodesInfoRequestBuilder(this, NodesInfoAction.INSTANCE).setNodesIds(nodesIds);
} }
public ActionFuture<NodesStatsResponse> nodesStats(NodesStatsRequest request) { public ActionFuture<NodesStatsResponse> nodesStats(NodesStatsRequest request) {
@ -1204,7 +1204,7 @@ public class SecuredClient implements Client {
} }
public NodesStatsRequestBuilder prepareNodesStats(String... nodesIds) { public NodesStatsRequestBuilder prepareNodesStats(String... nodesIds) {
return (new NodesStatsRequestBuilder(this, NodesStatsAction.INSTANCE)).setNodesIds(nodesIds); return new NodesStatsRequestBuilder(this, NodesStatsAction.INSTANCE).setNodesIds(nodesIds);
} }
public ActionFuture<ClusterStatsResponse> clusterStats(ClusterStatsRequest request) { public ActionFuture<ClusterStatsResponse> clusterStats(ClusterStatsRequest request) {
@ -1228,7 +1228,7 @@ public class SecuredClient implements Client {
} }
public NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds) { public NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds) {
return (new NodesHotThreadsRequestBuilder(this, NodesHotThreadsAction.INSTANCE)).setNodesIds(nodesIds); return new NodesHotThreadsRequestBuilder(this, NodesHotThreadsAction.INSTANCE).setNodesIds(nodesIds);
} }
public ActionFuture<ClusterSearchShardsResponse> searchShards(ClusterSearchShardsRequest request) { public ActionFuture<ClusterSearchShardsResponse> searchShards(ClusterSearchShardsRequest request) {
@ -1244,7 +1244,7 @@ public class SecuredClient implements Client {
} }
public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices) { public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices) {
return (new ClusterSearchShardsRequestBuilder(this, ClusterSearchShardsAction.INSTANCE)).setIndices(indices); return new ClusterSearchShardsRequestBuilder(this, ClusterSearchShardsAction.INSTANCE).setIndices(indices);
} }
public PendingClusterTasksRequestBuilder preparePendingClusterTasks() { public PendingClusterTasksRequestBuilder preparePendingClusterTasks() {

View File

@ -10,9 +10,11 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate; import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException; import org.elasticsearch.test.rest.parser.RestTestParseException;
import org.junit.Ignore;
import java.io.IOException; import java.io.IOException;
@Ignore
public class MarvelRestIT extends ESRestTestCase { public class MarvelRestIT extends ESRestTestCase {
public MarvelRestIT(@Name("yaml") RestTestCandidate testCandidate) { public MarvelRestIT(@Name("yaml") RestTestCandidate testCandidate) {

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.marvel.agent.collector.cluster; package org.elasticsearch.marvel.agent.collector.cluster;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -21,6 +22,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@Seed("F57FD3DC45ADC34F")
public class ClusterStateCollectorTests extends AbstractCollectorTestCase { public class ClusterStateCollectorTests extends AbstractCollectorTestCase {
@Test @Test

View File

@ -189,7 +189,7 @@ public class ExportersTests extends ESTestCase {
} }
@Test @Test
public void testExport_OnMaster() throws Exception { public void testOpenBulk_OnMaster() throws Exception {
Exporter.Factory factory = new MockFactory("mock", false); Exporter.Factory factory = new MockFactory("mock", false);
Exporter.Factory masterOnlyFactory = new MockFactory("mock_master_only", true); Exporter.Factory masterOnlyFactory = new MockFactory("mock_master_only", true);
factories.put("mock", factory); factories.put("mock", factory);
@ -204,18 +204,13 @@ public class ExportersTests extends ESTestCase {
when(localNode.masterNode()).thenReturn(true); when(localNode.masterNode()).thenReturn(true);
when(clusterService.localNode()).thenReturn(localNode); when(clusterService.localNode()).thenReturn(localNode);
MarvelDoc doc = mock(MarvelDoc.class); ExportBulk bulk = exporters.openBulk();
MarvelDoc[] docs = new MarvelDoc[randomIntBetween(1, 3)]; assertThat(bulk, notNullValue());
for (int i = 0; i < docs.length; i++) {
docs[i] = doc;
}
List<MarvelDoc> docsList = Arrays.asList(docs);
exporters.export(docsList);
verify(exporters.getExporter("_name0"), times(1)).masterOnly(); verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).export(docsList); verify(exporters.getExporter("_name0"), times(1)).openBulk();
verify(exporters.getExporter("_name1"), times(1)).masterOnly(); verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verify(exporters.getExporter("_name1"), times(1)).export(docsList); verify(exporters.getExporter("_name1"), times(1)).openBulk();
} }
@Test @Test
@ -234,17 +229,13 @@ public class ExportersTests extends ESTestCase {
when(localNode.masterNode()).thenReturn(false); when(localNode.masterNode()).thenReturn(false);
when(clusterService.localNode()).thenReturn(localNode); when(clusterService.localNode()).thenReturn(localNode);
MarvelDoc doc = mock(MarvelDoc.class); ExportBulk bulk = exporters.openBulk();
MarvelDoc[] docs = new MarvelDoc[randomIntBetween(1, 3)]; assertThat(bulk, notNullValue());
for (int i = 0; i < docs.length; i++) {
docs[i] = doc;
}
List<MarvelDoc> docsList = Arrays.asList(docs);
exporters.export(docsList);
verify(exporters.getExporter("_name0"), times(1)).masterOnly(); verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).export(docsList); verify(exporters.getExporter("_name0"), times(1)).openBulk();
verify(exporters.getExporter("_name1"), times(1)).masterOnly(); verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verifyNoMoreInteractions(exporters.getExporter("_name1"));
} }
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> { static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
@ -268,6 +259,11 @@ public class ExportersTests extends ESTestCase {
public void export(Collection<MarvelDoc> marvelDocs) throws Exception { public void export(Collection<MarvelDoc> marvelDocs) throws Exception {
} }
@Override
public ExportBulk openBulk() {
return mock(ExportBulk.class);
}
@Override @Override
public void close() { public void close() {
} }
@ -289,6 +285,7 @@ public class ExportersTests extends ESTestCase {
when(exporter.type()).thenReturn(type()); when(exporter.type()).thenReturn(type());
when(exporter.name()).thenReturn(config.name()); when(exporter.name()).thenReturn(config.name());
when(exporter.masterOnly()).thenReturn(masterOnly); when(exporter.masterOnly()).thenReturn(masterOnly);
when(exporter.openBulk()).thenReturn(mock(ExportBulk.class));
return exporter; return exporter;
} }
} }

View File

@ -36,6 +36,7 @@ import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -228,7 +229,7 @@ public class HttpExporterTests extends ESIntegTestCase {
assertMarvelTemplateExists(); assertMarvelTemplateExists();
logger.debug("--> template exists"); logger.debug("--> template exists");
} }
}); }, 10, TimeUnit.SECONDS);
} }
@Test @Test
@ -245,9 +246,10 @@ public class HttpExporterTests extends ESIntegTestCase {
logger.info("exporting a first event"); logger.info("exporting a first event");
HttpExporter exporter = getExporter(agentNode); HttpExporter exporter = getExporter(agentNode);
exporter.export(Collections.singletonList(newRandomMarvelDoc())); MarvelDoc doc = newRandomMarvelDoc();
exporter.export(Collections.singletonList(doc));
String indexName = exporter.getIndexName(); String indexName = exporter.indexNameResolver().resolve(doc);
logger.info("checks that the index [{}] is created", indexName); logger.info("checks that the index [{}] is created", indexName);
assertTrue(client().admin().indices().prepareExists(indexName).get().isExists()); assertTrue(client().admin().indices().prepareExists(indexName).get().isExists());
@ -259,10 +261,11 @@ public class HttpExporterTests extends ESIntegTestCase {
exporter = getExporter(agentNode); exporter = getExporter(agentNode);
logger.info("exporting a second event"); logger.info("exporting a second event");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); doc = newRandomMarvelDoc();
exporter.export(Collections.singletonList(doc));
String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(System.currentTimeMillis()); + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.timestamp());
logger.info("checks that the index [{}] is created", expectedMarvelIndex); logger.info("checks that the index [{}] is created", expectedMarvelIndex);
assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists()); assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists());

View File

@ -9,9 +9,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
@ -32,14 +30,12 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD; import static org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils.MARVEL_VERSION_FIELD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) @ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class LocalExporterTests extends MarvelIntegTestCase { public class LocalExporterTests extends MarvelIntegTestCase {
@ -62,13 +58,13 @@ public class LocalExporterTests extends MarvelIntegTestCase {
.build()); .build());
ensureGreen(); ensureGreen();
Exporter exporter = getExporter("_local"); Exporter exporter = getLocalExporter("_local");
logger.debug("--> exporting a single marvel doc"); logger.debug("--> exporting a single marvel doc");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertMarvelDocsCount(1); awaitMarvelDocsCount(is(1L));
wipeMarvelIndices(); deleteMarvelIndices();
final List<MarvelDoc> marvelDocs = new ArrayList<>(); final List<MarvelDoc> marvelDocs = new ArrayList<>();
for (int i=0; i < randomIntBetween(2, 50); i++) { for (int i=0; i < randomIntBetween(2, 50); i++) {
@ -77,7 +73,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
logger.debug("--> exporting {} marvel docs", marvelDocs.size()); logger.debug("--> exporting {} marvel docs", marvelDocs.size());
exporter.export(marvelDocs); exporter.export(marvelDocs);
assertMarvelDocsCount(marvelDocs.size()); awaitMarvelDocsCount(is((long) marvelDocs.size()));
SearchResponse response = client().prepareSearch(MarvelSettings.MARVEL_INDICES_PREFIX + "*").get(); SearchResponse response = client().prepareSearch(MarvelSettings.MARVEL_INDICES_PREFIX + "*").get();
for (SearchHit hit : response.getHits().hits()) { for (SearchHit hit : response.getHits().hits()) {
@ -94,14 +90,14 @@ public class LocalExporterTests extends MarvelIntegTestCase {
.build()); .build());
ensureGreen(); ensureGreen();
LocalExporter exporter = getExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
assertTrue(exporter.shouldUpdateTemplate(null, Version.CURRENT)); assertTrue(exporter.shouldUpdateTemplate(null, Version.CURRENT));
assertMarvelTemplateNotExists(); assertMarvelTemplateNotExists();
logger.debug("--> exporting when the marvel template does not exists: template should be created"); logger.debug("--> exporting when the marvel template does not exists: template should be created");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertMarvelDocsCount(1); awaitMarvelDocsCount(is(1L));
assertMarvelTemplateExists(); assertMarvelTemplateExists();
assertThat(exporter.templateVersion(), equalTo(Version.CURRENT)); assertThat(exporter.templateVersion(), equalTo(Version.CURRENT));
@ -114,7 +110,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
.build()); .build());
ensureGreen(); ensureGreen();
LocalExporter exporter = getExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
Version fakeVersion = MIN_SUPPORTED_TEMPLATE_VERSION; Version fakeVersion = MIN_SUPPORTED_TEMPLATE_VERSION;
assertTrue(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); assertTrue(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT));
@ -126,20 +122,21 @@ public class LocalExporterTests extends MarvelIntegTestCase {
logger.debug("--> exporting when the marvel template must be updated: document is exported and the template is updated"); logger.debug("--> exporting when the marvel template must be updated: document is exported and the template is updated");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertMarvelDocsCount(1); awaitMarvelDocsCount(is(1L));
assertMarvelTemplateExists(); assertMarvelTemplateExists();
assertThat(exporter.templateVersion(), equalTo(Version.CURRENT)); assertThat(exporter.templateVersion(), equalTo(Version.CURRENT));
} }
@Test @Test @AwaitsFix(bugUrl = "LocalExporter#210")
public void testUnsupportedTemplateVersion() throws Exception { public void testUnsupportedTemplateVersion() throws Exception {
internalCluster().startNode(Settings.builder() internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE) .put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
.build()); .build());
ensureGreen(); ensureGreen();
LocalExporter exporter = getExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0); Version fakeVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT)); assertFalse(exporter.shouldUpdateTemplate(fakeVersion, Version.CURRENT));
@ -150,8 +147,9 @@ public class LocalExporterTests extends MarvelIntegTestCase {
assertThat(exporter.templateVersion(), equalTo(fakeVersion)); assertThat(exporter.templateVersion(), equalTo(fakeVersion));
logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated"); logger.debug("--> exporting when the marvel template is tool old: no document is exported and the template is not updated");
awaitMarvelDocsCount(is(0L));
exporter.export(Collections.singletonList(newRandomMarvelDoc())); exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertMarvelDocsCount(0); awaitMarvelDocsCount(is(0L));
assertMarvelTemplateExists(); assertMarvelTemplateExists();
assertThat(exporter.templateVersion(), equalTo(fakeVersion)); assertThat(exporter.templateVersion(), equalTo(fakeVersion));
@ -159,8 +157,9 @@ public class LocalExporterTests extends MarvelIntegTestCase {
@Test @Test
public void testIndexTimestampFormat() throws Exception { public void testIndexTimestampFormat() throws Exception {
long time = System.currentTimeMillis();
final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM"); final String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
final String expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(System.currentTimeMillis()); String expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(time);
internalCluster().startNode(Settings.builder() internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE) .put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
@ -168,54 +167,41 @@ public class LocalExporterTests extends MarvelIntegTestCase {
.build()); .build());
ensureGreen(); ensureGreen();
LocalExporter exporter = getExporter("_local"); LocalExporter exporter = getLocalExporter("_local");
assertThat(exporter.indexName(), equalTo(expectedIndexName));
assertThat(exporter.indexNameResolver().resolve(time), equalTo(expectedIndexName));
logger.debug("--> exporting a random marvel document"); logger.debug("--> exporting a random marvel document");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); MarvelDoc doc = newRandomMarvelDoc();
assertMarvelDocsCount(1); exporter.export(Collections.singletonList(doc));
awaitMarvelDocsCount(is(1L));
expectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName); logger.debug("--> check that the index [{}] has the correct timestamp [{}]", timeFormat, expectedIndexName);
assertTrue(client().admin().indices().prepareExists(expectedIndexName).get().isExists()); assertTrue(client().admin().indices().prepareExists(expectedIndexName).get().isExists());
logger.debug("--> updates the timestamp"); logger.debug("--> updates the timestamp");
final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM"); final String newTimeFormat = randomFrom("dd", "dd.MM.YYYY", "dd.MM");
final String newExpectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(System.currentTimeMillis());
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put("marvel.agent.exporters._local.index.name.time_format", newTimeFormat))); .put("marvel.agent.exporters._local.index.name.time_format", newTimeFormat)));
logger.debug("--> exporting a random marvel document"); logger.debug("--> exporting a random marvel document");
exporter.export(Collections.singletonList(newRandomMarvelDoc())); doc = newRandomMarvelDoc();
assertMarvelDocsCount(1); exporter.export(Collections.singletonList(doc));
awaitMarvelDocsCount(is(1L));
String newExpectedIndexName = MarvelSettings.MARVEL_INDICES_PREFIX + DateTimeFormat.forPattern(timeFormat).withZoneUTC().print(doc.timestamp());
logger.debug("--> check that the index [{}] has the correct timestamp [{}]", newTimeFormat, newExpectedIndexName); logger.debug("--> check that the index [{}] has the correct timestamp [{}]", newTimeFormat, newExpectedIndexName);
assertThat(exporter.indexName(), equalTo(newExpectedIndexName)); assertThat(exporter.indexNameResolver().resolve(doc.timestamp()), equalTo(newExpectedIndexName));
assertTrue(client().admin().indices().prepareExists(newExpectedIndexName).get().isExists()); assertTrue(client().admin().indices().prepareExists(newExpectedIndexName).get().isExists());
} }
private LocalExporter getLocalExporter(String name) throws Exception {
private <T extends Exporter> T getExporter(String name) { final Exporter exporter = internalCluster().getInstance(Exporters.class).getExporter(name);
Exporter exporter = internalCluster().getInstance(Exporters.class).getExporter(name); assertThat(exporter, notNullValue());
assertNotNull("exporter [" + name + "] should not be null", exporter); assertThat(exporter, instanceOf(LocalExporter.class));
return (T) exporter; return (LocalExporter) exporter;
}
private void assertMarvelDocsCount(long expectedHitCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
String index = MarvelSettings.MARVEL_INDICES_PREFIX + "*";
IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
assertThat(client().admin().indices().prepareRefresh(index).setIndicesOptions(indicesOptions).get().getFailedShards(), equalTo(0));
assertHitCount(client().prepareCount(index).setIndicesOptions(indicesOptions).get(), expectedHitCount);
}
}, 5, TimeUnit.SECONDS);
}
private void wipeMarvelIndices() {
assertAcked(client().admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX + "*"));
} }
private MarvelDoc newRandomMarvelDoc() { private MarvelDoc newRandomMarvelDoc() {
@ -228,20 +214,4 @@ public class LocalExporterTests extends MarvelIntegTestCase {
} }
} }
private void assertMarvelTemplateExists() {
assertTrue("marvel template must exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME));
}
private void assertMarvelTemplateNotExists() {
assertFalse("marvel template must not exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME));
}
private boolean isTemplateExists(String templateName) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) {
if (template.getName().equals(templateName)) {
return true;
}
}
return false;
}
} }

View File

@ -5,24 +5,16 @@
*/ */
package org.elasticsearch.marvel.agent.renderer; package org.elasticsearch.marvel.agent.renderer;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.settings.MarvelSettings; import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.shield.ShieldPlugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -32,40 +24,17 @@ public abstract class AbstractRendererTestCase extends MarvelIntegTestCase {
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder() return Settings.builder()
.put(super.nodeSettings(nodeOrdinal)) .put(super.nodeSettings(nodeOrdinal))
.put(Node.HTTP_ENABLED, true) .put(Node.HTTP_ENABLED, true)
.put(MarvelSettings.STARTUP_DELAY, "3s") .put(MarvelSettings.STARTUP_DELAY, "3s")
.put(MarvelSettings.INTERVAL, "1s") .put(MarvelSettings.INTERVAL, "1s")
.put(MarvelSettings.COLLECTORS, Strings.collectionToCommaDelimitedString(collectors())); .put(MarvelSettings.COLLECTORS, Strings.collectionToCommaDelimitedString(collectors()))
.build();
// we need to remove this potential setting for shield
builder.remove("index.queries.cache.type");
return builder.build();
} }
protected abstract Collection<String> collectors (); protected abstract Collection<String> collectors ();
protected void waitForMarvelDocs(final String type) throws Exception {
waitForMarvelDocs(type, 0L);
}
protected void waitForMarvelDocs(final String type, final long minCount) throws Exception {
logger.debug("--> waiting for at least [{}] marvel docs of type [{}] to be collected", minCount, type);
assertBusy(new Runnable() {
@Override
public void run() {
try {
refresh();
assertThat(client().prepareCount().setTypes(type).get().getCount(), greaterThan(minCount));
} catch (Throwable t) {
fail("exception when waiting for marvel docs: " + t.getMessage());
}
}
}, 30L, TimeUnit.SECONDS);
}
/** /**
* Checks if a field exist in a map of values. If the field contains a dot like 'foo.bar' * Checks if a field exist in a map of values. If the field contains a dot like 'foo.bar'
* it checks that 'foo' exists in the map of values and that it points to a sub-map. Then * it checks that 'foo' exists in the map of values and that it points to a sub-map. Then
@ -97,25 +66,4 @@ public abstract class AbstractRendererTestCase extends MarvelIntegTestCase {
assertTrue("expecting field [" + field + "] to be present in marvel document", values.containsKey(field)); assertTrue("expecting field [" + field + "] to be present in marvel document", values.containsKey(field));
} }
} }
protected void assertMarvelTemplateExists() throws Exception {
final String marvelTemplate = "marvel";
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(marvelTemplate).get();
assertNotNull(response);
boolean found = false;
for (IndexTemplateMetaData template : response.getIndexTemplates()) {
if (marvelTemplate.equals(template.getName())) {
found = true;
break;
}
}
assertTrue("Template [" + marvelTemplate + "] not found", found);
}
});
}
} }

View File

@ -65,7 +65,7 @@ public class ClusterInfoIT extends AbstractRendererTestCase {
assertThat(licensesList, instanceOf(List.class)); assertThat(licensesList, instanceOf(List.class));
List licenses = (List) licensesList; List licenses = (List) licensesList;
assertThat(licenses.size(), equalTo(1)); assertThat(licenses.size(), equalTo(shieldEnabled ? 2 : 1));
Map license = (Map) licenses.iterator().next(); Map license = (Map) licenses.iterator().next();
assertThat(license, instanceOf(Map.class)); assertThat(license, instanceOf(Map.class));

View File

@ -32,7 +32,7 @@ public class ClusterStateIT extends AbstractRendererTestCase {
@Test @Test
public void testClusterState() throws Exception { public void testClusterState() throws Exception {
waitForMarvelDocs(ClusterStateCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), ClusterStateCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStateCollector.TYPE).get(); SearchResponse response = client().prepareSearch().setTypes(ClusterStateCollector.TYPE).get();
@ -67,7 +67,7 @@ public class ClusterStateIT extends AbstractRendererTestCase {
logger.debug("--> checking for template existence"); logger.debug("--> checking for template existence");
assertMarvelTemplateExists(); assertMarvelTemplateExists();
waitForMarvelDocs(ClusterStateCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), ClusterStateCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStateCollector.TYPE).get(); SearchResponse response = client().prepareSearch().setTypes(ClusterStateCollector.TYPE).get();

View File

@ -5,11 +5,11 @@
*/ */
package org.elasticsearch.marvel.agent.renderer.cluster; package org.elasticsearch.marvel.agent.renderer.cluster;
import com.carrotsearch.randomizedtesting.annotations.Seed;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase; import org.elasticsearch.marvel.agent.renderer.AbstractRendererTestCase;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.junit.Test; import org.junit.Test;
@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
@Seed("B3CB5D1CDFA878F7:888A4AA279DFFE81")
public class ClusterStatsIT extends AbstractRendererTestCase { public class ClusterStatsIT extends AbstractRendererTestCase {
@Override @Override
@ -46,9 +47,9 @@ public class ClusterStatsIT extends AbstractRendererTestCase {
}, 30L, TimeUnit.SECONDS); }, 30L, TimeUnit.SECONDS);
logger.debug("--> delete all indices in case of cluster stats documents have been indexed with no shards data"); logger.debug("--> delete all indices in case of cluster stats documents have been indexed with no shards data");
assertAcked(client().admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX + "*")); deleteMarvelIndices();
waitForMarvelDocs(ClusterStatsCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), ClusterStatsCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", ClusterStatsCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", ClusterStatsCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStatsCollector.TYPE).get(); SearchResponse response = client().prepareSearch().setTypes(ClusterStatsCollector.TYPE).get();

View File

@ -31,7 +31,7 @@ public class IndexRecoveryIT extends AbstractRendererTestCase {
client().prepareIndex("test-" + i, "foo").setRefresh(true).setSource("field1", "value1").get(); client().prepareIndex("test-" + i, "foo").setRefresh(true).setSource("field1", "value1").get();
} }
waitForMarvelDocs(IndexRecoveryCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), IndexRecoveryCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", IndexRecoveryCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", IndexRecoveryCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(IndexRecoveryCollector.TYPE).get(); SearchResponse response = client().prepareSearch().setTypes(IndexRecoveryCollector.TYPE).get();

View File

@ -42,7 +42,7 @@ public class IndexStatsIT extends AbstractRendererTestCase {
} }
} }
waitForMarvelDocs(IndexStatsCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), IndexStatsCollector.TYPE);
logger.debug("--> wait for index stats collector to collect stat for each index"); logger.debug("--> wait for index stats collector to collect stat for each index");
assertBusy(new Runnable() { assertBusy(new Runnable() {

View File

@ -41,7 +41,7 @@ public class IndicesStatsIT extends AbstractRendererTestCase {
} }
} }
waitForMarvelDocs(IndicesStatsCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), IndicesStatsCollector.TYPE);
logger.debug("--> wait for indicesx stats collector to collect global stat"); logger.debug("--> wait for indicesx stats collector to collect global stat");
assertBusy(new Runnable() { assertBusy(new Runnable() {

View File

@ -32,7 +32,7 @@ public class NodeStatsIT extends AbstractRendererTestCase {
client().prepareIndex("test", "foo").setSource("value", randomInt()).get(); client().prepareIndex("test", "foo").setSource("value", randomInt()).get();
} }
waitForMarvelDocs(NodeStatsCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), NodeStatsCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", NodeStatsCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", NodeStatsCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(NodeStatsCollector.TYPE).get(); SearchResponse response = client().prepareSearch().setTypes(NodeStatsCollector.TYPE).get();

View File

@ -31,7 +31,7 @@ public class ShardsIT extends AbstractRendererTestCase {
client().prepareIndex("test-" + i, "foo").setRefresh(true).setSource("field1", "value1").get(); client().prepareIndex("test-" + i, "foo").setRefresh(true).setSource("field1", "value1").get();
} }
waitForMarvelDocs(ShardsCollector.TYPE); awaitMarvelDocsCount(greaterThan(0L), ShardsCollector.TYPE);
logger.debug("--> searching for marvel documents of type [{}]", ShardsCollector.TYPE); logger.debug("--> searching for marvel documents of type [{}]", ShardsCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ShardsCollector.TYPE).get(); SearchResponse response = client().prepareSearch().setTypes(ShardsCollector.TYPE).get();

View File

@ -6,11 +6,15 @@
package org.elasticsearch.marvel.test; package org.elasticsearch.marvel.test;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.cache.IndexCacheModule; import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.license.plugin.LicensePlugin; import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin; import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.ShieldPlugin;
import org.elasticsearch.shield.authc.esusers.ESUsersRealm; import org.elasticsearch.shield.authc.esusers.ESUsersRealm;
@ -19,6 +23,7 @@ import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.crypto.InternalCryptoService; import org.elasticsearch.shield.crypto.InternalCryptoService;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.TestCluster;
import org.hamcrest.Matcher;
import org.jboss.netty.util.internal.SystemPropertyUtil; import org.jboss.netty.util.internal.SystemPropertyUtil;
import java.io.BufferedWriter; import java.io.BufferedWriter;
@ -29,12 +34,14 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/** /**
* *
*/ */
public class MarvelIntegTestCase extends ESIntegTestCase { public abstract class MarvelIntegTestCase extends ESIntegTestCase {
protected static Boolean shieldEnabled; protected static Boolean shieldEnabled;
@ -48,16 +55,16 @@ public class MarvelIntegTestCase extends ESIntegTestCase {
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Map<String, String> originalSettings = super.nodeSettings(nodeOrdinal).getAsMap(); Settings.Builder builder = Settings.builder()
if (shieldEnabled) { .put(super.nodeSettings(nodeOrdinal))
originalSettings.remove("index.queries.cache.type"); // setting not supported by shield
}
return Settings.builder()
.put(originalSettings)
// we do this by default in core, but for marvel this isn't needed and only adds noise. // we do this by default in core, but for marvel this isn't needed and only adds noise.
.put("index.store.mock.check_index_on_close", false) .put("index.store.mock.check_index_on_close", false);
.put(ShieldSettings.settings(shieldEnabled))
.build(); if (shieldEnabled) {
ShieldSettings.apply(builder);
}
return builder.build();
} }
@Override @Override
@ -92,6 +99,76 @@ public class MarvelIntegTestCase extends ESIntegTestCase {
return randomBoolean(); return randomBoolean();
} }
protected void deleteMarvelIndices() {
if (shieldEnabled) {
try {
assertAcked(client().admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX + "*"));
} catch (Exception e) {
// if shield couldn't resolve any marvel index, it'll throw index not found exception.
if (!(e instanceof IndexNotFoundException)) {
throw e;
}
}
} else {
assertAcked(client().admin().indices().prepareDelete(MarvelSettings.MARVEL_INDICES_PREFIX + "*"));
}
}
protected void awaitMarvelDocsCount(Matcher<Long> matcher, String... types) throws Exception {
securedRefresh();
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelDocsCount(matcher, types);
}
}, 5, TimeUnit.SECONDS);
}
protected void assertMarvelDocsCount(Matcher<Long> matcher, String... types) {
try {
long count = client().prepareCount(MarvelSettings.MARVEL_INDICES_PREFIX + "*")
.setTypes(types).get().getCount();
assertThat(count, matcher);
} catch (IndexNotFoundException e) {
if (shieldEnabled) {
assertThat(0L, matcher);
} else {
throw e;
}
}
}
protected void assertMarvelTemplateExists() {
assertTrue("marvel template shouldn't exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME));
}
protected void assertMarvelTemplateNotExists() {
assertFalse("marvel template should exists", isTemplateExists(LocalExporter.INDEX_TEMPLATE_NAME));
}
private boolean isTemplateExists(String templateName) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) {
if (template.getName().equals(templateName)) {
return true;
}
}
return false;
}
protected void securedRefresh() {
if (shieldEnabled) {
try {
refresh();
} catch (Exception e) {
if (!(e instanceof IndexNotFoundException)) {
throw e;
}
}
} else {
refresh();
}
}
/** Shield related settings */ /** Shield related settings */
public static class ShieldSettings { public static class ShieldSettings {
@ -133,15 +210,14 @@ public class MarvelIntegTestCase extends ESIntegTestCase {
; ;
public static Settings settings(boolean enabled) { public static void apply(Settings.Builder builder) {
Settings.Builder builder = Settings.builder();
if (!enabled) {
return builder.put("shield.enabled", false).build();
}
try { try {
Path folder = createTempDir().resolve("watcher_shield"); Path folder = createTempDir().resolve("marvel_shield");
Files.createDirectories(folder); Files.createDirectories(folder);
return builder.put("shield.enabled", true)
builder.remove("index.queries.cache.type");
builder.put("shield.enabled", true)
.put("shield.user", "test:changeme") .put("shield.user", "test:changeme")
.put("shield.authc.realms.esusers.type", ESUsersRealm.TYPE) .put("shield.authc.realms.esusers.type", ESUsersRealm.TYPE)
.put("shield.authc.realms.esusers.order", 0) .put("shield.authc.realms.esusers.order", 0)
@ -154,8 +230,7 @@ public class MarvelIntegTestCase extends ESIntegTestCase {
.put("shield.audit.enabled", auditLogsEnabled) .put("shield.audit.enabled", auditLogsEnabled)
// Test framework sometimes randomily selects the 'index' or 'none' cache and that makes the // Test framework sometimes randomily selects the 'index' or 'none' cache and that makes the
// validation in ShieldPlugin fail. Shield can only run with this query cache impl // validation in ShieldPlugin fail. Shield can only run with this query cache impl
.put(IndexCacheModule.QUERY_CACHE_TYPE, ShieldPlugin.OPT_OUT_QUERY_CACHE) .put(IndexCacheModule.QUERY_CACHE_TYPE, ShieldPlugin.OPT_OUT_QUERY_CACHE);
.build();
} catch (IOException ex) { } catch (IOException ex) {
throw new RuntimeException("failed to build settings for shield", ex); throw new RuntimeException("failed to build settings for shield", ex);
} }