diff --git a/marvel/dev-tools/elasticsearch_license_header.txt b/marvel/dev-tools/elasticsearch_license_header.txt
new file mode 100644
index 00000000000..250c1ffaa0b
--- /dev/null
+++ b/marvel/dev-tools/elasticsearch_license_header.txt
@@ -0,0 +1,14 @@
+ELASTICSEARCH CONFIDENTIAL
+__________________
+
+ [2014] Elasticsearch Incorporated. All Rights Reserved.
+
+NOTICE: All information contained herein is, and remains
+the property of Elasticsearch Incorporated and its suppliers,
+if any. The intellectual and technical concepts contained
+herein are proprietary to Elasticsearch Incorporated
+and its suppliers and may be covered by U.S. and Foreign Patents,
+patents in process, and are protected by trade secret or copyright law.
+Dissemination of this information or reproduction of this material
+is strictly forbidden unless prior written permission is obtained
+from Elasticsearch Incorporated.
\ No newline at end of file
diff --git a/marvel/dev-tools/license_header_definition.xml b/marvel/dev-tools/license_header_definition.xml
new file mode 100644
index 00000000000..86857154bcf
--- /dev/null
+++ b/marvel/dev-tools/license_header_definition.xml
@@ -0,0 +1,13 @@
+
+
+
+ /*
+ *
+ */EOL
+
+ (\s|\t)*/\*.*$
+ .*\*/(\s|\t)*$
+ false
+ true
+
+
\ No newline at end of file
diff --git a/marvel/pom.xml b/marvel/pom.xml
new file mode 100644
index 00000000000..ab0785168ae
--- /dev/null
+++ b/marvel/pom.xml
@@ -0,0 +1,42 @@
+
+
+
+ 4.0.0
+
+ org.elasticsearch
+ elasticsearch-marvel
+ Elastic X-Plugins - Marvel
+
+
+ org.elasticsearch
+ x-plugins
+ 2.0.0-beta1-SNAPSHOT
+
+
+
+
+
+ org.elasticsearch
+ elasticsearch-license-plugin
+ ${project.version}
+ provided
+
+
+
+
+
+
+
+ src/main/resources
+ true
+
+ **/*.properties
+ marvel_index_template.json
+
+
+
+
+
+
diff --git a/marvel/src/main/assemblies/plugin.xml b/marvel/src/main/assemblies/plugin.xml
new file mode 100644
index 00000000000..2ccfc875e9c
--- /dev/null
+++ b/marvel/src/main/assemblies/plugin.xml
@@ -0,0 +1,17 @@
+
+ plugin
+
+ zip
+
+ false
+
+
+ /
+ true
+ true
+
+ org.elasticsearch:elasticsearch
+
+
+
+
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/MarvelModule.java b/marvel/src/main/java/org/elasticsearch/marvel/MarvelModule.java
new file mode 100644
index 00000000000..4419c2efe44
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/MarvelModule.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.inject.Scopes;
+import org.elasticsearch.common.inject.SpawnModules;
+import org.elasticsearch.marvel.agent.AgentService;
+import org.elasticsearch.marvel.agent.collector.CollectorModule;
+import org.elasticsearch.marvel.agent.exporter.ExporterModule;
+import org.elasticsearch.marvel.license.LicenseModule;
+
+public class MarvelModule extends AbstractModule implements SpawnModules {
+
+ @Override
+ protected void configure() {
+ bind(AgentService.class).in(Scopes.SINGLETON);
+ }
+
+ @Override
+ public Iterable extends Module> spawnModules() {
+ return ImmutableList.of(new LicenseModule(), new CollectorModule(), new ExporterModule());
+ }
+}
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java b/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java
new file mode 100644
index 00000000000..1530fd150cf
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/MarvelPlugin.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.marvel.agent.AgentService;
+import org.elasticsearch.marvel.license.LicenseService;
+import org.elasticsearch.plugins.AbstractPlugin;
+import org.elasticsearch.tribe.TribeService;
+
+import java.util.Collection;
+
+public class MarvelPlugin extends AbstractPlugin {
+
+ private static final ESLogger logger = Loggers.getLogger(MarvelPlugin.class);
+
+ public static final String NAME = "marvel";
+ public static final String ENABLED = NAME + ".enabled";
+
+ private final boolean enabled;
+
+ public MarvelPlugin(Settings settings) {
+ this.enabled = marvelEnabled(settings);
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public String description() {
+ return "Elasticsearch Marvel";
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Override
+ public Collection> modules() {
+ if (!enabled) {
+ return ImmutableList.of();
+ }
+ return ImmutableList.>of(MarvelModule.class);
+ }
+
+ @Override
+ public Collection> services() {
+ if (!enabled) {
+ return ImmutableList.of();
+ }
+ return ImmutableList.>of(LicenseService.class, AgentService.class);
+ }
+
+ public static boolean marvelEnabled(Settings settings) {
+ String tribe = settings.get(TribeService.TRIBE_NAME);
+ if (tribe != null) {
+ logger.trace("marvel cannot be started on tribe node [{}]", tribe);
+ return false;
+ }
+
+ if (!"node".equals(settings.get(Client.CLIENT_TYPE_SETTING))) {
+ logger.trace("marvel cannot be started on a transport client");
+ return false;
+ }
+ return settings.getAsBoolean(ENABLED, true);
+ }
+}
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/MarvelVersion.java b/marvel/src/main/java/org/elasticsearch/marvel/MarvelVersion.java
new file mode 100644
index 00000000000..f156e93c424
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/MarvelVersion.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.license.plugin.LicenseVersion;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public class MarvelVersion implements Serializable {
+
+ // The logic for ID is: XXYYZZAA, where XX is major version, YY is minor version, ZZ is revision, and AA is Beta/RC indicator
+ // AA values below 50 are beta builds, and below 99 are RC builds, with 99 indicating a release
+ // the (internal) format of the id is there so we can easily do after/before checks on the id
+
+ public static final int V_2_0_0_Beta1_ID = /*00*/2000001;
+ public static final MarvelVersion V_2_0_0_Beta1 = new MarvelVersion(V_2_0_0_Beta1_ID, true, Version.V_2_0_0_Beta1, LicenseVersion.V_1_0_0);
+
+ public static final MarvelVersion CURRENT = V_2_0_0_Beta1;
+
+ public static MarvelVersion readVersion(StreamInput in) throws IOException {
+ return fromId(in.readVInt());
+ }
+
+ public static MarvelVersion fromId(int id) {
+ switch (id) {
+ case V_2_0_0_Beta1_ID:
+ return V_2_0_0_Beta1;
+ default:
+ return new MarvelVersion(id, null, Version.CURRENT, LicenseVersion.CURRENT);
+ }
+ }
+
+ public static void writeVersion(MarvelVersion version, StreamOutput out) throws IOException {
+ out.writeVInt(version.id);
+ }
+
+ /**
+ * Returns the smallest version between the 2.
+ */
+ public static MarvelVersion smallest(MarvelVersion version1, MarvelVersion version2) {
+ return version1.id < version2.id ? version1 : version2;
+ }
+
+ /**
+ * Returns the version given its string representation, current version if the argument is null or empty
+ */
+ public static MarvelVersion fromString(String version) {
+ if (!Strings.hasLength(version)) {
+ return MarvelVersion.CURRENT;
+ }
+
+ String[] parts = version.split("[\\.-]");
+ if (parts.length < 3 || parts.length > 4) {
+ throw new IllegalArgumentException("the version needs to contain major, minor and revision, and optionally the build");
+ }
+
+ try {
+ //we reverse the version id calculation based on some assumption as we can't reliably reverse the modulo
+ int major = Integer.parseInt(parts[0]) * 1000000;
+ int minor = Integer.parseInt(parts[1]) * 10000;
+ int revision = Integer.parseInt(parts[2]) * 100;
+
+ int build = 99;
+ if (parts.length == 4) {
+ String buildStr = parts[3];
+ if (buildStr.startsWith("beta")) {
+ build = Integer.parseInt(buildStr.substring(4));
+ }
+ if (buildStr.startsWith("rc")) {
+ build = Integer.parseInt(buildStr.substring(2)) + 50;
+ }
+ }
+
+ return fromId(major + minor + revision + build);
+
+ } catch(NumberFormatException e) {
+ throw new IllegalArgumentException("unable to parse version " + version, e);
+ }
+ }
+
+ public final int id;
+ public final byte major;
+ public final byte minor;
+ public final byte revision;
+ public final byte build;
+ public final Boolean snapshot;
+ public final Version minEsCompatibilityVersion;
+ public final LicenseVersion minLicenseCompatibilityVersion;
+
+ MarvelVersion(int id, @Nullable Boolean snapshot, Version minEsCompatibilityVersion, LicenseVersion minLicenseCompatibilityVersion) {
+ this.id = id;
+ this.major = (byte) ((id / 1000000) % 100);
+ this.minor = (byte) ((id / 10000) % 100);
+ this.revision = (byte) ((id / 100) % 100);
+ this.build = (byte) (id % 100);
+ this.snapshot = snapshot;
+ this.minEsCompatibilityVersion = minEsCompatibilityVersion;
+ this.minLicenseCompatibilityVersion = minLicenseCompatibilityVersion;
+ }
+
+ public boolean snapshot() {
+ return snapshot != null && snapshot;
+ }
+
+ public boolean after(MarvelVersion version) {
+ return version.id < id;
+ }
+
+ public boolean onOrAfter(MarvelVersion version) {
+ return version.id <= id;
+ }
+
+ public boolean before(MarvelVersion version) {
+ return version.id > id;
+ }
+
+ public boolean onOrBefore(MarvelVersion version) {
+ return version.id >= id;
+ }
+
+ public boolean compatibleWith(MarvelVersion version) {
+ return version.onOrAfter(minimumCompatibilityVersion());
+ }
+
+ public boolean compatibleWith(Version esVersion) {
+ return esVersion.onOrAfter(minEsCompatibilityVersion);
+ }
+
+ /**
+ * Returns the minimum compatible version based on the current
+ * version. Ie a node needs to have at least the return version in order
+ * to communicate with a node running the current version. The returned version
+ * is in most of the cases the smallest major version release unless the current version
+ * is a beta or RC release then the version itself is returned.
+ */
+ public MarvelVersion minimumCompatibilityVersion() {
+ return MarvelVersion.smallest(this, fromId(major * 1000000 + 99));
+ }
+
+ /**
+ * @return The minimum elasticsearch version this marvel version is compatible with.
+ */
+ public Version minimumEsCompatiblityVersion() {
+ return minEsCompatibilityVersion;
+ }
+
+ /**
+ * @return The minimum license plugin version this marvel version is compatible with.
+ */
+ public LicenseVersion minimumLicenseCompatibilityVersion() {
+ return minLicenseCompatibilityVersion;
+ }
+
+
+ /**
+ * Just the version number (without -SNAPSHOT if snapshot).
+ */
+ public String number() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(major).append('.').append(minor).append('.').append(revision);
+ if (build < 50) {
+ sb.append("-beta").append(build);
+ } else if (build < 99) {
+ sb.append("-rc").append(build - 50);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(number());
+ if (snapshot()) {
+ sb.append("-SNAPSHOT");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MarvelVersion that = (MarvelVersion) o;
+
+ if (id != that.id) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+}
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java
new file mode 100644
index 00000000000..b7b3dbb6c6a
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent;
+
+import com.google.common.collect.ImmutableSet;
+import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
+import org.elasticsearch.cluster.settings.DynamicSettings;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.marvel.agent.collector.Collector;
+import org.elasticsearch.marvel.agent.exporter.Exporter;
+import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
+import org.elasticsearch.marvel.license.LicenseService;
+import org.elasticsearch.node.settings.NodeSettingsService;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class AgentService extends AbstractLifecycleComponent implements NodeSettingsService.Listener {
+
+ private static final String SETTINGS_BASE = "marvel.agent.";
+
+ public static final String SETTINGS_INTERVAL = SETTINGS_BASE + "interval";
+ public static final String SETTINGS_INDICES = SETTINGS_BASE + "indices";
+ public static final String SETTINGS_ENABLED = SETTINGS_BASE + "enabled";
+
+ public static final String SETTINGS_STATS_TIMEOUT = SETTINGS_BASE + "stats.timeout";
+ public static final String SETTINGS_INDICES_STATS_TIMEOUT = SETTINGS_BASE + "stats.indices.timeout";
+
+ private volatile ExportingWorker exportingWorker;
+
+ private volatile Thread workerThread;
+ private volatile long samplingInterval;
+ volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
+
+ private volatile TimeValue indicesStatsTimeout;
+ private volatile TimeValue clusterStatsTimeout;
+
+ private final Collection collectors;
+ private final Collection exporters;
+
+ @Inject
+ public AgentService(Settings settings, NodeSettingsService nodeSettingsService,
+ @ClusterDynamicSettings DynamicSettings dynamicSettings,
+ LicenseService licenseService,
+ Set collectors, Set exporters) {
+ super(settings);
+ this.samplingInterval = settings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)).millis();
+ this.indicesToExport = settings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
+
+ TimeValue statsTimeout = settings.getAsTime(SETTINGS_STATS_TIMEOUT, TimeValue.timeValueMinutes(10));
+ indicesStatsTimeout = settings.getAsTime(SETTINGS_INDICES_STATS_TIMEOUT, statsTimeout);
+
+ if (settings.getAsBoolean(SETTINGS_ENABLED, true)) {
+ this.collectors = ImmutableSet.copyOf(collectors);
+ this.exporters = ImmutableSet.copyOf(exporters);
+ } else {
+ this.collectors = ImmutableSet.of();
+ this.exporters = ImmutableSet.of();
+ logger.info("collecting disabled by settings");
+ }
+
+ nodeSettingsService.addListener(this);
+ dynamicSettings.addDynamicSetting(SETTINGS_INTERVAL);
+ dynamicSettings.addDynamicSetting(SETTINGS_INDICES + ".*"); // array settings
+ dynamicSettings.addDynamicSetting(SETTINGS_STATS_TIMEOUT);
+ dynamicSettings.addDynamicSetting(SETTINGS_INDICES_STATS_TIMEOUT);
+
+ logger.trace("marvel is running in [{}] mode", licenseService.mode());
+ }
+
+ protected void applyIntervalSettings() {
+ if (samplingInterval <= 0) {
+ logger.info("data sampling is disabled due to interval settings [{}]", samplingInterval);
+ if (workerThread != null) {
+
+ // notify worker to stop on its leisure, not to disturb an exporting operation
+ exportingWorker.closed = true;
+
+ exportingWorker = null;
+ workerThread = null;
+ }
+ } else if (workerThread == null || !workerThread.isAlive()) {
+
+ exportingWorker = new ExportingWorker();
+ workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "marvel.exporters"));
+ workerThread.setDaemon(true);
+ workerThread.start();
+
+ }
+ }
+
+ @Override
+ protected void doStart() {
+ if (exporters.size() == 0) {
+ return;
+ }
+
+ for (Collector collector : collectors) {
+ collector.start();
+ }
+
+ for (Exporter exporter : exporters) {
+ exporter.start();
+ }
+
+ applyIntervalSettings();
+ }
+
+ @Override
+ protected void doStop() {
+ if (exporters.size() == 0) {
+ return;
+ }
+ if (workerThread != null && workerThread.isAlive()) {
+ exportingWorker.closed = true;
+ workerThread.interrupt();
+ try {
+ workerThread.join(60000);
+ } catch (InterruptedException e) {
+ // we don't care...
+ }
+
+ }
+
+ for (Collector collector : collectors) {
+ collector.stop();
+ }
+
+ for (Exporter exporter : exporters) {
+ exporter.stop();
+ }
+ }
+
+ @Override
+ protected void doClose() {
+ for (Collector collector : collectors) {
+ collector.close();
+ }
+
+ for (Exporter exporter : exporters) {
+ exporter.close();
+ }
+ }
+
+ // used for testing
+ public Collection getExporters() {
+ return exporters;
+ }
+
+ @Override
+ public void onRefreshSettings(Settings settings) {
+ TimeValue newSamplingInterval = settings.getAsTime(SETTINGS_INTERVAL, null);
+ if (newSamplingInterval != null && newSamplingInterval.millis() != samplingInterval) {
+ logger.info("sampling interval updated to [{}]", newSamplingInterval);
+ samplingInterval = newSamplingInterval.millis();
+ applyIntervalSettings();
+ }
+
+ String[] indices = settings.getAsArray(SETTINGS_INDICES, null, true);
+ if (indices != null) {
+ logger.info("sampling indices updated to [{}]", Strings.arrayToCommaDelimitedString(indices));
+ indicesToExport = indices;
+ }
+
+ TimeValue statsTimeout = settings.getAsTime(SETTINGS_STATS_TIMEOUT, TimeValue.timeValueMinutes(10));
+ TimeValue newTimeValue = settings.getAsTime(SETTINGS_INDICES_STATS_TIMEOUT, statsTimeout);
+ if (!indicesStatsTimeout.equals(newTimeValue)) {
+ logger.info("indices stats timeout updated to [{}]", newTimeValue);
+ indicesStatsTimeout = newTimeValue;
+
+ }
+ }
+
+ class ExportingWorker implements Runnable {
+
+ volatile boolean closed = false;
+
+ @Override
+ public void run() {
+ while (!closed) {
+ // sleep first to allow node to complete initialization before collecting the first start
+ try {
+ Thread.sleep(samplingInterval);
+ if (closed) {
+ continue;
+ }
+
+ for (Collector collector : collectors) {
+ logger.trace("collecting {}", collector.name());
+ Collection results = collector.collect();
+
+ if (results != null && !results.isEmpty()) {
+ for (Exporter exporter : exporters) {
+ exporter.export(results);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ logger.error("Background thread had an uncaught exception:", t);
+ }
+ }
+ logger.debug("worker shutdown");
+ }
+ }
+}
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java
new file mode 100644
index 00000000000..d2918ad9220
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/AbstractCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.collector;
+
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
+
+import java.util.Collection;
+
+public abstract class AbstractCollector extends AbstractLifecycleComponent implements Collector {
+
+ private final String name;
+
+ protected final ClusterService clusterService;
+
+ @Inject
+ public AbstractCollector(Settings settings, String name, ClusterService clusterService) {
+ super(settings);
+ this.name = name;
+ this.clusterService = clusterService;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public T start() {
+ logger.debug("starting collector [{}]", name());
+ return super.start();
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ /**
+ * Indicates if the current collector should
+ * be executed on master node only.
+ */
+ protected boolean masterOnly() {
+ return false;
+ }
+
+ @Override
+ public Collection collect() {
+ if (masterOnly() && !clusterService.state().nodes().localNodeMaster()) {
+ logger.trace("collector [{}] runs on master only", name());
+ return null;
+ }
+
+ try {
+ return doCollect();
+ } catch (ElasticsearchTimeoutException e) {
+ logger.error("collector [{}] timed out when collecting data");
+ } catch (Exception e) {
+ logger.error("collector [{}] throws exception when collecting data", e, name());
+ }
+ return null;
+ }
+
+ protected abstract Collection doCollect() throws Exception;
+
+ @Override
+ public T stop() {
+ logger.debug("stopping collector [{}]", name());
+ return super.stop();
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ public void close() {
+ logger.trace("closing collector [{}]", name());
+ super.close();
+ }
+
+ @Override
+ protected void doClose() {
+ }
+}
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/Collector.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/Collector.java
new file mode 100644
index 00000000000..6e077960020
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/Collector.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.collector;
+
+import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
+
+import java.util.Collection;
+
+public interface Collector extends LifecycleComponent {
+
+ String name();
+
+ Collection collect();
+}
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/CollectorModule.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/CollectorModule.java
new file mode 100644
index 00000000000..25b71d3f5a1
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/CollectorModule.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.collector;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.multibindings.Multibinder;
+import org.elasticsearch.marvel.agent.collector.indices.IndexCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CollectorModule extends AbstractModule {
+
+ private final Set> collectors = new HashSet<>();
+
+ public CollectorModule() {
+ // Registers default collectors
+ registerCollector(IndexCollector.class);
+ }
+
+ @Override
+ protected void configure() {
+ Multibinder binder = Multibinder.newSetBinder(binder(), Collector.class);
+ for (Class extends Collector> collector : collectors) {
+ bind(collector).asEagerSingleton();
+ binder.addBinding().to(collector);
+ }
+ }
+
+ public void registerCollector(Class extends Collector> collector) {
+ collectors.add(collector);
+ }
+}
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/indices/IndexCollector.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/indices/IndexCollector.java
new file mode 100644
index 00000000000..6df746e713d
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/indices/IndexCollector.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.collector.indices;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.marvel.agent.collector.AbstractCollector;
+import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
+
+import java.util.Collection;
+
+/**
+ * Collector for indices statistics.
+ *
+ * This collector runs on the master node only and collect a {@link IndexMarvelDoc} document
+ * for each existing index in the cluster.
+ */
+public class IndexCollector extends AbstractCollector {
+
+ public static final String NAME = "index-collector";
+ protected static final String TYPE = "marvel_index";
+
+ private final ClusterName clusterName;
+ private final Client client;
+
+ @Inject
+ public IndexCollector(Settings settings, ClusterService clusterService, ClusterName clusterName, Client client) {
+ super(settings, NAME, clusterService);
+ this.client = client;
+ this.clusterName = clusterName;
+ }
+
+ @Override
+ protected boolean masterOnly() {
+ return true;
+ }
+
+ @Override
+ protected Collection doCollect() throws Exception {
+ ImmutableList.Builder results = ImmutableList.builder();
+
+ IndicesStatsResponse indicesStats = client.admin().indices().prepareStats().all()
+ .setStore(true)
+ .setIndexing(true)
+ .setDocs(true)
+ .get();
+
+ long timestamp = System.currentTimeMillis();
+ for (IndexStats indexStats : indicesStats.getIndices().values()) {
+ results.add(buildMarvelDoc(clusterName.value(), TYPE, timestamp, indexStats));
+ }
+ return results.build();
+ }
+
+ protected MarvelDoc buildMarvelDoc(String clusterName, String type, long timestamp, IndexStats indexStats) {
+ return IndexMarvelDoc.createMarvelDoc(clusterName, type, timestamp,
+ indexStats.getIndex(),
+ indexStats.getTotal().getDocs().getCount(),
+ indexStats.getTotal().getStore().sizeInBytes(), indexStats.getTotal().getStore().throttleTime().millis(),
+ indexStats.getTotal().getIndexing().getTotal().getThrottleTimeInMillis());
+ }
+}
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/indices/IndexMarvelDoc.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/indices/IndexMarvelDoc.java
new file mode 100644
index 00000000000..890f9af7751
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/collector/indices/IndexMarvelDoc.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.collector.indices;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class IndexMarvelDoc extends MarvelDoc {
+
+ private final String index;
+ private final Docs docs;
+ private final Store store;
+ private final Indexing indexing;
+
+ public IndexMarvelDoc(String clusterName, String type, long timestamp,
+ String index, Docs docs, Store store, Indexing indexing) {
+ super(clusterName, type, timestamp);
+ this.index = index;
+ this.docs = docs;
+ this.store = store;
+ this.indexing = indexing;
+ }
+
+ @Override
+ public IndexMarvelDoc payload() {
+ return this;
+ }
+
+ public String getIndex() {
+ return index;
+ }
+
+ public Docs getDocs() {
+ return docs;
+ }
+
+ public Store getStore() {
+ return store;
+ }
+
+ public Indexing getIndexing() {
+ return indexing;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ super.toXContent(builder, params);
+ builder.field(Fields.INDEX, index);
+ if (docs != null) {
+ docs.toXContent(builder, params);
+ }
+ if (store != null) {
+ store.toXContent(builder, params);
+ }
+ if (indexing != null) {
+ indexing.toXContent(builder, params);
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ public static IndexMarvelDoc createMarvelDoc(String clusterName, String type, long timestamp,
+ String index, long docsCount, long storeSizeInBytes, long storeThrottleTimeInMillis, long indexingThrottleTimeInMillis) {
+ return new IndexMarvelDoc(clusterName, type, timestamp, index,
+ new Docs(docsCount),
+ new Store(storeSizeInBytes, storeThrottleTimeInMillis),
+ new Indexing(indexingThrottleTimeInMillis));
+ }
+
+ static final class Fields {
+ static final XContentBuilderString INDEX = new XContentBuilderString("index");
+ }
+
+ static class Docs implements ToXContent {
+
+ private final long count;
+
+ Docs(long count) {
+ this.count = count;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(Fields.DOCS);
+ builder.field(Fields.COUNT, count);
+ builder.endObject();
+ return builder;
+ }
+
+ static final class Fields {
+ static final XContentBuilderString DOCS = new XContentBuilderString("docs");
+ static final XContentBuilderString COUNT = new XContentBuilderString("count");
+ }
+ }
+
+ static class Store implements ToXContent {
+
+ private final long sizeInBytes;
+ private final long throttleTimeInMillis;
+
+ public Store(long sizeInBytes, long throttleTimeInMillis) {
+ this.sizeInBytes = sizeInBytes;
+ this.throttleTimeInMillis = throttleTimeInMillis;
+ }
+
+ public long getSizeInBytes() {
+ return sizeInBytes;
+ }
+
+ public long getThrottleTimeInMillis() {
+ return throttleTimeInMillis;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(Fields.STORE);
+ builder.field(Fields.SIZE_IN_BYTES, sizeInBytes);
+ builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, new TimeValue(throttleTimeInMillis, TimeUnit.MILLISECONDS));
+ builder.endObject();
+ return builder;
+ }
+
+ static final class Fields {
+ static final XContentBuilderString STORE = new XContentBuilderString("store");
+ static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
+ static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
+ static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
+ }
+ }
+
+ static class Indexing implements ToXContent {
+
+ private final long throttleTimeInMillis;
+
+ public Indexing(long throttleTimeInMillis) {
+ this.throttleTimeInMillis = throttleTimeInMillis;
+ }
+
+ public long getThrottleTimeInMillis() {
+ return throttleTimeInMillis;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(Fields.INDEXING);
+ builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, new TimeValue(throttleTimeInMillis, TimeUnit.MILLISECONDS));
+ builder.endObject();
+ return builder;
+ }
+
+ static final class Fields {
+ static final XContentBuilderString INDEXING = new XContentBuilderString("indexing");
+ static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
+ static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
+ }
+ }
+}
+
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/AbstractExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/AbstractExporter.java
new file mode 100644
index 00000000000..1002774d015
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/AbstractExporter.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.exporter;
+
+
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.Collection;
+
+public abstract class AbstractExporter extends AbstractLifecycleComponent implements Exporter {
+
+ private final String name;
+
+ protected final ClusterService clusterService;
+
+ @Inject
+ public AbstractExporter(Settings settings, String name, ClusterService clusterService) {
+ super(settings);
+ this.name = name;
+ this.clusterService = clusterService;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public T start() {
+ logger.debug("starting exporter [{}]", name());
+ return super.start();
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ protected boolean masterOnly() {
+ return false;
+ }
+
+ @Override
+ public void export(Collection marvelDocs) {
+ if (masterOnly() && !clusterService.state().nodes().localNodeMaster()) {
+ logger.trace("exporter [{}] runs on master only", name());
+ return;
+ }
+
+ if (marvelDocs == null) {
+ logger.debug("no objects to export for [{}]", name());
+ return;
+ }
+
+ try {
+ doExport(marvelDocs);
+ } catch (Exception e) {
+ logger.error("export [{}] throws exception when exporting data", e, name());
+ }
+ }
+
+ protected abstract void doExport(Collection marvelDocs) throws Exception;
+
+ @Override
+ public T stop() {
+ logger.debug("stopping exporter [{}]", name());
+ return super.stop();
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ public void close() {
+ logger.trace("closing exporter [{}]", name());
+ super.close();
+ }
+
+ @Override
+ protected void doClose() {
+ }
+}
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java
new file mode 100644
index 00000000000..c7688aa3ebb
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.exporter;
+
+import org.elasticsearch.common.component.LifecycleComponent;
+
+import java.util.Collection;
+
+public interface Exporter extends LifecycleComponent {
+
+ String name();
+
+ void export(Collection marvelDocs);
+}
\ No newline at end of file
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java
new file mode 100644
index 00000000000..23ac241128e
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExporterModule.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.exporter;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.multibindings.Multibinder;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ExporterModule extends AbstractModule {
+
+ private final Set> exporters = new HashSet<>();
+
+ public ExporterModule() {
+ // Registers default exporter
+ registerExporter(HttpESExporter.class);
+ }
+
+ @Override
+ protected void configure() {
+ Multibinder binder = Multibinder.newSetBinder(binder(), Exporter.class);
+ for (Class extends Exporter> exporter : exporters) {
+ bind(exporter).asEagerSingleton();
+ binder.addBinding().to(exporter);
+ }
+ }
+
+ public void registerExporter(Class extends Exporter> exporter) {
+ exporters.add(exporter);
+ }
+}
diff --git a/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java
new file mode 100644
index 00000000000..208a7539f30
--- /dev/null
+++ b/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/HttpESExporter.java
@@ -0,0 +1,652 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.marvel.agent.exporter;
+
+import com.google.common.io.ByteStreams;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
+import org.elasticsearch.cluster.settings.DynamicSettings;
+import org.elasticsearch.common.Base64;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.BoundTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.common.xcontent.smile.SmileXContent;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.http.HttpServer;
+import org.elasticsearch.marvel.agent.support.AgentUtils;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.settings.NodeSettingsService;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import javax.net.ssl.*;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+public class HttpESExporter extends AbstractExporter implements NodeSettingsService.Listener {
+
+ private static final String NAME = "es_exporter";
+
+ private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es.";
+ public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts";
+ public static final String SETTINGS_INDEX_PREFIX = SETTINGS_PREFIX + "index.prefix";
+ public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat";
+ public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout";
+ public static final String SETTINGS_READ_TIMEOUT = SETTINGS_PREFIX + "read_timeout";
+
+ // es level timeout used when checking and writing templates (used to speed up tests)
+ public static final String SETTINGS_CHECK_TEMPLATE_TIMEOUT = SETTINGS_PREFIX + ".template.master_timeout";
+
+ // es level timeout used for bulk indexing (used to speed up tests)
+ public static final String SETTINGS_BULK_TIMEOUT = SETTINGS_PREFIX + ".bulk.timeout";
+
+ volatile String[] hosts;
+ volatile boolean boundToLocalNode = false;
+ final String indexPrefix;
+ final DateTimeFormatter indexTimeFormatter;
+ volatile int timeoutInMillis;
+ volatile int readTimeoutInMillis;
+
+
+ /** https support * */
+ final SSLSocketFactory sslSocketFactory;
+ volatile boolean hostnameVerification;
+
+ final ClusterService clusterService;
+ final ClusterName clusterName;
+ final NodeService nodeService;
+ final Environment environment;
+
+ HttpServer httpServer;
+ final boolean httpEnabled;
+
+ @Nullable
+ final TimeValue templateCheckTimeout;
+ @Nullable
+ final TimeValue bulkTimeout;
+
+ volatile boolean checkedAndUploadedIndexTemplate = false;
+
+ final ConnectionKeepAliveWorker keepAliveWorker;
+ Thread keepAliveThread;
+
+ @Inject
+ public HttpESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName,
+ @ClusterDynamicSettings DynamicSettings dynamicSettings,
+ NodeSettingsService nodeSettingsService,
+ NodeService nodeService, Environment environment) {
+ super(settings, NAME, clusterService);
+
+ this.clusterService = clusterService;
+
+ this.clusterName = clusterName;
+ this.nodeService = nodeService;
+ this.environment = environment;
+
+ httpEnabled = settings.getAsBoolean(Node.HTTP_ENABLED, true);
+
+ hosts = settings.getAsArray(SETTINGS_HOSTS, Strings.EMPTY_ARRAY);
+
+ validateHosts(hosts);
+
+ indexPrefix = settings.get(SETTINGS_INDEX_PREFIX, ".marvel");
+ String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, "YYYY.MM.dd");
+ indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
+
+ timeoutInMillis = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).millis();
+ readTimeoutInMillis = (int) settings.getAsTime(SETTINGS_READ_TIMEOUT, new TimeValue(timeoutInMillis * 10)).millis();
+
+ templateCheckTimeout = settings.getAsTime(SETTINGS_CHECK_TEMPLATE_TIMEOUT, null);
+ bulkTimeout = settings.getAsTime(SETTINGS_CHECK_TEMPLATE_TIMEOUT, null);
+
+ keepAliveWorker = new ConnectionKeepAliveWorker();
+
+ dynamicSettings.addDynamicSetting(SETTINGS_HOSTS);
+ dynamicSettings.addDynamicSetting(SETTINGS_HOSTS + ".*");
+ dynamicSettings.addDynamicSetting(SETTINGS_TIMEOUT);
+ dynamicSettings.addDynamicSetting(SETTINGS_READ_TIMEOUT);
+ dynamicSettings.addDynamicSetting(SETTINGS_SSL_HOSTNAME_VERIFICATION);
+ nodeSettingsService.addListener(this);
+
+ if (!settings.getByPrefix(SETTINGS_SSL_PREFIX).getAsMap().isEmpty()) {
+ sslSocketFactory = createSSLSocketFactory(settings);
+ } else {
+ logger.trace("no ssl context configured");
+ sslSocketFactory = null;
+ }
+ hostnameVerification = settings.getAsBoolean(SETTINGS_SSL_HOSTNAME_VERIFICATION, true);
+
+ logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]",
+ AgentUtils.santizeUrlPwds(Strings.arrayToCommaDelimitedString(hosts)), indexPrefix, indexTimeFormat);
+ }
+
+ static private void validateHosts(String[] hosts) {
+ for (String host : hosts) {
+ try {
+ AgentUtils.parseHostWithPath(host, "");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." +
+ " error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]");
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("[marvel.agent.exporter] invalid host: [" + AgentUtils.santizeUrlPwds(host) + "]." +
+ " error: [" + AgentUtils.santizeUrlPwds(e.getMessage()) + "]");
+ }
+ }
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Inject(optional = true)
+ public void setHttpServer(HttpServer httpServer) {
+ this.httpServer = httpServer;
+ }
+
+ private HttpURLConnection openExportingConnection() {
+ logger.trace("setting up an export connection");
+ String queryString = "";
+ if (bulkTimeout != null) {
+ queryString = "?master_timeout=" + bulkTimeout;
+ }
+ HttpURLConnection conn = openAndValidateConnection("POST", getIndexName() + "/_bulk" + queryString, XContentType.SMILE.restContentType());
+ if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) {
+ // start keep alive upon successful connection if not there.
+ initKeepAliveThread();
+ }
+ return conn;
+ }
+
+ private void addMarvelDocToConnection(HttpURLConnection conn,
+ MarvelDoc marvelDoc) throws IOException {
+ OutputStream os = conn.getOutputStream();
+ // TODO: find a way to disable builder's substream flushing or something neat solution
+ XContentBuilder builder = XContentFactory.smileBuilder();
+ builder.startObject().startObject("index")
+ .field("_type", marvelDoc.type())
+ .endObject().endObject();
+ builder.close();
+ builder.bytes().writeTo(os);
+ os.write(SmileXContent.smileXContent.streamSeparator());
+
+ builder = XContentFactory.smileBuilder();
+ builder.humanReadable(false);
+ marvelDoc.toXContent(builder, ToXContent.EMPTY_PARAMS);
+ builder.close();
+ builder.bytes().writeTo(os);
+ os.write(SmileXContent.smileXContent.streamSeparator());
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException {
+ logger.trace("sending content");
+ OutputStream os = conn.getOutputStream();
+ os.close();
+ if (conn.getResponseCode() != 200) {
+ logConnectionError("remote target didn't respond with 200 OK", conn);
+ return;
+ }
+
+ InputStream inputStream = conn.getInputStream();
+ try (XContentParser parser = XContentType.SMILE.xContent().createParser(inputStream)) {
+ Map response = parser.map();
+ if (response.get("items") != null) {
+ ArrayList