Merge Marvel 2.0

Original commit: elastic/x-pack-elasticsearch@ce68b3b983
This commit is contained in:
uboness 2015-07-17 18:43:52 +02:00
commit 315055012f
33 changed files with 3542 additions and 0 deletions

View File

@ -0,0 +1,14 @@
ELASTICSEARCH CONFIDENTIAL
__________________
[2014] Elasticsearch Incorporated. All Rights Reserved.
NOTICE: All information contained herein is, and remains
the property of Elasticsearch Incorporated and its suppliers,
if any. The intellectual and technical concepts contained
herein are proprietary to Elasticsearch Incorporated
and its suppliers and may be covered by U.S. and Foreign Patents,
patents in process, and are protected by trade secret or copyright law.
Dissemination of this information or reproduction of this material
is strictly forbidden unless prior written permission is obtained
from Elasticsearch Incorporated.

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<additionalHeaders>
<javadoc_style>
<firstLine>/*</firstLine>
<beforeEachLine> * </beforeEachLine>
<endLine> */EOL</endLine>
<!--skipLine></skipLine-->
<firstLineDetectionPattern>(\s|\t)*/\*.*$</firstLineDetectionPattern>
<lastLineDetectionPattern>.*\*/(\s|\t)*$</lastLineDetectionPattern>
<allowBlankLines>false</allowBlankLines>
<isMultiline>true</isMultiline>
</javadoc_style>
</additionalHeaders>

42
marvel/pom.xml Normal file
View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-marvel</artifactId>
<name>Elastic X-Plugins - Marvel</name>
<parent>
<groupId>org.elasticsearch</groupId>
<artifactId>x-plugins</artifactId>
<version>2.0.0-beta1-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-license-plugin</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*.properties</include>
<include>marvel_index_template.json</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@ -0,0 +1,17 @@
<assembly>
<id>plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
<excludes>
<exclude>org.elasticsearch:elasticsearch</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Scopes;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.collector.CollectorModule;
import org.elasticsearch.marvel.agent.exporter.ExporterModule;
import org.elasticsearch.marvel.license.LicenseModule;
public class MarvelModule extends AbstractModule implements SpawnModules {
@Override
protected void configure() {
bind(AgentService.class).in(Scopes.SINGLETON);
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new LicenseModule(), new CollectorModule(), new ExporterModule());
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.license.LicenseService;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.tribe.TribeService;
import java.util.Collection;
public class MarvelPlugin extends AbstractPlugin {
private static final ESLogger logger = Loggers.getLogger(MarvelPlugin.class);
public static final String NAME = "marvel";
public static final String ENABLED = NAME + ".enabled";
private final boolean enabled;
public MarvelPlugin(Settings settings) {
this.enabled = marvelEnabled(settings);
}
@Override
public String name() {
return NAME;
}
@Override
public String description() {
return "Elasticsearch Marvel";
}
public boolean isEnabled() {
return enabled;
}
@Override
public Collection<Class<? extends Module>> modules() {
if (!enabled) {
return ImmutableList.of();
}
return ImmutableList.<Class<? extends Module>>of(MarvelModule.class);
}
@Override
public Collection<Class<? extends LifecycleComponent>> services() {
if (!enabled) {
return ImmutableList.of();
}
return ImmutableList.<Class<? extends LifecycleComponent>>of(LicenseService.class, AgentService.class);
}
public static boolean marvelEnabled(Settings settings) {
String tribe = settings.get(TribeService.TRIBE_NAME);
if (tribe != null) {
logger.trace("marvel cannot be started on tribe node [{}]", tribe);
return false;
}
if (!"node".equals(settings.get(Client.CLIENT_TYPE_SETTING))) {
logger.trace("marvel cannot be started on a transport client");
return false;
}
return settings.getAsBoolean(ENABLED, true);
}
}

View File

@ -0,0 +1,204 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.license.plugin.LicenseVersion;
import java.io.IOException;
import java.io.Serializable;
public class MarvelVersion implements Serializable {
// The logic for ID is: XXYYZZAA, where XX is major version, YY is minor version, ZZ is revision, and AA is Beta/RC indicator
// AA values below 50 are beta builds, and below 99 are RC builds, with 99 indicating a release
// the (internal) format of the id is there so we can easily do after/before checks on the id
public static final int V_2_0_0_Beta1_ID = /*00*/2000001;
public static final MarvelVersion V_2_0_0_Beta1 = new MarvelVersion(V_2_0_0_Beta1_ID, true, Version.V_2_0_0_Beta1, LicenseVersion.V_1_0_0);
public static final MarvelVersion CURRENT = V_2_0_0_Beta1;
public static MarvelVersion readVersion(StreamInput in) throws IOException {
return fromId(in.readVInt());
}
public static MarvelVersion fromId(int id) {
switch (id) {
case V_2_0_0_Beta1_ID:
return V_2_0_0_Beta1;
default:
return new MarvelVersion(id, null, Version.CURRENT, LicenseVersion.CURRENT);
}
}
public static void writeVersion(MarvelVersion version, StreamOutput out) throws IOException {
out.writeVInt(version.id);
}
/**
* Returns the smallest version between the 2.
*/
public static MarvelVersion smallest(MarvelVersion version1, MarvelVersion version2) {
return version1.id < version2.id ? version1 : version2;
}
/**
* Returns the version given its string representation, current version if the argument is null or empty
*/
public static MarvelVersion fromString(String version) {
if (!Strings.hasLength(version)) {
return MarvelVersion.CURRENT;
}
String[] parts = version.split("[\\.-]");
if (parts.length < 3 || parts.length > 4) {
throw new IllegalArgumentException("the version needs to contain major, minor and revision, and optionally the build");
}
try {
//we reverse the version id calculation based on some assumption as we can't reliably reverse the modulo
int major = Integer.parseInt(parts[0]) * 1000000;
int minor = Integer.parseInt(parts[1]) * 10000;
int revision = Integer.parseInt(parts[2]) * 100;
int build = 99;
if (parts.length == 4) {
String buildStr = parts[3];
if (buildStr.startsWith("beta")) {
build = Integer.parseInt(buildStr.substring(4));
}
if (buildStr.startsWith("rc")) {
build = Integer.parseInt(buildStr.substring(2)) + 50;
}
}
return fromId(major + minor + revision + build);
} catch(NumberFormatException e) {
throw new IllegalArgumentException("unable to parse version " + version, e);
}
}
public final int id;
public final byte major;
public final byte minor;
public final byte revision;
public final byte build;
public final Boolean snapshot;
public final Version minEsCompatibilityVersion;
public final LicenseVersion minLicenseCompatibilityVersion;
MarvelVersion(int id, @Nullable Boolean snapshot, Version minEsCompatibilityVersion, LicenseVersion minLicenseCompatibilityVersion) {
this.id = id;
this.major = (byte) ((id / 1000000) % 100);
this.minor = (byte) ((id / 10000) % 100);
this.revision = (byte) ((id / 100) % 100);
this.build = (byte) (id % 100);
this.snapshot = snapshot;
this.minEsCompatibilityVersion = minEsCompatibilityVersion;
this.minLicenseCompatibilityVersion = minLicenseCompatibilityVersion;
}
public boolean snapshot() {
return snapshot != null && snapshot;
}
public boolean after(MarvelVersion version) {
return version.id < id;
}
public boolean onOrAfter(MarvelVersion version) {
return version.id <= id;
}
public boolean before(MarvelVersion version) {
return version.id > id;
}
public boolean onOrBefore(MarvelVersion version) {
return version.id >= id;
}
public boolean compatibleWith(MarvelVersion version) {
return version.onOrAfter(minimumCompatibilityVersion());
}
public boolean compatibleWith(Version esVersion) {
return esVersion.onOrAfter(minEsCompatibilityVersion);
}
/**
* Returns the minimum compatible version based on the current
* version. Ie a node needs to have at least the return version in order
* to communicate with a node running the current version. The returned version
* is in most of the cases the smallest major version release unless the current version
* is a beta or RC release then the version itself is returned.
*/
public MarvelVersion minimumCompatibilityVersion() {
return MarvelVersion.smallest(this, fromId(major * 1000000 + 99));
}
/**
* @return The minimum elasticsearch version this marvel version is compatible with.
*/
public Version minimumEsCompatiblityVersion() {
return minEsCompatibilityVersion;
}
/**
* @return The minimum license plugin version this marvel version is compatible with.
*/
public LicenseVersion minimumLicenseCompatibilityVersion() {
return minLicenseCompatibilityVersion;
}
/**
* Just the version number (without -SNAPSHOT if snapshot).
*/
public String number() {
StringBuilder sb = new StringBuilder();
sb.append(major).append('.').append(minor).append('.').append(revision);
if (build < 50) {
sb.append("-beta").append(build);
} else if (build < 99) {
sb.append("-rc").append(build - 50);
}
return sb.toString();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(number());
if (snapshot()) {
sb.append("-SNAPSHOT");
}
return sb.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MarvelVersion that = (MarvelVersion) o;
if (id != that.id) return false;
return true;
}
@Override
public int hashCode() {
return id;
}
}

View File

@ -0,0 +1,215 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.license.LicenseService;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.Collection;
import java.util.Set;
public class AgentService extends AbstractLifecycleComponent<AgentService> implements NodeSettingsService.Listener {
private static final String SETTINGS_BASE = "marvel.agent.";
public static final String SETTINGS_INTERVAL = SETTINGS_BASE + "interval";
public static final String SETTINGS_INDICES = SETTINGS_BASE + "indices";
public static final String SETTINGS_ENABLED = SETTINGS_BASE + "enabled";
public static final String SETTINGS_STATS_TIMEOUT = SETTINGS_BASE + "stats.timeout";
public static final String SETTINGS_INDICES_STATS_TIMEOUT = SETTINGS_BASE + "stats.indices.timeout";
private volatile ExportingWorker exportingWorker;
private volatile Thread workerThread;
private volatile long samplingInterval;
volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
private volatile TimeValue indicesStatsTimeout;
private volatile TimeValue clusterStatsTimeout;
private final Collection<Collector> collectors;
private final Collection<Exporter> exporters;
@Inject
public AgentService(Settings settings, NodeSettingsService nodeSettingsService,
@ClusterDynamicSettings DynamicSettings dynamicSettings,
LicenseService licenseService,
Set<Collector> collectors, Set<Exporter> exporters) {
super(settings);
this.samplingInterval = settings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)).millis();
this.indicesToExport = settings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
TimeValue statsTimeout = settings.getAsTime(SETTINGS_STATS_TIMEOUT, TimeValue.timeValueMinutes(10));
indicesStatsTimeout = settings.getAsTime(SETTINGS_INDICES_STATS_TIMEOUT, statsTimeout);
if (settings.getAsBoolean(SETTINGS_ENABLED, true)) {
this.collectors = ImmutableSet.copyOf(collectors);
this.exporters = ImmutableSet.copyOf(exporters);
} else {
this.collectors = ImmutableSet.of();
this.exporters = ImmutableSet.of();
logger.info("collecting disabled by settings");
}
nodeSettingsService.addListener(this);
dynamicSettings.addDynamicSetting(SETTINGS_INTERVAL);
dynamicSettings.addDynamicSetting(SETTINGS_INDICES + ".*"); // array settings
dynamicSettings.addDynamicSetting(SETTINGS_STATS_TIMEOUT);
dynamicSettings.addDynamicSetting(SETTINGS_INDICES_STATS_TIMEOUT);
logger.trace("marvel is running in [{}] mode", licenseService.mode());
}
protected void applyIntervalSettings() {
if (samplingInterval <= 0) {
logger.info("data sampling is disabled due to interval settings [{}]", samplingInterval);
if (workerThread != null) {
// notify worker to stop on its leisure, not to disturb an exporting operation
exportingWorker.closed = true;
exportingWorker = null;
workerThread = null;
}
} else if (workerThread == null || !workerThread.isAlive()) {
exportingWorker = new ExportingWorker();
workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "marvel.exporters"));
workerThread.setDaemon(true);
workerThread.start();
}
}
@Override
protected void doStart() {
if (exporters.size() == 0) {
return;
}
for (Collector collector : collectors) {
collector.start();
}
for (Exporter exporter : exporters) {
exporter.start();
}
applyIntervalSettings();
}
@Override
protected void doStop() {
if (exporters.size() == 0) {
return;
}
if (workerThread != null && workerThread.isAlive()) {
exportingWorker.closed = true;
workerThread.interrupt();
try {
workerThread.join(60000);
} catch (InterruptedException e) {
// we don't care...
}
}
for (Collector collector : collectors) {
collector.stop();
}
for (Exporter exporter : exporters) {
exporter.stop();
}
}
@Override
protected void doClose() {
for (Collector collector : collectors) {
collector.close();
}
for (Exporter exporter : exporters) {
exporter.close();
}
}
// used for testing
public Collection<Exporter> getExporters() {
return exporters;
}
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newSamplingInterval = settings.getAsTime(SETTINGS_INTERVAL, null);
if (newSamplingInterval != null && newSamplingInterval.millis() != samplingInterval) {
logger.info("sampling interval updated to [{}]", newSamplingInterval);
samplingInterval = newSamplingInterval.millis();
applyIntervalSettings();
}
String[] indices = settings.getAsArray(SETTINGS_INDICES, null, true);
if (indices != null) {
logger.info("sampling indices updated to [{}]", Strings.arrayToCommaDelimitedString(indices));
indicesToExport = indices;
}
TimeValue statsTimeout = settings.getAsTime(SETTINGS_STATS_TIMEOUT, TimeValue.timeValueMinutes(10));
TimeValue newTimeValue = settings.getAsTime(SETTINGS_INDICES_STATS_TIMEOUT, statsTimeout);
if (!indicesStatsTimeout.equals(newTimeValue)) {
logger.info("indices stats timeout updated to [{}]", newTimeValue);
indicesStatsTimeout = newTimeValue;
}
}
class ExportingWorker implements Runnable {
volatile boolean closed = false;
@Override
public void run() {
while (!closed) {
// sleep first to allow node to complete initialization before collecting the first start
try {
Thread.sleep(samplingInterval);
if (closed) {
continue;
}
for (Collector collector : collectors) {
logger.trace("collecting {}", collector.name());
Collection<MarvelDoc> results = collector.collect();
if (results != null && !results.isEmpty()) {
for (Exporter exporter : exporters) {
exporter.export(results);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable t) {
logger.error("Background thread had an uncaught exception:", t);
}
}
logger.debug("worker shutdown");
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import java.util.Collection;
public abstract class AbstractCollector<T> extends AbstractLifecycleComponent<T> implements Collector<T> {
private final String name;
protected final ClusterService clusterService;
@Inject
public AbstractCollector(Settings settings, String name, ClusterService clusterService) {
super(settings);
this.name = name;
this.clusterService = clusterService;
}
@Override
public String name() {
return name;
}
@Override
public T start() {
logger.debug("starting collector [{}]", name());
return super.start();
}
@Override
protected void doStart() {
}
/**
* Indicates if the current collector should
* be executed on master node only.
*/
protected boolean masterOnly() {
return false;
}
@Override
public Collection<MarvelDoc> collect() {
if (masterOnly() && !clusterService.state().nodes().localNodeMaster()) {
logger.trace("collector [{}] runs on master only", name());
return null;
}
try {
return doCollect();
} catch (ElasticsearchTimeoutException e) {
logger.error("collector [{}] timed out when collecting data");
} catch (Exception e) {
logger.error("collector [{}] throws exception when collecting data", e, name());
}
return null;
}
protected abstract Collection<MarvelDoc> doCollect() throws Exception;
@Override
public T stop() {
logger.debug("stopping collector [{}]", name());
return super.stop();
}
@Override
protected void doStop() {
}
@Override
public void close() {
logger.trace("closing collector [{}]", name());
super.close();
}
@Override
protected void doClose() {
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import java.util.Collection;
public interface Collector<T> extends LifecycleComponent<T> {
String name();
Collection<MarvelDoc> collect();
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.marvel.agent.collector.indices.IndexCollector;
import java.util.HashSet;
import java.util.Set;
public class CollectorModule extends AbstractModule {
private final Set<Class<? extends Collector>> collectors = new HashSet<>();
public CollectorModule() {
// Registers default collectors
registerCollector(IndexCollector.class);
}
@Override
protected void configure() {
Multibinder<Collector> binder = Multibinder.newSetBinder(binder(), Collector.class);
for (Class<? extends Collector> collector : collectors) {
bind(collector).asEagerSingleton();
binder.addBinding().to(collector);
}
}
public void registerCollector(Class<? extends Collector> collector) {
collectors.add(collector);
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.AbstractCollector;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import java.util.Collection;
/**
* Collector for indices statistics.
*
* This collector runs on the master node only and collect a {@link IndexMarvelDoc} document
* for each existing index in the cluster.
*/
public class IndexCollector extends AbstractCollector<IndexCollector> {
public static final String NAME = "index-collector";
protected static final String TYPE = "marvel_index";
private final ClusterName clusterName;
private final Client client;
@Inject
public IndexCollector(Settings settings, ClusterService clusterService, ClusterName clusterName, Client client) {
super(settings, NAME, clusterService);
this.client = client;
this.clusterName = clusterName;
}
@Override
protected boolean masterOnly() {
return true;
}
@Override
protected Collection<MarvelDoc> doCollect() throws Exception {
ImmutableList.Builder<MarvelDoc> results = ImmutableList.builder();
IndicesStatsResponse indicesStats = client.admin().indices().prepareStats().all()
.setStore(true)
.setIndexing(true)
.setDocs(true)
.get();
long timestamp = System.currentTimeMillis();
for (IndexStats indexStats : indicesStats.getIndices().values()) {
results.add(buildMarvelDoc(clusterName.value(), TYPE, timestamp, indexStats));
}
return results.build();
}
protected MarvelDoc buildMarvelDoc(String clusterName, String type, long timestamp, IndexStats indexStats) {
return IndexMarvelDoc.createMarvelDoc(clusterName, type, timestamp,
indexStats.getIndex(),
indexStats.getTotal().getDocs().getCount(),
indexStats.getTotal().getStore().sizeInBytes(), indexStats.getTotal().getStore().throttleTime().millis(),
indexStats.getTotal().getIndexing().getTotal().getThrottleTimeInMillis());
}
}

View File

@ -0,0 +1,172 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class IndexMarvelDoc extends MarvelDoc<IndexMarvelDoc> {
private final String index;
private final Docs docs;
private final Store store;
private final Indexing indexing;
public IndexMarvelDoc(String clusterName, String type, long timestamp,
String index, Docs docs, Store store, Indexing indexing) {
super(clusterName, type, timestamp);
this.index = index;
this.docs = docs;
this.store = store;
this.indexing = indexing;
}
@Override
public IndexMarvelDoc payload() {
return this;
}
public String getIndex() {
return index;
}
public Docs getDocs() {
return docs;
}
public Store getStore() {
return store;
}
public Indexing getIndexing() {
return indexing;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
super.toXContent(builder, params);
builder.field(Fields.INDEX, index);
if (docs != null) {
docs.toXContent(builder, params);
}
if (store != null) {
store.toXContent(builder, params);
}
if (indexing != null) {
indexing.toXContent(builder, params);
}
builder.endObject();
return builder;
}
public static IndexMarvelDoc createMarvelDoc(String clusterName, String type, long timestamp,
String index, long docsCount, long storeSizeInBytes, long storeThrottleTimeInMillis, long indexingThrottleTimeInMillis) {
return new IndexMarvelDoc(clusterName, type, timestamp, index,
new Docs(docsCount),
new Store(storeSizeInBytes, storeThrottleTimeInMillis),
new Indexing(indexingThrottleTimeInMillis));
}
static final class Fields {
static final XContentBuilderString INDEX = new XContentBuilderString("index");
}
static class Docs implements ToXContent {
private final long count;
Docs(long count) {
this.count = count;
}
public long getCount() {
return count;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DOCS);
builder.field(Fields.COUNT, count);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString DOCS = new XContentBuilderString("docs");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
}
}
static class Store implements ToXContent {
private final long sizeInBytes;
private final long throttleTimeInMillis;
public Store(long sizeInBytes, long throttleTimeInMillis) {
this.sizeInBytes = sizeInBytes;
this.throttleTimeInMillis = throttleTimeInMillis;
}
public long getSizeInBytes() {
return sizeInBytes;
}
public long getThrottleTimeInMillis() {
return throttleTimeInMillis;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.STORE);
builder.field(Fields.SIZE_IN_BYTES, sizeInBytes);
builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, new TimeValue(throttleTimeInMillis, TimeUnit.MILLISECONDS));
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString STORE = new XContentBuilderString("store");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
}
}
static class Indexing implements ToXContent {
private final long throttleTimeInMillis;
public Indexing(long throttleTimeInMillis) {
this.throttleTimeInMillis = throttleTimeInMillis;
}
public long getThrottleTimeInMillis() {
return throttleTimeInMillis;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.INDEXING);
builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, new TimeValue(throttleTimeInMillis, TimeUnit.MILLISECONDS));
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString INDEXING = new XContentBuilderString("indexing");
static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.Collection;
public abstract class AbstractExporter<T> extends AbstractLifecycleComponent<T> implements Exporter<T> {
private final String name;
protected final ClusterService clusterService;
@Inject
public AbstractExporter(Settings settings, String name, ClusterService clusterService) {
super(settings);
this.name = name;
this.clusterService = clusterService;
}
@Override
public String name() {
return name;
}
@Override
public T start() {
logger.debug("starting exporter [{}]", name());
return super.start();
}
@Override
protected void doStart() {
}
protected boolean masterOnly() {
return false;
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) {
if (masterOnly() && !clusterService.state().nodes().localNodeMaster()) {
logger.trace("exporter [{}] runs on master only", name());
return;
}
if (marvelDocs == null) {
logger.debug("no objects to export for [{}]", name());
return;
}
try {
doExport(marvelDocs);
} catch (Exception e) {
logger.error("export [{}] throws exception when exporting data", e, name());
}
}
protected abstract void doExport(Collection<MarvelDoc> marvelDocs) throws Exception;
@Override
public T stop() {
logger.debug("stopping exporter [{}]", name());
return super.stop();
}
@Override
protected void doStop() {
}
@Override
public void close() {
logger.trace("closing exporter [{}]", name());
super.close();
}
@Override
protected void doClose() {
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.component.LifecycleComponent;
import java.util.Collection;
public interface Exporter<T> extends LifecycleComponent<T> {
String name();
void export(Collection<MarvelDoc> marvelDocs);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import java.util.HashSet;
import java.util.Set;
public class ExporterModule extends AbstractModule {
private final Set<Class<? extends Exporter>> exporters = new HashSet<>();
public ExporterModule() {
// Registers default exporter
registerExporter(HttpESExporter.class);
}
@Override
protected void configure() {
Multibinder<Exporter> binder = Multibinder.newSetBinder(binder(), Exporter.class);
for (Class<? extends Exporter> exporter : exporters) {
bind(exporter).asEagerSingleton();
binder.addBinding().to(exporter);
}
}
public void registerExporter(Class<? extends Exporter> exporter) {
exporters.add(exporter);
}
}

View File

@ -0,0 +1,652 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import com.google.common.io.ByteStreams;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.marvel.agent.support.AgentUtils;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import javax.net.ssl.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
public class HttpESExporter extends AbstractExporter<HttpESExporter> implements NodeSettingsService.Listener {
private static final String NAME = "es_exporter";
private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es.";
public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts";
public static final String SETTINGS_INDEX_PREFIX = SETTINGS_PREFIX + "index.prefix";
public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat";
public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout";
public static final String SETTINGS_READ_TIMEOUT = SETTINGS_PREFIX + "read_timeout";
// es level timeout used when checking and writing templates (used to speed up tests)
public static final String SETTINGS_CHECK_TEMPLATE_TIMEOUT = SETTINGS_PREFIX + ".template.master_timeout";
// es level timeout used for bulk indexing (used to speed up tests)
public static final String SETTINGS_BULK_TIMEOUT = SETTINGS_PREFIX + ".bulk.timeout";
volatile String[] hosts;
volatile boolean boundToLocalNode = false;
final String indexPrefix;
final DateTimeFormatter indexTimeFormatter;
volatile int timeoutInMillis;
volatile int readTimeoutInMillis;
/** https support * */
final SSLSocketFactory sslSocketFactory;
volatile boolean hostnameVerification;
final ClusterService clusterService;
final ClusterName clusterName;
final NodeService nodeService;
final Environment environment;
HttpServer httpServer;
final boolean httpEnabled;
@Nullable
final TimeValue templateCheckTimeout;
@Nullable
final TimeValue bulkTimeout;
volatile boolean checkedAndUploadedIndexTemplate = false;
final ConnectionKeepAliveWorker keepAliveWorker;
Thread keepAliveThread;
@Inject
public HttpESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName,
@ClusterDynamicSettings DynamicSettings dynamicSettings,
NodeSettingsService nodeSettingsService,
NodeService nodeService, Environment environment) {
super(settings, NAME, clusterService);
this.clusterService = clusterService;
this.clusterName = clusterName;
this.nodeService = nodeService;
this.environment = environment;
httpEnabled = settings.getAsBoolean(Node.HTTP_ENABLED, true);
hosts = settings.getAsArray(SETTINGS_HOSTS, Strings.EMPTY_ARRAY);
validateHosts(hosts);
indexPrefix = settings.get(SETTINGS_INDEX_PREFIX, ".marvel");
String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, "YYYY.MM.dd");
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
timeoutInMillis = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).millis();
readTimeoutInMillis = (int) settings.getAsTime(SETTINGS_READ_TIMEOUT, new TimeValue(timeoutInMillis * 10)).millis();
templateCheckTimeout = settings.getAsTime(SETTINGS_CHECK_TEMPLATE_TIMEOUT, null);
bulkTimeout = settings.getAsTime(SETTINGS_CHECK_TEMPLATE_TIMEOUT, null);
keepAliveWorker = new ConnectionKeepAliveWorker();
dynamicSettings.addDynamicSetting(SETTINGS_HOSTS);
dynamicSettings.addDynamicSetting(SETTINGS_HOSTS + ".*");
dynamicSettings.addDynamicSetting(SETTINGS_TIMEOUT);
dynamicSettings.addDynamicSetting(SETTINGS_READ_TIMEOUT);
dynamicSettings.addDynamicSetting(SETTINGS_SSL_HOSTNAME_VERIFICATION);
nodeSettingsService.addListener(this);
if (!settings.getByPrefix(SETTINGS_SSL_PREFIX).getAsMap().isEmpty()) {
sslSocketFactory = createSSLSocketFactory(settings);
} else {
logger.trace("no ssl context configured");
sslSocketFactory = null;
}
hostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, true);
logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]",
AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)), indexPrefix, indexTimeFormat);
}
static private void validateHosts(String[] hosts) {
for (String host : hosts) {
try {
AgentUtils.parseHostWithPath(host, "");
} catch (URISyntaxException e) {
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." +
" error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]");
} catch (MalformedURLException e) {
throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." +
" error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]");
}
}
}
@Override
public String name() {
return NAME;
}
@Inject(optional = true)
public void setHttpServer(HttpServer httpServer) {
this.httpServer = httpServer;
}
private HttpURLConnection openExportingConnection() {
logger.trace("setting up an export connection");
String queryString = "";
if (bulkTimeout != null) {
queryString = "?master_timeout=" + bulkTimeout;
}
HttpURLConnection conn = openAndValidateConnection("POST", getIndexName() + "/_bulk" + queryString, XContentType.SMILE.restContentType());
if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) {
// start keep alive upon successful connection if not there.
initKeepAliveThread();
}
return conn;
}
private void addMarvelDocToConnection(HttpURLConnection conn,
MarvelDoc marvelDoc) throws IOException {
OutputStream os = conn.getOutputStream();
// TODO: find a way to disable builder's substream flushing or something neat solution
XContentBuilder builder = XContentFactory.smileBuilder();
builder.startObject().startObject("index")
.field("_type", marvelDoc.type())
.endObject().endObject();
builder.close();
builder.bytes().writeTo(os);
os.write(SmileXContent.smileXContent.streamSeparator());
builder = XContentFactory.smileBuilder();
builder.humanReadable(false);
marvelDoc.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
builder.bytes().writeTo(os);
os.write(SmileXContent.smileXContent.streamSeparator());
}
@SuppressWarnings("unchecked")
private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException {
logger.trace("sending content");
OutputStream os = conn.getOutputStream();
os.close();
if (conn.getResponseCode() != 200) {
logConnectionError("remote target didn't respond with 200 OK", conn);
return;
}
InputStream inputStream = conn.getInputStream();
try (XContentParser parser = XContentType.SMILE.xContent().createParser(inputStream)) {
Map<String, Object> response = parser.map();
if (response.get("items") != null) {
ArrayList<Object> list = (ArrayList<Object>) response.get("items");
for (Object itemObject : list) {
Map<String, Object> actions = (Map<String, Object>) itemObject;
for (String actionKey : actions.keySet()) {
Map<String, Object> action = (Map<String, Object>) actions.get(actionKey);
if (action.get("error") != null) {
logger.error("{} failure (index:[{}] type: [{}]): {}", actionKey, action.get("_index"), action.get("_type"), action.get("error"));
}
}
}
}
}
}
@Override
protected void doExport(Collection<MarvelDoc> marvelDocs) throws Exception {
HttpURLConnection conn = openExportingConnection();
if (conn == null) {
return;
}
try {
for (MarvelDoc marvelDoc : marvelDocs) {
addMarvelDocToConnection(conn, marvelDoc);
}
sendCloseExportingConnection(conn);
} catch (IOException e) {
logger.error("error sending data to [{}]: {}", AgentUtils.santizeUrlPwds(conn.getURL()), AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(e)));
}
}
@Override
protected void doStart() {
// not initializing keep alive worker here but rather upon first exporting.
// In the case we are sending metrics to the same ES as where the plugin is hosted
// we want to give it some time to start.
}
@Override
protected void doStop() {
if (keepAliveThread != null && keepAliveThread.isAlive()) {
keepAliveWorker.closed = true;
keepAliveThread.interrupt();
try {
keepAliveThread.join(6000);
} catch (InterruptedException e) {
// don't care.
}
}
}
@Override
protected void doClose() {
}
// used for testing
String[] getHosts() {
return hosts;
}
private String getIndexName() {
return indexPrefix + "-" + indexTimeFormatter.print(System.currentTimeMillis());
}
/**
* open a connection to any host, validating it has the template installed if needed
*
* @return a url connection to the selected host or null if no current host is available.
*/
private HttpURLConnection openAndValidateConnection(String method, String path) {
return openAndValidateConnection(method, path, null);
}
/**
* open a connection to any host, validating it has the template installed if needed
*
* @return a url connection to the selected host or null if no current host is available.
*/
private HttpURLConnection openAndValidateConnection(String method, String path, String contentType) {
if (hosts.length == 0) {
// Due to how Guice injection works and because HttpServer can be optional,
// we can't be 100% sure that the HttpServer is created when the ESExporter
// instance is created. This is specially true in integration tests.
// So if HttpServer is enabled in settings we can safely use the NodeService
// to retrieve the bound address.
BoundTransportAddress boundAddress = null;
if (httpEnabled) {
if ((httpServer != null) && (httpServer.lifecycleState() == Lifecycle.State.STARTED)) {
logger.debug("deriving host setting from httpServer");
boundAddress = httpServer.info().address();
} else if (nodeService.info().getHttp() != null) {
logger.debug("deriving host setting from node info API");
boundAddress = nodeService.info().getHttp().address();
}
} else {
logger.warn("http server is not enabled no hosts are manually configured");
return null;
}
String[] extractedHosts = AgentUtils.extractHostsFromAddress(boundAddress, logger);
if (extractedHosts == null || extractedHosts.length == 0) {
return null;
}
hosts = extractedHosts;
logger.trace("auto-resolved hosts to {}", extractedHosts);
boundToLocalNode = true;
}
// it's important to have boundToLocalNode persistent to prevent calls during shutdown (causing ugly exceptions)
if (boundToLocalNode && (httpServer != null) && (httpServer.lifecycleState() != Lifecycle.State.STARTED)) {
logger.debug("local node http server is not started. can't connect");
return null;
}
// out of for to move faulty hosts to the end
int hostIndex = 0;
try {
for (; hostIndex < hosts.length; hostIndex++) {
String host = hosts[hostIndex];
if (!checkedAndUploadedIndexTemplate) {
// check templates first on the host
checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate(host);
if (!checkedAndUploadedIndexTemplate) {
continue;
}
}
HttpURLConnection connection = openConnection(host, method, path, contentType);
if (connection != null) {
return connection;
}
// failed hosts - reset template check , someone may have restarted the target cluster and deleted
// it's data folder. be safe.
checkedAndUploadedIndexTemplate = false;
}
} finally {
if (hostIndex > 0 && hostIndex < hosts.length) {
logger.debug("moving [{}] failed hosts to the end of the list", hostIndex);
String[] newHosts = new String[hosts.length];
System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex);
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex);
hosts = newHosts;
logger.debug("preferred target host is now [{}]", AgentUtils.santizeUrlPwds(hosts[0]));
}
}
logger.error("could not connect to any configured elasticsearch instances: [{}]", AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)));
return null;
}
/** open a connection to the given hosts, returning null when not successful * */
private HttpURLConnection openConnection(String host, String method, String path, @Nullable String contentType) {
try {
final URL url = AgentUtils.parseHostWithPath(host, path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
if (conn instanceof HttpsURLConnection && sslSocketFactory != null) {
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
httpsConn.setSSLSocketFactory(sslSocketFactory);
if (!hostnameVerification) {
httpsConn.setHostnameVerifier(TrustAllHostnameVerifier.INSTANCE);
}
}
conn.setRequestMethod(method);
conn.setConnectTimeout(timeoutInMillis);
conn.setReadTimeout(readTimeoutInMillis);
if (contentType != null) {
conn.setRequestProperty("Content-Type", contentType);
}
if (url.getUserInfo() != null) {
String basicAuth = "Basic " + Base64.encodeBytes(url.getUserInfo().getBytes("ISO-8859-1"));
conn.setRequestProperty("Authorization", basicAuth);
}
conn.setUseCaches(false);
if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("PUT")) {
conn.setDoOutput(true);
}
conn.connect();
return conn;
} catch (URISyntaxException e) {
logErrorBasedOnLevel(e, "error parsing host [{}]", AgentUtils.santizeUrlPwds(host));
} catch (IOException e) {
logErrorBasedOnLevel(e, "error connecting to [{}]", AgentUtils.santizeUrlPwds(host));
}
return null;
}
private void logErrorBasedOnLevel(Throwable t, String msg, Object... params) {
logger.error(msg + " [" + AgentUtils.santizeUrlPwds(t.getMessage()) + "]", params);
if (logger.isDebugEnabled()) {
logger.debug(msg + ". full error details:\n[{}]", params, AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
}
}
/**
* Checks if the index templates already exist and if not uploads it
* Any critical error that should prevent data exporting is communicated via an exception.
*
* @return true if template exists or was uploaded successfully.
*/
private boolean checkAndUploadIndexTemplate(final String host) {
byte[] template;
try {
template = Streams.copyToBytesFromClasspath("/marvel_index_template.json");
} catch (IOException e) {
// throwing an exception to stop exporting process - we don't want to send data unless
// we put in the template for it.
throw new RuntimeException("failed to load marvel_index_template.json", e);
}
try {
int expectedVersion = AgentUtils.parseIndexVersionFromTemplate(template);
if (expectedVersion < 0) {
throw new RuntimeException("failed to find an index version in pre-configured index template");
}
String queryString = "";
if (templateCheckTimeout != null) {
queryString = "?timeout=" + templateCheckTimeout;
}
HttpURLConnection conn = openConnection(host, "GET", "_template/marvel" + queryString, null);
if (conn == null) {
return false;
}
boolean hasTemplate = false;
if (conn.getResponseCode() == 200) {
// verify content.
InputStream is = conn.getInputStream();
byte[] existingTemplate = ByteStreams.toByteArray(is);
is.close();
int foundVersion = AgentUtils.parseIndexVersionFromTemplate(existingTemplate);
if (foundVersion < 0) {
logger.warn("found an existing index template but couldn't extract it's version. leaving it as is.");
hasTemplate = true;
} else if (foundVersion >= expectedVersion) {
logger.debug("accepting existing index template (version [{}], needed [{}])", foundVersion, expectedVersion);
hasTemplate = true;
} else {
logger.debug("replacing existing index template (version [{}], needed [{}])", foundVersion, expectedVersion);
}
}
// nothing there, lets create it
if (!hasTemplate) {
logger.debug("uploading index template");
conn = openConnection(host, "PUT", "_template/marvel" + queryString, XContentType.JSON.restContentType());
OutputStream os = conn.getOutputStream();
Streams.copy(template, os);
if (!(conn.getResponseCode() == 200 || conn.getResponseCode() == 201)) {
logConnectionError("error adding the marvel template to [" + host + "]", conn);
} else {
hasTemplate = true;
}
conn.getInputStream().close(); // close and release to connection pool.
}
return hasTemplate;
} catch (IOException e) {
logger.error("failed to verify/upload the marvel template to [{}]:\n{}", AgentUtils.santizeUrlPwds(host), AgentUtils.santizeUrlPwds(e.getMessage()));
return false;
}
}
private void logConnectionError(String msg, HttpURLConnection conn) {
InputStream inputStream = conn.getErrorStream();
String err = "";
if (inputStream != null) {
java.util.Scanner s = new java.util.Scanner(inputStream, "UTF-8").useDelimiter("\\A");
err = s.hasNext() ? s.next() : "";
}
try {
logger.error("{} response code [{} {}]. content: [{}]",
AgentUtils.santizeUrlPwds(msg), conn.getResponseCode(),
AgentUtils.santizeUrlPwds(conn.getResponseMessage()),
AgentUtils.santizeUrlPwds(err));
} catch (IOException e) {
logger.error("{}. connection had an error while reporting the error. tough life.", AgentUtils.santizeUrlPwds(msg));
}
}
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newTimeout = settings.getAsTime(SETTINGS_TIMEOUT, null);
if (newTimeout != null) {
logger.info("connection timeout set to [{}]", newTimeout);
timeoutInMillis = (int) newTimeout.millis();
}
newTimeout = settings.getAsTime(SETTINGS_READ_TIMEOUT, null);
if (newTimeout != null) {
logger.info("connection read timeout set to [{}]", newTimeout);
readTimeoutInMillis = (int) newTimeout.millis();
}
String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null);
if (newHosts != null) {
logger.info("hosts set to [{}]", AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(newHosts)));
this.hosts = newHosts;
this.checkedAndUploadedIndexTemplate = false;
this.boundToLocalNode = false;
}
Boolean newHostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, null);
if (newHostnameVerification != null) {
logger.info("hostname verification set to [{}]", newHostnameVerification);
this.hostnameVerification = newHostnameVerification;
}
}
protected void initKeepAliveThread() {
keepAliveThread = new Thread(keepAliveWorker, EsExecutors.threadName(settings, "keep_alive"));
keepAliveThread.setDaemon(true);
keepAliveThread.start();
}
/**
* Sadly we need to make sure we keep the connection open to the target ES a
* Java's connection pooling closes connections if idle for 5sec.
*/
class ConnectionKeepAliveWorker implements Runnable {
volatile boolean closed = false;
@Override
public void run() {
logger.trace("starting keep alive thread");
while (!closed) {
try {
Thread.sleep(1000);
if (closed) {
return;
}
String[] currentHosts = hosts;
if (currentHosts.length == 0) {
logger.trace("keep alive thread shutting down. no hosts defined");
return; // no hosts configured at the moment.
}
HttpURLConnection conn = openConnection(currentHosts[0], "GET", "", null);
if (conn == null) {
logger.trace("keep alive thread shutting down. failed to open connection to current host [{}]", AgentUtils.santizeUrlPwds(currentHosts[0]));
return;
} else {
conn.getInputStream().close(); // close and release to connection pool.
}
} catch (InterruptedException e) {
// ignore, if closed, good....
} catch (Throwable t) {
logger.debug("error in keep alive thread, shutting down (will be restarted after a successful connection has been made) {}",
AgentUtils.santizeUrlPwds(ExceptionsHelper.detailedMessage(t)));
return;
}
}
}
}
private static final String SETTINGS_SSL_PREFIX = SETTINGS_PREFIX + "ssl.";
public static final String SETTINGS_SSL_PROTOCOL = SETTINGS_SSL_PREFIX + "protocol";
public static final String SETTINGS_SSL_TRUSTSTORE = SETTINGS_SSL_PREFIX + "truststore.path";
public static final String SETTINGS_SSL_TRUSTSTORE_PASSWORD = SETTINGS_SSL_PREFIX + "truststore.password";
public static final String SETTINGS_SSL_TRUSTSTORE_ALGORITHM = SETTINGS_SSL_PREFIX + "truststore.algorithm";
public static final String SETTINGS_SSL_HOSTNAME_VERIFICATION = SETTINGS_SSL_PREFIX + "hostname_verification";
/** SSL Initialization * */
public SSLSocketFactory createSSLSocketFactory(Settings settings) {
SSLContext sslContext;
// Initialize sslContext
try {
String sslContextProtocol = settings.get(SETTINGS_SSL_PROTOCOL, "TLS");
String trustStore = settings.get(SETTINGS_SSL_TRUSTSTORE, System.getProperty("javax.net.ssl.trustStore"));
String trustStorePassword = settings.get(SETTINGS_SSL_TRUSTSTORE_PASSWORD, System.getProperty("javax.net.ssl.trustStorePassword"));
String trustStoreAlgorithm = settings.get(SETTINGS_SSL_TRUSTSTORE_ALGORITHM, System.getProperty("ssl.TrustManagerFactory.algorithm"));
if (trustStore == null) {
throw new RuntimeException("truststore is not configured, use " + SETTINGS_SSL_TRUSTSTORE);
}
if (trustStoreAlgorithm == null) {
trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
}
logger.debug("SSL: using trustStore[{}], trustAlgorithm[{}]", trustStore, trustStoreAlgorithm);
Path trustStorePath = environment.configFile().resolve(trustStore);
if (!Files.exists(trustStorePath)) {
throw new FileNotFoundException("Truststore at path [" + trustStorePath + "] does not exist");
}
TrustManager[] trustManagers;
try (InputStream trustStoreStream = Files.newInputStream(trustStorePath)) {
// Load TrustStore
KeyStore ks = KeyStore.getInstance("jks");
ks.load(trustStoreStream, trustStorePassword == null ? null : trustStorePassword.toCharArray());
// Initialize a trust manager factory with the trusted store
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm);
trustFactory.init(ks);
// Retrieve the trust managers from the factory
trustManagers = trustFactory.getTrustManagers();
} catch (Exception e) {
throw new RuntimeException("Failed to initialize a TrustManagerFactory", e);
}
sslContext = SSLContext.getInstance(sslContextProtocol);
sslContext.init(null, trustManagers, null);
} catch (Exception e) {
throw new RuntimeException("[marvel.agent.exporter] failed to initialize the SSLContext", e);
}
return sslContext.getSocketFactory();
}
/**
* Trust all hostname verifier. This simply returns true to completely disable hostname verification
*/
static class TrustAllHostnameVerifier implements HostnameVerifier {
static final HostnameVerifier INSTANCE = new TrustAllHostnameVerifier();
private TrustAllHostnameVerifier() {
}
@Override
public boolean verify(String s, SSLSession sslSession) {
return true;
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
public abstract class MarvelDoc<T> implements ToXContent {
private final String clusterName;
private final String type;
private final long timestamp;
public MarvelDoc(String clusterName, String type, long timestamp) {
this.clusterName = clusterName;
this.type = type;
this.timestamp = timestamp;
}
public String clusterName() {
return clusterName;
}
public String type() {
return type;
}
public long timestamp() {
return timestamp;
}
public abstract T payload();
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.CLUSTER_NAME, clusterName());
DateTime timestampDateTime = new DateTime(timestamp(), DateTimeZone.UTC);
builder.field(Fields.TIMESTAMP, timestampDateTime.toString());
return builder;
}
static final class Fields {
static final XContentBuilderString CLUSTER_NAME = new XContentBuilderString("cluster_name");
static final XContentBuilderString TIMESTAMP = new XContentBuilderString("timestamp");
}
}

View File

@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.support;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class AgentUtils {
public static XContentBuilder nodeToXContent(DiscoveryNode node, XContentBuilder builder) throws IOException {
return nodeToXContent(node, null, builder);
}
public static XContentBuilder nodeToXContent(DiscoveryNode node, Boolean isMasterNode, XContentBuilder builder) throws IOException {
builder.field("id", node.id());
builder.field("name", node.name());
builder.field("transport_address", node.address());
if (node.address().uniqueAddressTypeId() == 1) { // InetSocket
InetSocketTransportAddress address = (InetSocketTransportAddress) node.address();
InetSocketAddress inetSocketAddress = address.address();
InetAddress inetAddress = inetSocketAddress.getAddress();
if (inetAddress != null) {
builder.field("ip", inetAddress.getHostAddress());
builder.field("host", inetAddress.getHostName());
builder.field("ip_port", inetAddress.getHostAddress() + ":" + inetSocketAddress.getPort());
}
} else if (node.address().uniqueAddressTypeId() == 2) { // local transport
builder.field("ip_port", "_" + node.address()); // will end up being "_local[ID]"
}
builder.field("master_node", node.isMasterNode());
builder.field("data_node", node.isDataNode());
if (isMasterNode != null) {
builder.field("master", isMasterNode.booleanValue());
}
if (!node.attributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attr : node.attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
}
return builder;
}
public static String nodeDescription(DiscoveryNode node) {
StringBuilder builder = new StringBuilder().append("[").append(node.name()).append("]");
if (node.address().uniqueAddressTypeId() == 1) { // InetSocket
InetSocketTransportAddress address = (InetSocketTransportAddress) node.address();
InetSocketAddress inetSocketAddress = address.address();
InetAddress inetAddress = inetSocketAddress.getAddress();
if (inetAddress != null) {
builder.append("[").append(inetAddress.getHostAddress()).append(":").append(inetSocketAddress.getPort()).append("]");
}
}
return builder.toString();
}
public static String[] extractHostsFromAddress(BoundTransportAddress boundAddress, ESLogger logger) {
if (boundAddress == null || boundAddress.boundAddress() == null) {
logger.debug("local http server is not yet started. can't connect");
return null;
}
if (boundAddress.boundAddress().uniqueAddressTypeId() != 1) {
logger.error("local node is not bound via the http transport. can't connect");
return null;
}
InetSocketTransportAddress address = (InetSocketTransportAddress) boundAddress.boundAddress();
InetSocketAddress inetSocketAddress = address.address();
InetAddress inetAddress = inetSocketAddress.getAddress();
if (inetAddress == null) {
logger.error("failed to extract the ip address of current node.");
return null;
}
String host = inetAddress.getHostAddress();
if (host.indexOf(":") >= 0) {
// ipv6
host = "[" + host + "]";
}
return new String[]{host + ":" + inetSocketAddress.getPort()};
}
public static URL parseHostWithPath(String host, String path) throws URISyntaxException, MalformedURLException {
if (!host.contains("://")) {
// prefix with http
host = "http://" + host;
}
if (!host.endsWith("/")) {
// make sure we can safely resolves sub paths and not replace parent folders
host = host + "/";
}
URL hostUrl = new URL(host);
if (hostUrl.getPort() == -1) {
// url has no port, default to 9200 - sadly we need to rebuild..
StringBuilder newUrl = new StringBuilder(hostUrl.getProtocol() + "://");
if (hostUrl.getUserInfo() != null) {
newUrl.append(hostUrl.getUserInfo()).append("@");
}
newUrl.append(hostUrl.getHost()).append(":9200").append(hostUrl.toURI().getPath());
hostUrl = new URL(newUrl.toString());
}
return new URL(hostUrl, path);
}
public static int parseIndexVersionFromTemplate(byte[] template) throws UnsupportedEncodingException {
Pattern versionRegex = Pattern.compile("marvel.index_format\"\\s*:\\s*\"?(\\d+)\"?");
Matcher matcher = versionRegex.matcher(new String(template, "UTF-8"));
if (matcher.find()) {
return Integer.parseInt(matcher.group(1));
} else {
return -1;
}
}
private static final String userInfoChars = "\\w-\\._~!$&\\'\\(\\)*+,;=%";
private static Pattern urlPwdSanitizer = Pattern.compile("([" + userInfoChars + "]+?):[" + userInfoChars + "]+?@");
public static String santizeUrlPwds(Object text) {
Matcher matcher = urlPwdSanitizer.matcher(text.toString());
return matcher.replaceAll("$1:XXXXXX@");
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.license;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.license.plugin.LicenseVersion;
import org.elasticsearch.marvel.MarvelVersion;
public class LicenseModule extends AbstractModule {
public LicenseModule() {
verifyLicensePlugin();
}
@Override
protected void configure() {
bind(LicenseService.class).asEagerSingleton();
}
private void verifyLicensePlugin() {
try {
getClass().getClassLoader().loadClass("org.elasticsearch.license.plugin.LicensePlugin");
} catch (ClassNotFoundException cnfe) {
throw new IllegalStateException("marvel plugin requires the license plugin to be installed");
}
if (LicenseVersion.CURRENT.before(MarvelVersion.CURRENT.minLicenseCompatibilityVersion)) {
throw new ElasticsearchException("marvel [" + MarvelVersion.CURRENT +
"] requires minimum license plugin version [" + MarvelVersion.CURRENT.minLicenseCompatibilityVersion +
"], but installed license plugin version is [" + LicenseVersion.CURRENT + "]");
}
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.license;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.core.License;
import org.elasticsearch.license.plugin.core.LicensesClientService;
import org.elasticsearch.license.plugin.core.LicensesService;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.mode.Mode;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
public class LicenseService extends AbstractLifecycleComponent<LicenseService> {
public static final String FEATURE_NAME = MarvelPlugin.NAME;
private static final LicensesService.TrialLicenseOptions TRIAL_LICENSE_OPTIONS =
new LicensesService.TrialLicenseOptions(TimeValue.timeValueHours(30 * 24), 1000);
private static final FormatDateTimeFormatter DATE_FORMATTER = Joda.forPattern("EEEE, MMMMM dd, yyyy", Locale.ROOT);
private final LicensesClientService clientService;
private final Collection<LicensesService.ExpirationCallback> expirationLoggers;
private volatile Mode mode;
@Inject
public LicenseService(Settings settings, LicensesClientService clientService) {
super(settings);
this.clientService = clientService;
this.mode = Mode.LITE;
this.expirationLoggers = Arrays.asList(
new LicensesService.ExpirationCallback.Pre(days(7), days(30), days(1)) {
@Override
public void on(License license, LicensesService.ExpirationStatus status) {
logger.error("\n" +
"#\n" +
"# Marvel license will expire on [{}].\n" +
"# Have a new license? please update it. Otherwise, please reach out to your support contact.\n" +
"#", DATE_FORMATTER.printer().print(license.expiryDate()));
}
},
new LicensesService.ExpirationCallback.Pre(days(0), days(7), minutes(10)) {
@Override
public void on(License license, LicensesService.ExpirationStatus status) {
logger.error("\n" +
"#\n" +
"# Marvel license will expire on [{}].\n" +
"# Have a new license? please update it. Otherwise, please reach out to your support contact.\n" +
"#", DATE_FORMATTER.printer().print(license.expiryDate()));
}
},
new LicensesService.ExpirationCallback.Post(days(0), null, minutes(10)) {
@Override
public void on(License license, LicensesService.ExpirationStatus status) {
logger.error("\n" +
"#\n" +
"# MARVEL LICENSE WAS EXPIRED ON [{}].\n" +
"# HAVE A NEW LICENSE? PLEASE UPDATE IT. OTHERWISE, PLEASE REACH OUT TO YOUR SUPPORT CONTACT.\n" +
"#", DATE_FORMATTER.printer().print(license.expiryDate()));
}
}
);
}
@Override
protected void doStart() throws ElasticsearchException {
clientService.register(FEATURE_NAME, TRIAL_LICENSE_OPTIONS, expirationLoggers, new InternalListener(this));
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
}
static TimeValue days(int days) {
return TimeValue.timeValueHours(days * 24);
}
static TimeValue minutes(int minutes) {
return TimeValue.timeValueMinutes(minutes);
}
/**
* @return the current marvel's operating mode
*/
public Mode mode() {
return mode;
}
class InternalListener implements LicensesClientService.Listener {
private final LicenseService service;
public InternalListener(LicenseService service) {
this.service = service;
}
@Override
public void onEnabled(License license) {
try {
service.mode = Mode.fromName(license.type());
} catch (IllegalArgumentException e) {
service.mode = Mode.LITE;
}
}
@Override
public void onDisabled(License license) {
service.mode = Mode.LITE;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.mode;
import org.elasticsearch.ElasticsearchException;
import java.util.Locale;
/**
* Marvel's operating mode
*/
public enum Mode {
/**
* Marvel runs in downgraded mode
*/
TRIAL(0),
/**
* Marvel runs in downgraded mode
*/
LITE(0),
/**
* Marvel runs in normal mode
*/
STANDARD(1);
private final byte id;
Mode(int id) {
this.id = (byte) id;
}
public byte getId() {
return id;
}
public static Mode fromId(byte id) {
switch (id) {
case 0:
return TRIAL;
case 1:
return LITE;
case 2:
return STANDARD;
case 3:
default:
throw new ElasticsearchException("unknown marvel mode id [" + id + "]");
}
}
public static Mode fromName(String name) {
switch (name.toLowerCase(Locale.ROOT)) {
case "trial": return TRIAL;
case "lite": return LITE;
case "standard" : return STANDARD;
default:
throw new ElasticsearchException("unknown marvel mode name [" + name + "]");
}
}
}

View File

@ -0,0 +1,2 @@
plugin=org.elasticsearch.marvel.MarvelPlugin
version=${project.version}

View File

@ -0,0 +1,446 @@
{
"template": ".marvel*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"default": {
"type": "standard",
"stopwords": "_none_"
}
}
},
"mapper.dynamic": true,
"marvel.index_format": 6
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "multi_field",
"fields": {
"{name}": {
"type": "string",
"index": "analyzed",
"omit_norms": true
},
"raw": {
"type": "string",
"index": "not_analyzed",
"ignore_above": 256
}
}
}
}
}
]
},
"node_stats": {
"properties": {
"breakers": {
"properties": {
"fielddata": {
"properties": {
"estimated_size_in_bytes": {
"type": "long"
},
"tripped": {
"type": "long"
},
"limit_size_in_bytes": {
"type": "long"
}
}
},
"request": {
"properties": {
"estimated_size_in_bytes": {
"type": "long"
},
"tripped": {
"type": "long"
},
"limit_size_in_bytes": {
"type": "long"
}
}
},
"parent": {
"properties": {
"estimated_size_in_bytes": {
"type": "long"
},
"tripped": {
"type": "long"
},
"limit_size_in_bytes": {
"type": "long"
}
}
}
}
},
"fs": {
"properties": {
"total": {
"properties": {
"disk_io_op": {
"type": "long"
},
"disk_reads": {
"type": "long"
},
"disk_writes": {
"type": "long"
},
"disk_io_size_in_bytes": {
"type": "long"
},
"disk_read_size_in_bytes": {
"type": "long"
},
"disk_write_size_in_bytes": {
"type": "long"
}
}
}
}
},
"jvm": {
"properties": {
"buffer_pools": {
"properties": {
"direct": {
"properties": {
"used_in_bytes": {
"type": "long"
}
}
},
"mapped": {
"properties": {
"used_in_bytes": {
"type": "long"
}
}
}
}
},
"gc": {
"properties": {
"collectors": {
"properties": {
"young": {
"properties": {
"collection_count": {
"type": "long"
},
"collection_time_in_millis": {
"type": "long"
}
}
},
"old": {
"properties": {
"collection_count": {
"type": "long"
},
"collection_time_in_millis": {
"type": "long"
}
}
}
}
}
}
}
}
},
"indices": {
"properties": {
"indexing": {
"properties": {
"throttle_time_in_millis": {
"type": "long"
}
}
},
"percolate": {
"properties": {
"total": {
"type": "long"
},
"time_in_millis": {
"type": "long"
},
"queries": {
"type": "long"
},
"memory_size_in_bytes": {
"type": "long"
}
}
},
"segments": {
"properties": {
"index_writer_memory_in_bytes": {
"type": "long"
},
"version_map_memory_in_bytes": {
"type": "long"
},
"index_writer_max_memory_in_bytes": {
"type": "long"
}
}
},
"query_cache": {
"properties": {
"memory_size_in_bytes": {
"type": "long"
},
"evictions": {
"type": "long"
},
"hit_count": {
"type": "long"
},
"miss_count": {
"type": "long"
}
}
}
}
},
"os": {
"properties": {
"load_average": {
"properties": {
"1m": {
"type": "float"
},
"5m": {
"type": "float"
},
"15m": {
"type": "float"
}
}
}
}
},
"thread_pool": {
"properties": {
"listener": {
"properties": {
"threads": {
"type": "long"
},
"rejected": {
"type": "long"
},
"completed": {
"type": "long"
},
"queue": {
"type": "long"
},
"largest": {
"type": "long"
}
}
}
}
}
}
},
"index_stats": {
"properties": {
"index": {
"type": "multi_field",
"fields": {
"index": {
"type": "string",
"norms": {
"enabled": false
}
},
"raw": {
"type": "string",
"index": "not_analyzed",
"norms": {
"enabled": false
},
"index_options": "docs",
"include_in_all": false,
"ignore_above": 256
}
}
},
"total": {
"properties": {
"fielddata": {
"properties": {
"memory_size_in_bytes": {
"type": "long"
}
}
},
"indexing": {
"properties": {
"throttle_time_in_millis": {
"type": "long"
}
}
},
"merges": {
"properties": {
"total_size_in_bytes": {
"type": "long"
}
}
},
"percolate": {
"properties": {
"total": {
"type": "long"
},
"time_in_millis": {
"type": "long"
},
"queries": {
"type": "long"
},
"memory_size_in_bytes": {
"type": "long"
}
}
},
"search": {
"properties": {
"query": {
"properties": {
"query_total": {
"type": "long"
}
}
}
}
},
"segments": {
"properties": {
"index_writer_memory_in_bytes": {
"type": "long"
},
"version_map_memory_in_bytes": {
"type": "long"
},
"index_writer_max_memory_in_bytes": {
"type": "long"
}
}
},
"query_cache": {
"properties": {
"memory_size_in_bytes": {
"type": "long"
},
"evictions": {
"type": "long"
},
"hit_count": {
"type": "long"
},
"miss_count": {
"type": "long"
}
}
}
}
},
"primaries": {
"properties": {
"docs": {
"properties": {
"count": {
"type": "long"
}
}
},
"indexing": {
"properties": {
"index_total": {
"type": "long"
}
}
}
}
}
}
},
"cluster_event": {},
"shard_event": {},
"indices_stats": {
"properties": {
"primaries": {
"properties": {
"indexing": {
"properties": {
"index_total": {
"type": "long"
}
}
},
"docs": {
"properties": {
"count": {
"type": "long"
}
}
}
}
},
"total": {
"properties": {
"search": {
"properties": {
"query_total": {
"type": "long"
}
}
}
}
}
}
},
"cluster_stats": {},
"index_event": {},
"node_event": {},
"routing_event": {},
"cluster_state": {
"properties": {
"blocks": {
"type": "object",
"enabled": false
},
"nodes": {
"type": "object",
"enabled": false
},
"routing_nodes": {
"type": "object",
"enabled": false
},
"routing_table": {
"type": "object",
"enabled": false
}
}
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.Collection;
import static org.hamcrest.Matchers.is;
public class MarvelPluginClientTests extends ElasticsearchTestCase {
@Test
public void testModulesWithClientSettings() {
Settings settings = Settings.builder()
.put(Client.CLIENT_TYPE_SETTING, TransportClient.CLIENT_TYPE)
.build();
MarvelPlugin plugin = new MarvelPlugin(settings);
assertThat(plugin.isEnabled(), is(false));
Collection<Class<? extends Module>> modules = plugin.modules();
assertThat(modules.size(), is(0));
}
@Test
public void testModulesWithNodeSettings() {
// these settings mimic what ES does when running as a node...
Settings settings = Settings.builder()
.put(Client.CLIENT_TYPE_SETTING, "node")
.build();
MarvelPlugin plugin = new MarvelPlugin(settings);
assertThat(plugin.isEnabled(), is(true));
Collection<Class<? extends Module>> modules = plugin.modules();
assertThat(modules.size(), is(1));
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.PluginInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.tribe.TribeService;
import org.junit.Test;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = TEST, transportClientRatio = 0, numClientNodes = 0, numDataNodes = 0)
public class MarvelPluginTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.settingsBuilder()
.put(super.nodeSettings(nodeOrdinal))
.put("plugin.types", MarvelPlugin.class.getName())
.put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
.build();
}
@Test
public void testMarvelEnabled() {
internalCluster().startNode(Settings.builder().put(MarvelPlugin.ENABLED, true).build());
assertPluginIsLoaded();
assertServiceIsBound(AgentService.class);
}
@Test
public void testMarvelDisabled() {
internalCluster().startNode(Settings.builder().put(MarvelPlugin.ENABLED, false).build());
assertPluginIsLoaded();
assertServiceIsNotBound(AgentService.class);
}
@Test
public void testMarvelDisabledOnTribeNode() {
internalCluster().startNode(Settings.builder().put(TribeService.TRIBE_NAME, "t1").build());
assertPluginIsLoaded();
assertServiceIsNotBound(AgentService.class);
}
private void assertPluginIsLoaded() {
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().setPlugins(true).get();
for (NodeInfo nodeInfo : response) {
assertNotNull(nodeInfo.getPlugins());
boolean found = false;
for (PluginInfo plugin : nodeInfo.getPlugins().getInfos()) {
assertNotNull(plugin);
if (MarvelPlugin.NAME.equals(plugin.getName())) {
found = true;
break;
}
}
assertThat("marvel plugin not found", found, equalTo(true));
}
}
private void assertServiceIsBound(Class klass) {
try {
Object binding = internalCluster().getDataNodeInstance(klass);
assertNotNull(binding);
assertTrue(klass.isInstance(binding));
} catch (Exception e) {
fail("no service bound for class " + klass.getSimpleName());
}
}
private void assertServiceIsNotBound(Class klass) {
try {
internalCluster().getDataNodeInstance(klass);
fail("should have thrown an exception about missing implementation");
} catch (Exception ce) {
assertThat("message contains error about missing implemention: " + ce.getMessage(),
ce.getMessage().contains("No implementation"), equalTo(true));
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
public class MarvelVersionTests extends ElasticsearchTestCase {
@Test
public void testVersionFromString() {
assertThat(MarvelVersion.fromString("2.0.0-beta1"), equalTo(MarvelVersion.V_2_0_0_Beta1));
}
@Test
public void testVersionNumber() {
assertThat(MarvelVersion.V_2_0_0_Beta1.number(), equalTo("2.0.0-beta1"));
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.marvel.agent.support.AgentUtils;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
public class AgentUtilsTests extends ElasticsearchTestCase {
@Test
public void testVersionIsExtractableFromIndexTemplate() throws IOException {
byte[] template = Streams.copyToBytesFromClasspath("/marvel_index_template.json");
MatcherAssert.assertThat(AgentUtils.parseIndexVersionFromTemplate(template), Matchers.greaterThan(0));
}
@Test
public void testHostParsing() throws MalformedURLException, URISyntaxException {
URL url = AgentUtils.parseHostWithPath("localhost:9200", "");
verifyUrl(url, "http", "localhost", 9200, "/");
url = AgentUtils.parseHostWithPath("localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = AgentUtils.parseHostWithPath("http://localhost:9200", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = AgentUtils.parseHostWithPath("http://localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk");
url = AgentUtils.parseHostWithPath("https://localhost:9200", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/_bulk");
url = AgentUtils.parseHostWithPath("https://boaz-air.local:9200", "_bulk");
verifyUrl(url, "https", "boaz-air.local", 9200, "/_bulk");
url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200", "");
verifyUrl(url, "http", "localhost", 9200, "/", "boaz:test");
url = AgentUtils.parseHostWithPath("boaz:test@localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost:9200", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("https://boaz:test@localhost:9200", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200/suburl", "");
verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test");
url = AgentUtils.parseHostWithPath("boaz:test@localhost:9200/suburl/", "");
verifyUrl(url, "http", "localhost", 9200, "/suburl/", "boaz:test");
url = AgentUtils.parseHostWithPath("localhost/suburl", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk");
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost:9200/suburl/suburl1", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/suburl1/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("http://boaz:test@localhost/suburl", "_bulk");
verifyUrl(url, "http", "localhost", 9200, "/suburl/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("https://boaz:test@localhost:9200/suburl", "_bulk");
verifyUrl(url, "https", "localhost", 9200, "/suburl/_bulk", "boaz:test");
url = AgentUtils.parseHostWithPath("https://user:test@server_with_underscore:9300", "_bulk");
verifyUrl(url, "https", "server_with_underscore", 9300, "/_bulk", "user:test");
url = AgentUtils.parseHostWithPath("user:test@server_with_underscore:9300", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk", "user:test");
url = AgentUtils.parseHostWithPath("server_with_underscore:9300", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9300, "/_bulk");
url = AgentUtils.parseHostWithPath("server_with_underscore", "_bulk");
verifyUrl(url, "http", "server_with_underscore", 9200, "/_bulk");
url = AgentUtils.parseHostWithPath("https://user:test@server-dash:9300", "_bulk");
verifyUrl(url, "https", "server-dash", 9300, "/_bulk", "user:test");
url = AgentUtils.parseHostWithPath("user:test@server-dash:9300", "_bulk");
verifyUrl(url, "http", "server-dash", 9300, "/_bulk", "user:test");
url = AgentUtils.parseHostWithPath("server-dash:9300", "_bulk");
verifyUrl(url, "http", "server-dash", 9300, "/_bulk");
url = AgentUtils.parseHostWithPath("server-dash", "_bulk");
verifyUrl(url, "http", "server-dash", 9200, "/_bulk");
}
void verifyUrl(URL url, String protocol, String host, int port, String path) throws URISyntaxException {
assertThat(url.getProtocol(), equalTo(protocol));
assertThat(url.getHost(), equalTo(host));
assertThat(url.getPort(), equalTo(port));
assertThat(url.toURI().getPath(), equalTo(path));
}
void verifyUrl(URL url, String protocol, String host, int port, String path, String userInfo) throws URISyntaxException {
verifyUrl(url, protocol, host, port, path);
assertThat(url.getUserInfo(), equalTo(userInfo));
}
@Test
public void sanitizeUrlPadTest() throws UnsupportedEncodingException {
String pwd = URLEncoder.encode(randomRealisticUnicodeOfCodepointLengthBetween(3, 20), "UTF-8");
String[] inputs = new String[]{
"https://boaz:" + pwd + "@hostname:9200",
"http://boaz:" + pwd + "@hostname:9200",
"boaz:" + pwd + "@hostname",
"boaz:" + pwd + "@hostname/hello",
"Parse exception in [boaz:" + pwd + "@hostname:9200,boaz1:" + pwd + "@hostname \n" +
"caused: by exception ,boaz1:" + pwd + "@hostname",
"failed to upload index template, stopping export\n" +
"java.lang.RuntimeException: failed to load/verify index template\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:525)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.openExportingConnection(ESExporter.java:213)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.exportXContent(ESExporter.java:285)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.exportClusterStats(ESExporter.java:206)\n" +
" at org.elasticsearch.marvel.agent.AgentService$ExportingWorker.exportClusterStats(AgentService.java:288)\n" +
" at org.elasticsearch.marvel.agent.AgentService$ExportingWorker.run(AgentService.java:245)\n" +
" at java.lang.Thread.run(Thread.java:745)\n" +
"Caused by: java.io.IOException: Server returned HTTP response code: 401 for URL: http://marvel_exporter:" + pwd + "@localhost:9200/_template/marvel\n" +
" at sun.reflect.GeneratedConstructorAccessor3.createMarvelDoc(Unknown Source)\n" +
" at sun.reflect.DelegatingConstructorAccessorImpl.createMarvelDoc(DelegatingConstructorAccessorImpl.java:45)\n" +
" at java.lang.reflect.Constructor.createMarvelDoc(Constructor.java:526)\n" +
" at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1675)\n" +
" at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1673)\n" +
" at java.security.AccessController.doPrivileged(Native Method)\n" +
" at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1671)\n" +
" at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1244)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:519)\n" +
" ... 6 more\n" +
"Caused by: java.io.IOException: Server returned HTTP response code: 401 for URL: http://marvel_exporter:" + pwd + "@localhost:9200/_template/marvel\n" +
" at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1626)\n" +
" at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)\n" +
" at org.elasticsearch.marvel.agent.exporter.ESExporter.checkAndUploadIndexTemplate(ESExporter.java:514)\n" +
" ... 6 more"
};
for (String input : inputs) {
String sanitized = AgentUtils.santizeUrlPwds(input);
assertThat(sanitized, not(containsString(pwd)));
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test;
import java.util.Collection;
import java.util.Iterator;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
public class IndexCollectorTests extends ElasticsearchSingleNodeTest {
@Test
public void testIndexCollectorNoIndices() throws Exception {
Collection<MarvelDoc> results = newIndexCollector().doCollect();
assertThat(results, is(empty()));
}
@Test
public void testIndexCollectorOneIndex() throws Exception {
int nbDocs = randomIntBetween(1, 20);
for (int i = 0; i < nbDocs; i++) {
client().prepareIndex("test", "test").setSource("num", i).get();
}
client().admin().indices().prepareRefresh().get();
assertHitCount(client().prepareCount().get(), nbDocs);
Collection<MarvelDoc> results = newIndexCollector().doCollect();
assertThat(results, hasSize(1));
MarvelDoc marvelDoc = results.iterator().next();
assertNotNull(marvelDoc);
assertThat(marvelDoc, instanceOf(IndexMarvelDoc.class));
IndexMarvelDoc indexMarvelDoc = (IndexMarvelDoc) marvelDoc;
assertThat(indexMarvelDoc.clusterName(), equalTo(client().admin().cluster().prepareHealth().get().getClusterName()));
assertThat(indexMarvelDoc.timestamp(), greaterThan(0L));
assertThat(indexMarvelDoc.type(), equalTo(IndexCollector.TYPE));
assertThat(indexMarvelDoc.getIndex(), equalTo("test"));
assertNotNull(indexMarvelDoc.getDocs());
assertThat(indexMarvelDoc.getDocs().getCount(), equalTo((long) nbDocs));
assertNotNull(indexMarvelDoc.getStore());
assertThat(indexMarvelDoc.getStore().getSizeInBytes(), greaterThan(0L));
assertThat(indexMarvelDoc.getStore().getThrottleTimeInMillis(), equalTo(0L));
assertNotNull(indexMarvelDoc.getIndexing());
assertThat(indexMarvelDoc.getIndexing().getThrottleTimeInMillis(), equalTo(0L));
}
@Test
public void testIndexCollectorMultipleIndices() throws Exception {
int nbIndices = randomIntBetween(1, 5);
int[] docsPerIndex = new int[nbIndices];
for (int i = 0; i < nbIndices; i++) {
docsPerIndex[i] = randomIntBetween(1, 20);
for (int j = 0; j < docsPerIndex[i]; j++) {
client().prepareIndex("test-" + i, "test").setSource("num", i).get();
}
}
String clusterName = client().admin().cluster().prepareHealth().get().getClusterName();
client().admin().indices().prepareRefresh().get();
for (int i = 0; i < nbIndices; i++) {
assertHitCount(client().prepareCount("test-" + i).get(), docsPerIndex[i]);
}
Collection<MarvelDoc> results = newIndexCollector().doCollect();
assertThat(results, hasSize(nbIndices));
for (int i = 0; i < nbIndices; i++) {
boolean found = false;
Iterator<MarvelDoc> it = results.iterator();
while (!found && it.hasNext()) {
MarvelDoc marvelDoc = it.next();
assertThat(marvelDoc, instanceOf(IndexMarvelDoc.class));
IndexMarvelDoc indexMarvelDoc = (IndexMarvelDoc) marvelDoc;
if (indexMarvelDoc.getIndex().equals("test-" + i)) {
assertThat(indexMarvelDoc.clusterName(), equalTo(clusterName));
assertThat(indexMarvelDoc.timestamp(), greaterThan(0L));
assertThat(indexMarvelDoc.type(), equalTo(IndexCollector.TYPE));
assertNotNull(indexMarvelDoc.getDocs());
assertThat(indexMarvelDoc.getDocs().getCount(), equalTo((long) docsPerIndex[i]));
assertNotNull(indexMarvelDoc.getStore());
assertThat(indexMarvelDoc.getStore().getSizeInBytes(), greaterThan(0L));
assertThat(indexMarvelDoc.getStore().getThrottleTimeInMillis(), equalTo(0L));
assertNotNull(indexMarvelDoc.getIndexing());
assertThat(indexMarvelDoc.getIndexing().getThrottleTimeInMillis(), equalTo(0L));
found = true;
}
}
assertThat("could not find collected stats for index [test-" + i + "]", found, is(true));
}
}
private IndexCollector newIndexCollector() {
return new IndexCollector(getInstanceFromNode(Settings.class), getInstanceFromNode(ClusterService.class), getInstanceFromNode(ClusterName.class), client());
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
public class IndexMarvelDocTests extends ElasticsearchTestCase {
@Test
public void testCreateMarvelDoc() {
String cluster = randomUnicodeOfLength(10);
String type = randomUnicodeOfLength(10);
long timestamp = randomLong();
String index = randomUnicodeOfLength(10);
long docsCount = randomLong();
long storeSize = randomLong();
long storeThrottle = randomLong();
long indexingThrottle = randomLong();
IndexMarvelDoc marvelDoc = IndexMarvelDoc.createMarvelDoc(cluster, type, timestamp,
index, docsCount, storeSize, storeThrottle, indexingThrottle);
assertNotNull(marvelDoc);
assertThat(marvelDoc.clusterName(), equalTo(cluster));
assertThat(marvelDoc.type(), equalTo(type));
assertThat(marvelDoc.timestamp(), equalTo(timestamp));
assertThat(marvelDoc.getIndex(), equalTo(index));
assertNotNull(marvelDoc.getDocs());
assertThat(marvelDoc.getDocs().getCount(), equalTo(docsCount));
assertNotNull(marvelDoc.getStore());
assertThat(marvelDoc.getStore().getSizeInBytes(), equalTo(storeSize));
assertThat(marvelDoc.getStore().getThrottleTimeInMillis(), equalTo(storeThrottle));
assertNotNull(marvelDoc.getIndexing());
assertThat(marvelDoc.getIndexing().getThrottleTimeInMillis(), equalTo(indexingThrottle));
}
}

View File

@ -0,0 +1,230 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.collector.indices.IndexMarvelDoc;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
// Transport Client instantiation also calls the marvel plugin, which then fails to find modules
@ClusterScope(transportClientRatio = 0.0, scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class HttpESExporterTests extends ElasticsearchIntegrationTest {
@Test
public void testHttpServerOff() {
Settings.Builder builder = Settings.builder()
.put(AgentService.SETTINGS_INTERVAL, "200m")
.put(Node.HTTP_ENABLED, false);
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
logger.info("trying exporting despite of no target");
httpEsExporter.export(ImmutableList.of(newRandomMarvelDoc()));
}
/*
@Test
public void testLargeClusterStateSerialization() throws InterruptedException {
// make sure no other exporting is done (quicker)..
internalCluster().startNode(Settings.builder().put(AgentService.SETTINGS_INTERVAL, "200m").put(Node.HTTP_ENABLED, true));
ESExporter esExporter = internalCluster().getInstance(ESExporter.class);
DiscoveryNodes.Builder nodesBuilder = new DiscoveryNodes.Builder();
int nodeCount = randomIntBetween(10, 200);
for (int i = 0; i < nodeCount; i++) {
nodesBuilder.put(new DiscoveryNode("node_" + i, new LocalTransportAddress("node_" + i), Version.CURRENT));
}
// get the current cluster state rather then construct one because the constructors have changed across ES versions
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(nodesBuilder).build();
logger.info("exporting cluster state with {} nodes", state.nodes().size());
esExporter.exportEvents(new Event[]{
new ClusterEvent.ClusterStateChange(1234l, state, "test", ClusterHealthStatus.GREEN, "testing_1234", "test_source_unique")
});
logger.info("done exporting");
ensureYellow();
client().admin().indices().prepareRefresh(".marvel-*").get();
assertHitCount(client().prepareSearch().setQuery(QueryBuilders.termQuery("event_source", "test_source_unique")).get(), 1);
}
*/
@Test
@LuceneTestCase.Slow
public void testTemplateAdditionDespiteOfLateClusterForming() {
Settings.Builder builder = Settings.builder()
.put(AgentService.SETTINGS_INTERVAL, "200m")
.put(Node.HTTP_ENABLED, true)
.put("discovery.type", "zen")
.put("discovery.zen.ping_timeout", "1s")
.put("discovery.initial_state_timeout", "100ms")
.put("discovery.zen.minimum_master_nodes", 2)
.put(HttpESExporter.SETTINGS_BULK_TIMEOUT, "1s")
.put(HttpESExporter.SETTINGS_CHECK_TEMPLATE_TIMEOUT, "1s");
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
logger.info("exporting events while there is no cluster");
httpEsExporter.export(ImmutableList.of(newRandomMarvelDoc()));
logger.info("bringing up a second node");
internalCluster().startNode(builder);
ensureGreen();
logger.info("exporting a second event");
httpEsExporter.export(ImmutableList.of(newRandomMarvelDoc()));
logger.info("verifying template is inserted");
assertMarvelTemplate();
}
private void assertMarvelTemplate() {
boolean found;
found = findMarvelTemplate();
assertTrue("failed to find a template named `marvel`", found);
}
private boolean findMarvelTemplate() {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates("marvel").get().getIndexTemplates()) {
if (template.getName().equals("marvel")) {
return true;
}
}
return false;
}
@Test
public void testDynamicHostChange() {
// disable exporting to be able to use non valid hosts
Settings.Builder builder = Settings.builder()
.put(AgentService.SETTINGS_INTERVAL, "-1");
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(HttpESExporter.SETTINGS_HOSTS, "test1")));
assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test1"));
// wipes the non array settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.putArray(HttpESExporter.SETTINGS_HOSTS, "test2").put(HttpESExporter.SETTINGS_HOSTS, "")));
assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test2"));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, "test3")));
assertThat(httpEsExporter.getHosts(), Matchers.arrayContaining("test3"));
}
@Test
public void testHostChangeReChecksTemplate() {
Settings.Builder builder = Settings.builder()
.put(AgentService.SETTINGS_INTERVAL, "200m")
.put(Node.HTTP_ENABLED, true);
internalCluster().startNode(builder);
HttpESExporter httpEsExporter = getEsExporter();
logger.info("exporting an event");
httpEsExporter.export(ImmutableList.of(newRandomMarvelDoc()));
logger.info("removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, httpEsExporter.getHosts())).get());
logger.info("exporting a second event");
httpEsExporter.export(ImmutableList.of(newRandomMarvelDoc()));
logger.info("verifying template is inserted");
assertMarvelTemplate();
}
@Test
public void testHostFailureChecksTemplate() throws InterruptedException, IOException {
Settings.Builder builder = Settings.builder()
.put(AgentService.SETTINGS_INTERVAL, "200m")
.put(Node.HTTP_ENABLED, true);
final String node0 = internalCluster().startNode(builder);
String node1 = internalCluster().startNode(builder);
HttpESExporter httpEsExporter0 = getEsExporter(node0);
final HttpESExporter httpEsExporter1 = getEsExporter(node1);
logger.info("--> exporting events to force host resolution");
httpEsExporter0.export(ImmutableList.of(newRandomMarvelDoc()));
httpEsExporter1.export(ImmutableList.of(newRandomMarvelDoc()));
logger.info("--> setting exporting hosts to {} + {}", httpEsExporter0.getHosts(), httpEsExporter1.getHosts());
ArrayList<String> mergedHosts = new ArrayList<String>();
mergedHosts.addAll(Arrays.asList(httpEsExporter0.getHosts()));
mergedHosts.addAll(Arrays.asList(httpEsExporter1.getHosts()));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().putArray(HttpESExporter.SETTINGS_HOSTS, mergedHosts.toArray(Strings.EMPTY_ARRAY))).get());
logger.info("--> exporting events to have new settings take effect");
httpEsExporter0.export(ImmutableList.of(newRandomMarvelDoc()));
httpEsExporter1.export(ImmutableList.of(newRandomMarvelDoc()));
assertMarvelTemplate();
logger.info("--> removing the marvel template");
assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
logger.info("--> shutting down node0");
internalCluster().stopRandomNode(new Predicate<Settings>() {
@Override
public boolean apply(Settings settings) {
return settings.get("name").equals(node0);
}
});
logger.info("--> exporting events from node1");
// we use assert busy node because url caching may cause the node failure to be only detected while sending the event
assertTrue("failed to find a template named 'marvel'", awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
httpEsExporter1.export(ImmutableList.of(newRandomMarvelDoc()));
logger.debug("--> checking for template");
return findMarvelTemplate();
}
}));
}
private HttpESExporter getEsExporter() {
AgentService service = internalCluster().getInstance(AgentService.class);
return (HttpESExporter) service.getExporters().iterator().next();
}
private HttpESExporter getEsExporter(String node) {
AgentService service = internalCluster().getInstance(AgentService.class, node);
return (HttpESExporter) service.getExporters().iterator().next();
}
final static AtomicLong timeStampGenerator = new AtomicLong();
private MarvelDoc newRandomMarvelDoc() {
return IndexMarvelDoc.createMarvelDoc(internalCluster().getClusterName(), "test_marvelDoc", timeStampGenerator.incrementAndGet(),
"test_index", randomInt(), randomLong(), randomLong(), randomLong());
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.license;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.SysGlobals;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.core.License;
import org.elasticsearch.license.plugin.core.LicensesClientService;
import org.elasticsearch.license.plugin.core.LicensesService;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.mode.Mode;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = SUITE, transportClientRatio = 0, numClientNodes = 0)
public class LicenseIntegrationTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.settingsBuilder()
.put(super.nodeSettings(nodeOrdinal))
.put("plugin.types", MarvelPlugin.class.getName() + "," + MockLicensePlugin.class.getName())
.put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
.build();
}
@Test
public void testEnableDisableLicense() {
assertMarvelMode(Mode.STANDARD);
disableLicensing();
assertMarvelMode(Mode.LITE);
enableLicensing();
assertMarvelMode(Mode.STANDARD);
}
private void assertMarvelMode(Mode expected) {
LicenseService licenseService = internalCluster().getInstance(LicenseService.class);
assertNotNull(licenseService);
assertThat(licenseService.mode(), equalTo(expected));
}
public static void disableLicensing() {
for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) {
service.disable();
}
}
public static void enableLicensing() {
for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) {
service.enable();
}
}
public static class MockLicensePlugin extends AbstractPlugin {
public static final String NAME = "internal-test-licensing";
@Override
public String name() {
return NAME;
}
@Override
public String description() {
return name();
}
@Override
public Collection<Class<? extends Module>> modules() {
return ImmutableSet.<Class<? extends Module>>of(InternalLicenseModule.class);
}
}
public static class InternalLicenseModule extends AbstractModule {
@Override
protected void configure() {
bind(MockLicenseService.class).asEagerSingleton();
bind(LicensesClientService.class).to(MockLicenseService.class);
}
}
public static class MockLicenseService extends AbstractComponent implements LicensesClientService {
static final License DUMMY_LICENSE = License.builder()
.feature(LicenseService.FEATURE_NAME)
.expiryDate(System.currentTimeMillis())
.issueDate(System.currentTimeMillis())
.issuedTo("LicensingTests")
.issuer("test")
.maxNodes(Integer.MAX_VALUE)
.signature("_signature")
.type("standard")
.subscriptionType("all_is_good")
.uid(String.valueOf(RandomizedTest.systemPropertyAsInt(SysGlobals.CHILDVM_SYSPROP_JVM_ID, 0)) + System.identityHashCode(LicenseIntegrationTests.class))
.build();
private final List<Listener> listeners = new ArrayList<>();
@Inject
public MockLicenseService(Settings settings) {
super(settings);
enable();
}
@Override
public void register(String s, LicensesService.TrialLicenseOptions trialLicenseOptions, Collection<LicensesService.ExpirationCallback> collection, Listener listener) {
listeners.add(listener);
enable();
}
public void enable() {
// enabled all listeners (incl. shield)
for (Listener listener : listeners) {
listener.onEnabled(DUMMY_LICENSE);
}
}
public void disable() {
// only disable watcher listener (we need shield to work)
for (Listener listener : listeners) {
listener.onDisabled(DUMMY_LICENSE);
}
}
}
}

View File

@ -0,0 +1,11 @@
es.logger.level=DEBUG
log4j.rootLogger=${es.logger.level}, out
log4j.logger.org.apache.http=INFO, out
log4j.additivity.org.apache.http=false
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.conversionPattern=[%d{ISO8601}][%-5p][%-25c] %m%n
log4j.logger.org.elasticsearch.marvel.agent=TRACE, out
log4j.additivity.org.elasticsearch.marvel.agent=false