Monitoring: Fix synchronization in Exporters

This commit fixes an issue in synchronization in Exporters class. The export() method is synchronized and when used with LocalExport can provoke a deadlock. LocalExporter exports data locally using bulk requests that can trigger cluster state updates for mapping updates. If a exporters settings update sneaks in, the settings update waits for the export to terminate but the export waits for the settings to be updated... and boom.

This commit removes the synchronized and refactor Exporters/LocalExporter to use state and dedicated instance of LocalBulk for each export so that synchronizing methods is not necessary anymore.

It also lower down some random settings in MonitoringBulkTests because the previous settings almost always fill the bulk thread pool.

closes elastic/elasticsearch#1769

Original commit: elastic/x-pack-elasticsearch@f50c916f8b
This commit is contained in:
Tanguy Leroux 2016-03-29 17:56:11 +02:00
parent 8cd5560be4
commit 4007ff44b7
15 changed files with 269 additions and 266 deletions

View File

@ -172,15 +172,16 @@ integTest {
waitCondition = { node, ant ->
// HTTPS check is tricky to do, so we wait for the log file to indicate that the node is started
String waitForNodeStartProp = "waitForNodeStart${name}"
ant.waitfor(maxwait: '10', maxwaitunit: 'second', checkevery: '100', checkeveryunit: 'millisecond',
ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '100', checkeveryunit: 'millisecond',
timeoutproperty: waitForNodeStartProp) {
and {
resourcecontains(resource: "${node.startLog.toString()}", substring: 'started')
resourcecontains(resource: "${node.startLog.toString()}", substring: 'monitoring service started')
}
}
if (ant.project.getProperty(waitForNodeStartProp)) {
println "Timed out when looking for bound_addresses in log file ${node.startLog.toString()}"
println "Timed out when looking for node startup in log file ${node.startLog.toString()}"
return false;
}
return true;

View File

@ -32,6 +32,7 @@ import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* This test checks that a Monitoring's HTTP exporter correctly exports to a monitoring cluster
@ -91,9 +92,21 @@ public class SmokeTestMonitoringWithShieldIT extends ESIntegTestCase {
// Checks that the monitoring index templates have been installed
assertBusy(() -> {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(MONITORING_PATTERN).get();
assertThat(response.getIndexTemplates().size(), equalTo(2));
assertThat(response.getIndexTemplates().size(), greaterThanOrEqualTo(2));
});
// Waits for monitoring indices to be created
assertBusy(() -> {
try {
assertThat(client().admin().indices().prepareExists(MONITORING_PATTERN).get().isExists(), equalTo(true));
} catch (Exception e) {
fail("exception when checking for monitoring documents: " + e.getMessage());
}
});
// Waits for indices to be ready
ensureYellow(MONITORING_PATTERN);
// Checks that the HTTP exporter has successfully exported some data
assertBusy(() -> {
try {

View File

@ -120,6 +120,8 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
@Override
protected void doStart() {
logger.info("monitoring service started");
for (Collector collector : collectors) {
collector.start();
}
@ -153,7 +155,11 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
}
for (Exporter exporter : exporters) {
try {
exporter.close();
} catch (Exception e) {
logger.error("failed to close exporter [{}]", e, exporter.name());
}
}
}

View File

@ -98,5 +98,23 @@ public abstract class ExportBulk {
throw exception;
}
}
@Override
protected void onClose() throws Exception {
ExportException exception = null;
for (ExportBulk bulk : bulks) {
try {
bulk.onClose();
} catch (ExportException e) {
if (exception == null) {
exception = new ExportException("failed to close export bulks");
}
exception.addExportException(e);
}
}
if (exception != null) {
throw exception;
}
}
}
}

View File

@ -12,9 +12,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.marvel.MarvelSettings;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class Exporter {
public abstract class Exporter implements AutoCloseable {
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
@ -24,6 +24,7 @@ public abstract class Exporter {
protected final ESLogger logger;
protected final @Nullable TimeValue bulkTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);
public Exporter(String type, Config config) {
this.type = type;
@ -50,14 +51,18 @@ public abstract class Exporter {
*/
public abstract ExportBulk openBulk();
public void export(Collection<MonitoringDoc> monitoringDocs) throws Exception {
ExportBulk bulk = openBulk();
if (bulk != null) {
bulk.add(monitoringDocs).flush();
protected final boolean isClosed() {
return closed.get();
}
@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
doClose();
}
}
public abstract void close();
protected abstract void doClose();
protected String settingFQN(String setting) {
return MarvelSettings.EXPORTERS_SETTINGS.getKey() + config.name + "." + setting;

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
@ -19,13 +18,15 @@ import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
/**
*
@ -35,8 +36,7 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
private final Map<String, Exporter.Factory> factories;
private final ClusterService clusterService;
private volatile CurrentExporters exporters = CurrentExporters.EMPTY;
private volatile Settings exporterSettings;
private final AtomicReference<Map<String, Exporter>> exporters;
@Inject
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
@ -46,51 +46,28 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
super(settings);
this.factories = factories;
this.clusterService = clusterService;
exporterSettings = MarvelSettings.EXPORTERS_SETTINGS.get(settings);
this.exporters = new AtomicReference<>(emptyMap());
clusterSettings.addSettingsUpdateConsumer(MarvelSettings.EXPORTERS_SETTINGS, this::setExportersSetting);
}
private synchronized void setExportersSetting(Settings exportersSetting) {
this.exporterSettings = exportersSetting;
private void setExportersSetting(Settings exportersSetting) {
if (this.lifecycleState() == Lifecycle.State.STARTED) {
CurrentExporters existing = exporters;
Settings updatedSettings = exportersSetting;
if (updatedSettings.names().isEmpty()) {
if (exportersSetting.names().isEmpty()) {
return;
}
this.exporters = initExporters(Settings.builder()
.put(existing.settings)
.put(updatedSettings)
.build());
existing.close(logger);
Map<String, Exporter> updated = initExporters(exportersSetting);
closeExporters(logger, this.exporters.getAndSet(updated));
}
}
@Override
protected void doStart() {
synchronized (this) {
exporters = initExporters(exporterSettings);
}
exporters.set(initExporters(MarvelSettings.EXPORTERS_SETTINGS.get(settings)));
}
@Override
protected void doStop() {
ElasticsearchException exception = null;
for (Exporter exporter : exporters) {
try {
exporter.close();
} catch (Exception e) {
logger.error("exporter [{}] failed to close cleanly", e, exporter.name());
if (exception == null) {
exception = new ElasticsearchException("failed to cleanly close exporters");
}
exception.addSuppressed(e);
}
}
if (exception != null) {
throw exception;
}
closeExporters(logger, exporters.get());
}
@Override
@ -98,18 +75,28 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
}
public Exporter getExporter(String name) {
return exporters.get(name);
return exporters.get().get(name);
}
@Override
public Iterator<Exporter> iterator() {
return exporters.iterator();
return exporters.get().values().iterator();
}
static void closeExporters(ESLogger logger, Map<String, Exporter> exporters) {
for (Exporter exporter : exporters.values()) {
try {
exporter.close();
} catch (Exception e) {
logger.error("failed to close exporter [{}]", e, exporter.name());
}
}
}
ExportBulk openBulk() {
List<ExportBulk> bulks = new ArrayList<>();
for (Exporter exporter : exporters) {
if (exporter.masterOnly() && !clusterService.localNode().isMasterNode()) {
for (Exporter exporter : this) {
if (exporter.masterOnly() && clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
// the exporter is supposed to only run on the master node, but we're not
// the master node... so skipping
continue;
@ -128,8 +115,7 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks);
}
// TODO only rebuild the exporters that need to be updated according to settings
CurrentExporters initExporters(Settings settings) {
Map<String, Exporter> initExporters(Settings settings) {
Set<String> singletons = new HashSet<>();
Map<String, Exporter> exporters = new HashMap<>();
boolean hasDisabled = false;
@ -173,21 +159,20 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config));
}
return new CurrentExporters(settings, exporters);
return exporters;
}
/**
* Exports a collection of monitoring documents using the configured exporters
*/
public synchronized void export(Collection<MonitoringDoc> docs) throws ExportException {
public void export(Collection<MonitoringDoc> docs) throws ExportException {
if (this.lifecycleState() != Lifecycle.State.STARTED) {
throw new ExportException("Export service is not started");
}
if (docs != null && docs.size() > 0) {
ExportBulk bulk = openBulk();
if (bulk == null) {
logger.debug("exporters are either not ready or faulty");
return;
throw new ExportException("exporters are either not ready or faulty");
}
try {
@ -197,36 +182,4 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
}
}
}
static class CurrentExporters implements Iterable<Exporter> {
static final CurrentExporters EMPTY = new CurrentExporters(Settings.EMPTY, Collections.emptyMap());
final Settings settings;
final Map<String, Exporter> exporters;
public CurrentExporters(Settings settings, Map<String, Exporter> exporters) {
this.settings = settings;
this.exporters = exporters;
}
@Override
public Iterator<Exporter> iterator() {
return exporters.values().iterator();
}
public Exporter get(String name) {
return exporters.get(name);
}
void close(ESLogger logger) {
for (Exporter exporter : exporters.values()) {
try {
exporter.close();
} catch (Exception e) {
logger.error("failed to close exporter [{}]", e, exporter.name());
}
}
}
}
}

View File

@ -166,7 +166,7 @@ public class HttpExporter extends Exporter {
}
@Override
public void close() {
public void doClose() {
if (keepAliveThread != null && keepAliveThread.isAlive()) {
keepAliveWorker.closed = true;
keepAliveThread.interrupt();

View File

@ -20,34 +20,36 @@ import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the
* {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#onClose()} methods are not synchronized.
*/
public class LocalBulk extends ExportBulk {
private final ESLogger logger;
private final MonitoringClientProxy client;
private final ResolversRegistry resolvers;
private final AtomicBoolean closed;
private BulkRequestBuilder requestBuilder;
BulkRequestBuilder requestBuilder;
AtomicReference<State> state = new AtomicReference<>();
public LocalBulk(String name, ESLogger logger, MonitoringClientProxy client, ResolversRegistry resolvers) {
super(name);
this.logger = logger;
this.client = client;
this.resolvers = resolvers;
state.set(State.ACTIVE);
this.closed = new AtomicBoolean(false);
}
@Override
public synchronized ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException {
public ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException {
ExportException exception = null;
for (MonitoringDoc doc : docs) {
if (state.get() != State.ACTIVE) {
if (closed.get()) {
return this;
}
if (requestBuilder == null) {
@ -81,7 +83,7 @@ public class LocalBulk extends ExportBulk {
@Override
public void flush() throws ExportException {
if (state.get() != State.ACTIVE || requestBuilder == null || requestBuilder.numberOfActions() == 0) {
if (closed.get() || requestBuilder == null || requestBuilder.numberOfActions() == 0) {
return;
}
try {
@ -111,17 +113,10 @@ public class LocalBulk extends ExportBulk {
}
}
void terminate() {
state.set(State.TERMINATING);
synchronized (this) {
@Override
protected void onClose() throws Exception {
if (closed.compareAndSet(false, true)) {
requestBuilder = null;
state.compareAndSet(State.TERMINATING, State.TERMINATED);
}
}
enum State {
ACTIVE,
TERMINATING,
TERMINATED
}
}

View File

@ -13,11 +13,11 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
@ -36,6 +36,7 @@ import org.joda.time.DateTimeZone;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -53,8 +54,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
private final ResolversRegistry resolvers;
private final CleanerService cleanerService;
private volatile LocalBulk bulk;
private volatile boolean active = true;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
/** Version number of built-in templates **/
private final Integer templateVersion;
@ -73,66 +73,41 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
}
resolvers = new ResolversRegistry(config.settings());
bulk = resolveBulk(clusterService.state(), bulk);
clusterService.add(this);
cleanerService.add(this);
}
LocalBulk getBulk() {
return bulk;
}
ResolversRegistry getResolvers() {
return resolvers;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
LocalBulk currentBulk = bulk;
LocalBulk newBulk = resolveBulk(event.state(), currentBulk);
// yes, this method will always be called by the cluster event loop thread
// but we need to sync with the {@code #close()} mechanism
synchronized (this) {
if (active) {
bulk = newBulk;
} else if (newBulk != null) {
newBulk.terminate();
}
if (currentBulk == null && bulk != null) {
logger.debug("local exporter [{}] - started!", name());
}
if (bulk != currentBulk && currentBulk != null) {
logger.debug("local exporter [{}] - stopped!", name());
currentBulk.terminate();
}
if (state.get() == State.INITIALIZED) {
resolveBulk(event.state());
}
}
@Override
public ExportBulk openBulk() {
return bulk;
if (state.get() != State.RUNNING) {
return null;
}
return resolveBulk(clusterService.state());
}
// requires synchronization due to cluster state update events (see above)
@Override
public synchronized void close() {
active = false;
public void doClose() {
if (state.getAndSet(State.TERMINATED) != State.TERMINATED) {
logger.debug("local exporter [{}] - stopped", name());
clusterService.remove(this);
cleanerService.remove(this);
if (bulk != null) {
try {
bulk.terminate();
bulk = null;
} catch (Exception e) {
logger.error("local exporter [{}] - failed to cleanly close bulk", e, name());
}
}
}
LocalBulk resolveBulk(ClusterState clusterState, LocalBulk currentBulk) {
LocalBulk resolveBulk(ClusterState clusterState) {
if (clusterService.localNode() == null || clusterState == null) {
return currentBulk;
return null;
}
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
@ -148,18 +123,17 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
// if this is not the master, we'll just look to see if the monitoring timestamped template is already
// installed and if so, if it has a compatible version. If it is (installed and compatible)
// we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state.
if (!clusterService.localNode().isMasterNode()) {
if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
// We only need to check the index template for timestamped indices
if (!templateInstalled) {
if (templateInstalled == false) {
// the template for timestamped indices is not yet installed in the given cluster state, we'll wait.
logger.debug("local exporter [{}] - monitoring index template does not exist, so service cannot start", name());
return null;
}
// ok.. we have a compatible template... we can start
logger.debug("local exporter [{}] - started!", name());
return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, resolvers);
}
logger.debug("local exporter [{}] - monitoring index template found, service can start", name());
} else {
// we are on master
//
@ -170,9 +144,9 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
}
// Install the index template for timestamped indices first, so that other nodes can ship data
if (!templateInstalled) {
logger.debug("local exporter [{}] - could not find existing monitoring template for timestamped indices, installing a new one",
name());
if (templateInstalled == false) {
logger.debug("local exporter [{}] - could not find existing monitoring template for timestamped indices, " +
"installing a new one", name());
putTemplate(templateName, MarvelTemplateUtils.loadTimestampedIndexTemplate());
// we'll get that template on the next cluster state update
return null;
@ -180,15 +154,21 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
// Install the index template for data index
templateName = MarvelTemplateUtils.dataTemplateName(templateVersion);
if (!hasTemplate(templateName, clusterState)) {
logger.debug("local exporter [{}] - could not find existing monitoring template for data index, installing a new one", name());
if (hasTemplate(templateName, clusterState) == false) {
logger.debug("local exporter [{}] - could not find existing monitoring template for data index, " +
"installing a new one", name());
putTemplate(templateName, MarvelTemplateUtils.loadDataIndexTemplate());
// we'll get that template on the next cluster state update
return null;
}
// ok.. we have a compatible templates... we can start
return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, resolvers);
logger.debug("local exporter [{}] - monitoring index template found on master node, service can start", name());
}
if (state.compareAndSet(State.INITIALIZED, State.RUNNING)) {
logger.debug("local exporter [{}] - started!", name());
}
return new LocalBulk(name(), logger, client, resolvers);
}
/**
@ -245,12 +225,12 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
@Override
public void onCleanUpIndices(TimeValue retention) {
if (bulk == null) {
if (state.get() != State.RUNNING) {
logger.debug("local exporter [{}] - not ready yet", name());
return;
}
if (clusterService.localNode().isMasterNode()) {
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
// Reference date time will be compared to index.creation_date settings,
// that's why it must be in UTC
DateTime expiration = new DateTime(DateTimeZone.UTC).minus(retention.millis());
@ -345,4 +325,10 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return new LocalExporter(config, client, clusterService, cleanerService);
}
}
enum State {
INITIALIZED,
RUNNING,
TERMINATED
}
}

View File

@ -19,11 +19,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -67,16 +68,16 @@ public class MonitoringBulkTests extends MarvelIntegTestCase {
* This test creates N threads that execute a random number of monitoring bulk requests.
*/
public void testConcurrentRequests() throws Exception {
final int numberThreads = randomIntBetween(3, 10);
final int numberThreads = randomIntBetween(3, 5);
final Thread[] threads = new Thread[numberThreads];
final CountDownLatch latch = new CountDownLatch(numberThreads + 1);
final List<Throwable> exceptions = new CopyOnWriteArrayList<>();
AtomicInteger total = new AtomicInteger(0);
AtomicLong total = new AtomicLong(0);
logger.info("--> using {} concurrent clients to execute requests", threads.length);
for (int i = 0; i < threads.length; i++) {
final int nbRequests = randomIntBetween(3, 10);
final int nbRequests = randomIntBetween(1, 5);
threads[i] = new Thread(new AbstractRunnable() {
@Override
@ -92,7 +93,7 @@ public class MonitoringBulkTests extends MarvelIntegTestCase {
for (int j = 0; j < nbRequests; j++) {
MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk();
int numDocs = scaledRandomIntBetween(10, 1000);
int numDocs = scaledRandomIntBetween(10, 50);
for (int k = 0; k < numDocs; k++) {
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
doc.setType("concurrent");
@ -102,7 +103,7 @@ public class MonitoringBulkTests extends MarvelIntegTestCase {
total.addAndGet(numDocs);
MonitoringBulkResponse response = requestBuilder.get();
assertThat(response.getError(), is(nullValue()));
assertNull (response.getError());
}
}
}, "export_thread_" + i);
@ -119,10 +120,7 @@ public class MonitoringBulkTests extends MarvelIntegTestCase {
}
assertThat(exceptions, empty());
refresh();
SearchResponse countResponse = client().prepareSearch().setTypes("concurrent").setSize(0).get();
assertHitCount(countResponse, total.get());
awaitMarvelDocsCount(greaterThanOrEqualTo(total.get()), "concurrent");
}
public void testUnsupportedSystem() throws Exception {

View File

@ -22,6 +22,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.hamcrest.Matchers.notNullValue;
@ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCase {
@ -156,12 +157,14 @@ public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCa
protected void doExporting() throws Exception {
Collector collector = internalCluster().getInstance(ClusterStatsCollector.class);
exporter().export(collector.collect());
}
assertNotNull(collector);
private Exporter exporter() {
Exporters exporters = internalCluster().getInstance(Exporters.class);
return exporters.iterator().next();
assertNotNull(exporters);
// Wait for exporting bulks to be ready to export
assertBusy(() -> assertThat(exporters.openBulk(), notNullValue()));
exporters.export(collector.collect());
}
private String currentDataIndexName() {

View File

@ -7,8 +7,10 @@ package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
@ -77,48 +79,40 @@ public class ExportersTests extends ESTestCase {
public void testInitExportersDefault() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder()
.build());
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder().build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(0));
assertThat(internalExporters.exporters.size(), is(1));
assertThat(internalExporters.exporters, hasKey("default_" + LocalExporter.TYPE));
assertThat(internalExporters.exporters.get("default_" + LocalExporter.TYPE), instanceOf(LocalExporter.class));
assertThat(internalExporters.size(), is(1));
assertThat(internalExporters, hasKey("default_" + LocalExporter.TYPE));
assertThat(internalExporters.get("default_" + LocalExporter.TYPE), instanceOf(LocalExporter.class));
}
public void testInitExportersSingle() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder()
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(1));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type"));
assertThat(internalExporters.exporters.size(), is(1));
assertThat(internalExporters.exporters, hasKey("_name"));
assertThat(internalExporters.exporters.get("_name"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.exporters.get("_name").type, is("_type"));
assertThat(internalExporters.size(), is(1));
assertThat(internalExporters, hasKey("_name"));
assertThat(internalExporters.get("_name"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name").type, is("_type"));
}
public void testInitExportersSingleDisabled() throws Exception {
Exporter.Factory factory = new TestFactory("_type", true);
factories.put("_type", factory);
Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder()
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name.type", "_type")
.put("_name.enabled", false)
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(2));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type"));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.enabled", "false"));
// the only configured exporter is disabled... yet we intentionally don't fallback on the default
assertThat(internalExporters.exporters.size(), is(0));
assertThat(internalExporters.size(), is(0));
}
public void testInitExportersSingleUnknownType() throws Exception {
@ -146,22 +140,19 @@ public class ExportersTests extends ESTestCase {
public void testInitExportersMultipleSameType() throws Exception {
Exporter.Factory factory = new TestFactory("_type", false);
factories.put("_type", factory);
Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder()
Map<String, Exporter> internalExporters = exporters.initExporters(Settings.builder()
.put("_name0.type", "_type")
.put("_name1.type", "_type")
.build());
assertThat(internalExporters, notNullValue());
assertThat(internalExporters.settings.getAsMap().size(), is(2));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name0.type", "_type"));
assertThat(internalExporters.settings.getAsMap(), hasEntry("_name1.type", "_type"));
assertThat(internalExporters.exporters.size(), is(2));
assertThat(internalExporters.exporters, hasKey("_name0"));
assertThat(internalExporters.exporters.get("_name0"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.exporters.get("_name0").type, is("_type"));
assertThat(internalExporters.exporters, hasKey("_name1"));
assertThat(internalExporters.exporters.get("_name1"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.exporters.get("_name1").type, is("_type"));
assertThat(internalExporters.size(), is(2));
assertThat(internalExporters, hasKey("_name0"));
assertThat(internalExporters.get("_name0"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name0").type, is("_type"));
assertThat(internalExporters, hasKey("_name1"));
assertThat(internalExporters.get("_name1"), instanceOf(TestFactory.TestExporter.class));
assertThat(internalExporters.get("_name1").type, is("_type"));
}
public void testInitExportersMultipleSameTypeSingletons() throws Exception {
@ -184,12 +175,15 @@ public class ExportersTests extends ESTestCase {
final AtomicReference<Settings> settingsHolder = new AtomicReference<>();
exporters = new Exporters(Settings.builder()
Settings nodeSettings = Settings.builder()
.put("xpack.monitoring.agent.exporters._name0.type", "_type")
.put("xpack.monitoring.agent.exporters._name1.type", "_type")
.build(), factories, clusterService, clusterSettings) {
.build();
clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Arrays.asList(MarvelSettings.EXPORTERS_SETTINGS)));
exporters = new Exporters(nodeSettings, factories, clusterService, clusterSettings) {
@Override
CurrentExporters initExporters(Settings settings) {
Map<String, Exporter> initExporters(Settings settings) {
settingsHolder.set(settings);
return super.initExporters(settings);
}
@ -227,9 +221,9 @@ public class ExportersTests extends ESTestCase {
.build(), factories, clusterService, clusterSettings);
exporters.start();
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.isMasterNode()).thenReturn(true);
when(clusterService.localNode()).thenReturn(localNode);
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.isLocalNodeElectedMaster()).thenReturn(true);
when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build());
ExportBulk bulk = exporters.openBulk();
assertThat(bulk, notNullValue());
@ -251,9 +245,9 @@ public class ExportersTests extends ESTestCase {
.build(), factories, clusterService, clusterSettings);
exporters.start();
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.isMasterNode()).thenReturn(false);
when(clusterService.localNode()).thenReturn(localNode);
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.isLocalNodeElectedMaster()).thenReturn(false);
when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build());
ExportBulk bulk = exporters.openBulk();
assertThat(bulk, notNullValue());
@ -342,17 +336,13 @@ public class ExportersTests extends ESTestCase {
super(type, config);
}
@Override
public void export(Collection<MonitoringDoc> monitoringDocs) throws Exception {
}
@Override
public ExportBulk openBulk() {
return mock(ExportBulk.class);
}
@Override
public void close() {
public void doClose() {
}
}
}
@ -406,7 +396,7 @@ public class ExportersTests extends ESTestCase {
}
@Override
public void close() {
public void doClose() {
}
public int getExportedCount() {

View File

@ -42,6 +42,7 @@ import org.junit.Before;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -55,6 +56,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class HttpExporterTests extends MarvelIntegTestCase {
@ -102,11 +104,10 @@ public class HttpExporterTests extends MarvelIntegTestCase {
.put("xpack.monitoring.agent.exporters._http.connection.keep_alive", false)
.put("xpack.monitoring.agent.exporters._http.update_mappings", false);
String agentNode = internalCluster().startNode(builder);
HttpExporter exporter = getExporter(agentNode);
internalCluster().startNode(builder);
final int nbDocs = randomIntBetween(1, 25);
exporter.export(newRandomMarvelDocs(nbDocs));
export(newRandomMarvelDocs(nbDocs));
assertThat(webServer.getRequestCount(), equalTo(6));
@ -186,7 +187,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
logger.info("--> exporting data");
HttpExporter exporter = getExporter(agentNode);
assertThat(exporter.supportedClusterVersion, is(false));
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
export(Collections.singletonList(newRandomMarvelDoc()));
assertThat(exporter.supportedClusterVersion, is(true));
assertThat(webServer.getRequestCount(), equalTo(6));
@ -251,7 +252,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
enqueueResponse(secondWebServer, 200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
logger.info("--> exporting a second event");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
export(Collections.singletonList(newRandomMarvelDoc()));
assertThat(secondWebServer.getRequestCount(), equalTo(5));
@ -301,7 +302,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
logger.info("--> exporting data");
HttpExporter exporter = getExporter(agentNode);
assertThat(exporter.supportedClusterVersion, is(false));
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertNull(exporter.openBulk());
assertThat(exporter.supportedClusterVersion, is(false));
assertThat(webServer.getRequestCount(), equalTo(1));
@ -333,7 +334,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
HttpExporter exporter = getExporter(agentNode);
MonitoringDoc doc = newRandomMarvelDoc();
exporter.export(Collections.singletonList(doc));
export(Collections.singletonList(doc));
assertThat(webServer.getRequestCount(), equalTo(6));
@ -385,8 +386,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}");
doc = newRandomMarvelDoc();
exporter = getExporter(agentNode);
exporter.export(Collections.singletonList(doc));
export(Collections.singletonList(doc));
String expectedMarvelIndex = ".monitoring-es-" + MarvelTemplateUtils.TEMPLATE_VERSION + "-"
+ DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp());
@ -440,6 +440,15 @@ public class HttpExporterTests extends MarvelIntegTestCase {
assertTrue(resolved.equals(expected));
}
private void export(Collection<MonitoringDoc> docs) throws Exception {
Exporters exporters = internalCluster().getInstance(Exporters.class);
assertThat(exporters, notNullValue());
// Wait for exporting bulks to be ready to export
assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue())));
exporters.export(docs);
}
private HttpExporter getExporter(String nodeName) {
Exporters exporters = internalCluster().getInstance(Exporters.class, nodeName);
return (HttpExporter) exporters.iterator().next();

View File

@ -31,6 +31,7 @@ import org.joda.time.format.DateTimeFormat;
import org.junit.After;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -73,10 +74,8 @@ public class LocalExporterTests extends MarvelIntegTestCase {
.build());
securedEnsureGreen();
Exporter exporter = getLocalExporter("_local");
logger.debug("--> exporting a single monitoring doc");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
export(Collections.singletonList(newRandomMarvelDoc()));
awaitMarvelDocsCount(is(1L));
deleteMarvelIndices();
@ -87,7 +86,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
}
logger.debug("--> exporting {} monitoring docs", monitoringDocs.size());
exporter.export(monitoringDocs);
export(monitoringDocs);
awaitMarvelDocsCount(is((long) monitoringDocs.size()));
SearchResponse response = client().prepareSearch(MONITORING_INDICES_PREFIX + "*").get();
@ -131,7 +130,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
assertThat(exporter.getResolvers().getResolver(doc).index(doc), equalTo(indexName));
logger.debug("--> exporting a random monitoring document");
exporter.export(Collections.singletonList(doc));
export(Collections.singletonList(doc));
awaitIndexExists(indexName);
logger.debug("--> updates the timestamp");
@ -144,7 +143,7 @@ public class LocalExporterTests extends MarvelIntegTestCase {
logger.debug("--> exporting the document again (this time with the the new index name time format [{}], expecting index name [{}]",
timeFormat, indexName);
exporter.export(Collections.singletonList(doc));
export(Collections.singletonList(doc));
awaitIndexExists(indexName);
}
@ -155,19 +154,21 @@ public class LocalExporterTests extends MarvelIntegTestCase {
.build());
securedEnsureGreen();
LocalExporter exporter = getLocalExporter("_local");
logger.debug("--> exporting a single monitoring doc");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
export(Collections.singletonList(newRandomMarvelDoc()));
awaitMarvelDocsCount(is(1L));
assertNull(exporter.getBulk().requestBuilder);
logger.debug("--> closing monitoring indices");
assertAcked(client().admin().indices().prepareClose(MONITORING_INDICES_PREFIX + "*").get());
try {
logger.debug("--> exporting a second monitoring doc");
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
LocalExporter exporter = getLocalExporter("_local");
LocalBulk bulk = (LocalBulk) exporter.openBulk();
bulk.add(Collections.singletonList(newRandomMarvelDoc()));
bulk.close(true);
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), containsString("failed to flush export bulk [_local]"));
assertThat(e.getCause(), instanceOf(ExportException.class));
@ -177,10 +178,18 @@ public class LocalExporterTests extends MarvelIntegTestCase {
for (ExportException c : cause) {
assertThat(c.getMessage(), containsString("IndexClosedException[closed]"));
}
assertNull(exporter.getBulk().requestBuilder);
}
}
private void export(Collection<MonitoringDoc> docs) throws Exception {
Exporters exporters = internalCluster().getInstance(Exporters.class);
assertThat(exporters, notNullValue());
// Wait for exporting bulks to be ready to export
assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue())));
exporters.export(docs);
}
private LocalExporter getLocalExporter(String name) throws Exception {
final Exporter exporter = internalCluster().getInstance(Exporters.class).getExporter(name);
assertThat(exporter, notNullValue());

View File

@ -1,3 +1,20 @@
---
setup:
- do:
# Reduce the interval time so that monitoring
# indices are created faster
cluster.put_settings:
body:
transient:
xpack.monitoring.agent.interval: 1s
flat_settings: true
- do:
# Waits for the monitoring data index to be available:
# it indicates that the local exporter is ready
cluster.health:
index: ".monitoring-data-*"
wait_for_active_shards: 1
---
"Bulk indexing of monitoring data":
# Get the current version
@ -53,7 +70,7 @@
body:
- '{"index": {}}'
- '{"field_1": "value_1"}'
- '{"index": {"_type": "test_type"}}'
- '{"index": {"_type": "custom_type"}}'
- '{"field_1": "value_2"}'
- '{"index": {}}'
- '{"field_1": "value_3"}'
@ -73,6 +90,6 @@
- do:
search:
index: .monitoring-kibana-*
type: test_type
type: custom_type
- match: { hits.total: 1 }