From e51aa21575e94b38535fc08f10e42867da1b6289 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Mar 2016 14:03:17 +0100 Subject: [PATCH] Monitoring: Add export() method to Exporters class This commit adds a synchronized "export()" method to the Exporters so that the Exporters class can be used as an export service for exporting monitoring docs. Original commit: elastic/x-pack-elasticsearch@22bda986c5bcb29947fd8bc184632110c6e25bdc --- .../marvel/agent/AgentService.java | 44 +++--- .../marvel/agent/exporter/Exporters.java | 26 +++- .../marvel/agent/exporter/ExportersTests.java | 136 ++++++++++++++++++ 3 files changed, 181 insertions(+), 25 deletions(-) diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java index 3256163c1ea..3f4199866dc 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java @@ -17,11 +17,11 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector; -import org.elasticsearch.marvel.agent.exporter.ExportBulk; import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -174,33 +174,29 @@ public class AgentService extends AbstractLifecycleComponent { continue; } - ExportBulk bulk = exporters.openBulk(); - if (bulk == null) { // exporters are either not ready or faulty - continue; + if (logger.isTraceEnabled()) { + logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors)); } - try { - if (logger.isTraceEnabled()) { - logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors)); - } - for (Collector collector : collectors) { - if (collecting) { - Collection docs = collector.collect(); - if ((docs != null) && (docs.size() > 0)) { - logger.trace("bulk [{}] - adding [{}] collected docs from [{}] collector", bulk, docs.size(), - collector.name()); - bulk.add(docs); - } else { - logger.trace("bulk [{}] - skipping collected docs from [{}] collector", bulk, collector.name()); - } - } - if (closed) { - // Stop collecting if the worker is marked as closed - break; + Collection docs = new ArrayList<>(); + for (Collector collector : collectors) { + if (collecting) { + Collection result = collector.collect(); + if (result != null) { + logger.trace("adding [{}] collected docs from [{}] collector", result.size(), collector.name()); + docs.addAll(result); + } else { + logger.trace("skipping collected docs from [{}] collector", collector.name()); } } - } finally { - bulk.close(!closed && collecting); + if (closed) { + // Stop collecting if the worker is marked as closed + break; + } + } + + if ((docs.isEmpty() == false) && (closed == false)) { + exporters.export(docs); } } catch (InterruptedException e) { diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java index 1c4cd8462ed..d3da36f35de 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java @@ -18,6 +18,7 @@ import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -105,7 +106,7 @@ public class Exporters extends AbstractLifecycleComponent implements return exporters.iterator(); } - public ExportBulk openBulk() { + ExportBulk openBulk() { List bulks = new ArrayList<>(); for (Exporter exporter : exporters) { if (exporter.masterOnly() && !clusterService.localNode().masterNode()) { @@ -175,6 +176,29 @@ public class Exporters extends AbstractLifecycleComponent implements return new CurrentExporters(settings, exporters); } + /** + * Exports a collection of monitoring documents using the configured exporters + */ + public synchronized void export(Collection docs) throws Exception { + if (this.lifecycleState() != Lifecycle.State.STARTED) { + throw new IllegalStateException("Export service is not started"); + } + if (docs != null && docs.size() > 0) { + ExportBulk bulk = openBulk(); + if (bulk == null) { + logger.debug("exporters are either not ready or faulty"); + return; + } + + try { + logger.debug("exporting [{}] monitoring documents", docs.size()); + bulk.add(docs); + } finally { + bulk.close(lifecycleState() == Lifecycle.State.STARTED); + } + } + } + static class CurrentExporters implements Iterable { static final CurrentExporters EMPTY = new CurrentExporters(Settings.EMPTY, Collections.emptyMap()); diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java index 39a77453682..213a64a1f91 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java @@ -5,27 +5,38 @@ */ package org.elasticsearch.marvel.agent.exporter; +import org.elasticsearch.Version; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.marvel.MarvelSettings; +import org.elasticsearch.marvel.MonitoringIds; import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; import org.elasticsearch.marvel.cleaner.CleanerService; import org.elasticsearch.shield.InternalClient; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; @@ -253,6 +264,70 @@ public class ExportersTests extends ESTestCase { verifyNoMoreInteractions(exporters.getExporter("_name1")); } + /** + * This test creates N threads that export a random number of document + * using a {@link Exporters} instance. + */ + public void testConcurrentExports() throws Exception { + final int nbExporters = randomIntBetween(1, 5); + + logger.info("--> creating {} exporters", nbExporters); + Settings.Builder settings = Settings.builder(); + for (int i = 0; i < nbExporters; i++) { + settings.put("xpack.monitoring.agent.exporters._name" + String.valueOf(i) + ".type", "record"); + } + + Exporter.Factory factory = new CountingExportFactory("record", false); + factories.put("record", factory); + + Exporters exporters = new Exporters(settings.build(), factories, clusterService, clusterSettings); + exporters.start(); + + final Thread[] threads = new Thread[3 + randomInt(7)]; + final CyclicBarrier barrier = new CyclicBarrier(threads.length); + final List exceptions = new CopyOnWriteArrayList<>(); + + int total = 0; + + logger.info("--> exporting documents using {} threads", threads.length); + for (int i = 0; i < threads.length; i++) { + int nbDocs = randomIntBetween(10, 50); + total += nbDocs; + + logger.debug("--> exporting thread [{}] exports {} documents", i, nbDocs); + threads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + logger.error("unexpected error in exporting thread", t); + exceptions.add(t); + } + + @Override + protected void doRun() throws Exception { + List docs = new ArrayList<>(); + for (int n = 0; n < nbDocs; n++) { + docs.add(new MonitoringDoc(MonitoringIds.ES.getId(), Version.CURRENT.toString())); + } + barrier.await(10, TimeUnit.SECONDS); + exporters.export(docs); + } + }, "export_thread_" + i); + threads[i].start(); + } + + logger.info("--> waiting for threads to exports {} documents", total); + for (Thread thread : threads) { + thread.join(); + } + + assertThat(exceptions, empty()); + for (Exporter exporter : exporters) { + assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class)); + assertThat(((CountingExportFactory.CountingExporter)exporter).getExportedCount(), equalTo(total)); + } + } + + static class TestFactory extends Exporter.Factory { public TestFactory(String type, boolean singleton) { super(type, singleton); @@ -302,4 +377,65 @@ public class ExportersTests extends ESTestCase { } } + /** + * A factory of exporters that count the number of exported documents. + */ + static class CountingExportFactory extends Exporter.Factory { + + public CountingExportFactory(String type, boolean singleton) { + super(type, singleton); + } + + @Override + public CountingExporter create(Exporter.Config config) { + return new CountingExporter(type(), config); + } + + static class CountingExporter extends Exporter { + + private static final AtomicInteger count = new AtomicInteger(0); + private final CountingBulk bulk; + + public CountingExporter(String type, Config config) { + super(type, config); + this.bulk = new CountingBulk(type + "#" + count.getAndIncrement()); + } + + @Override + public ExportBulk openBulk() { + return bulk; + } + + @Override + public void close() { + } + + public int getExportedCount() { + return bulk.getCount().get(); + } + } + + static class CountingBulk extends ExportBulk { + + private final AtomicInteger count = new AtomicInteger(); + + public CountingBulk(String name) { + super(name); + } + + @Override + public ExportBulk add(Collection docs) throws Exception { + count.addAndGet(docs.size()); + return this; + } + + @Override + public void flush() throws Exception { + } + + AtomicInteger getCount() { + return count; + } + } + } }