Monitoring: Add export() method to Exporters class

This commit adds a synchronized "export()" method to the Exporters so that the Exporters class can be used as an export service for exporting monitoring docs.

Original commit: elastic/x-pack-elasticsearch@22bda986c5
This commit is contained in:
Tanguy Leroux 2016-03-07 14:03:17 +01:00
parent 925afa3cab
commit e51aa21575
3 changed files with 181 additions and 25 deletions

View File

@ -17,11 +17,11 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -174,33 +174,29 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
continue;
}
ExportBulk bulk = exporters.openBulk();
if (bulk == null) { // exporters are either not ready or faulty
continue;
if (logger.isTraceEnabled()) {
logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
}
try {
if (logger.isTraceEnabled()) {
logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
}
for (Collector collector : collectors) {
if (collecting) {
Collection<MonitoringDoc> docs = collector.collect();
if ((docs != null) && (docs.size() > 0)) {
logger.trace("bulk [{}] - adding [{}] collected docs from [{}] collector", bulk, docs.size(),
collector.name());
bulk.add(docs);
} else {
logger.trace("bulk [{}] - skipping collected docs from [{}] collector", bulk, collector.name());
}
}
if (closed) {
// Stop collecting if the worker is marked as closed
break;
Collection<MonitoringDoc> docs = new ArrayList<>();
for (Collector collector : collectors) {
if (collecting) {
Collection<MonitoringDoc> result = collector.collect();
if (result != null) {
logger.trace("adding [{}] collected docs from [{}] collector", result.size(), collector.name());
docs.addAll(result);
} else {
logger.trace("skipping collected docs from [{}] collector", collector.name());
}
}
} finally {
bulk.close(!closed && collecting);
if (closed) {
// Stop collecting if the worker is marked as closed
break;
}
}
if ((docs.isEmpty() == false) && (closed == false)) {
exporters.export(docs);
}
} catch (InterruptedException e) {

View File

@ -18,6 +18,7 @@ import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -105,7 +106,7 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
return exporters.iterator();
}
public ExportBulk openBulk() {
ExportBulk openBulk() {
List<ExportBulk> bulks = new ArrayList<>();
for (Exporter exporter : exporters) {
if (exporter.masterOnly() && !clusterService.localNode().masterNode()) {
@ -175,6 +176,29 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
return new CurrentExporters(settings, exporters);
}
/**
* Exports a collection of monitoring documents using the configured exporters
*/
public synchronized void export(Collection<MonitoringDoc> docs) throws Exception {
if (this.lifecycleState() != Lifecycle.State.STARTED) {
throw new IllegalStateException("Export service is not started");
}
if (docs != null && docs.size() > 0) {
ExportBulk bulk = openBulk();
if (bulk == null) {
logger.debug("exporters are either not ready or faulty");
return;
}
try {
logger.debug("exporting [{}] monitoring documents", docs.size());
bulk.add(docs);
} finally {
bulk.close(lifecycleState() == Lifecycle.State.STARTED);
}
}
}
static class CurrentExporters implements Iterable<Exporter> {
static final CurrentExporters EMPTY = new CurrentExporters(Settings.EMPTY, Collections.emptyMap());

View File

@ -5,27 +5,38 @@
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.MonitoringIds;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.cleaner.CleanerService;
import org.elasticsearch.shield.InternalClient;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
@ -253,6 +264,70 @@ public class ExportersTests extends ESTestCase {
verifyNoMoreInteractions(exporters.getExporter("_name1"));
}
/**
* This test creates N threads that export a random number of document
* using a {@link Exporters} instance.
*/
public void testConcurrentExports() throws Exception {
final int nbExporters = randomIntBetween(1, 5);
logger.info("--> creating {} exporters", nbExporters);
Settings.Builder settings = Settings.builder();
for (int i = 0; i < nbExporters; i++) {
settings.put("xpack.monitoring.agent.exporters._name" + String.valueOf(i) + ".type", "record");
}
Exporter.Factory factory = new CountingExportFactory("record", false);
factories.put("record", factory);
Exporters exporters = new Exporters(settings.build(), factories, clusterService, clusterSettings);
exporters.start();
final Thread[] threads = new Thread[3 + randomInt(7)];
final CyclicBarrier barrier = new CyclicBarrier(threads.length);
final List<Throwable> exceptions = new CopyOnWriteArrayList<>();
int total = 0;
logger.info("--> exporting documents using {} threads", threads.length);
for (int i = 0; i < threads.length; i++) {
int nbDocs = randomIntBetween(10, 50);
total += nbDocs;
logger.debug("--> exporting thread [{}] exports {} documents", i, nbDocs);
threads[i] = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("unexpected error in exporting thread", t);
exceptions.add(t);
}
@Override
protected void doRun() throws Exception {
List<MonitoringDoc> docs = new ArrayList<>();
for (int n = 0; n < nbDocs; n++) {
docs.add(new MonitoringDoc(MonitoringIds.ES.getId(), Version.CURRENT.toString()));
}
barrier.await(10, TimeUnit.SECONDS);
exporters.export(docs);
}
}, "export_thread_" + i);
threads[i].start();
}
logger.info("--> waiting for threads to exports {} documents", total);
for (Thread thread : threads) {
thread.join();
}
assertThat(exceptions, empty());
for (Exporter exporter : exporters) {
assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class));
assertThat(((CountingExportFactory.CountingExporter)exporter).getExportedCount(), equalTo(total));
}
}
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
public TestFactory(String type, boolean singleton) {
super(type, singleton);
@ -302,4 +377,65 @@ public class ExportersTests extends ESTestCase {
}
}
/**
* A factory of exporters that count the number of exported documents.
*/
static class CountingExportFactory extends Exporter.Factory<CountingExportFactory.CountingExporter> {
public CountingExportFactory(String type, boolean singleton) {
super(type, singleton);
}
@Override
public CountingExporter create(Exporter.Config config) {
return new CountingExporter(type(), config);
}
static class CountingExporter extends Exporter {
private static final AtomicInteger count = new AtomicInteger(0);
private final CountingBulk bulk;
public CountingExporter(String type, Config config) {
super(type, config);
this.bulk = new CountingBulk(type + "#" + count.getAndIncrement());
}
@Override
public ExportBulk openBulk() {
return bulk;
}
@Override
public void close() {
}
public int getExportedCount() {
return bulk.getCount().get();
}
}
static class CountingBulk extends ExportBulk {
private final AtomicInteger count = new AtomicInteger();
public CountingBulk(String name) {
super(name);
}
@Override
public ExportBulk add(Collection<MonitoringDoc> docs) throws Exception {
count.addAndGet(docs.size());
return this;
}
@Override
public void flush() throws Exception {
}
AtomicInteger getCount() {
return count;
}
}
}
}