diff --git a/elasticsearch/qa/smoke-test-plugins-ssl/build.gradle b/elasticsearch/qa/smoke-test-plugins-ssl/build.gradle index fc2b3515258..9e0675beaa0 100644 --- a/elasticsearch/qa/smoke-test-plugins-ssl/build.gradle +++ b/elasticsearch/qa/smoke-test-plugins-ssl/build.gradle @@ -172,15 +172,16 @@ integTest { waitCondition = { node, ant -> // HTTPS check is tricky to do, so we wait for the log file to indicate that the node is started String waitForNodeStartProp = "waitForNodeStart${name}" - ant.waitfor(maxwait: '10', maxwaitunit: 'second', checkevery: '100', checkeveryunit: 'millisecond', + ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '100', checkeveryunit: 'millisecond', timeoutproperty: waitForNodeStartProp) { and { resourcecontains(resource: "${node.startLog.toString()}", substring: 'started') + resourcecontains(resource: "${node.startLog.toString()}", substring: 'monitoring service started') } } if (ant.project.getProperty(waitForNodeStartProp)) { - println "Timed out when looking for bound_addresses in log file ${node.startLog.toString()}" + println "Timed out when looking for node startup in log file ${node.startLog.toString()}" return false; } return true; diff --git a/elasticsearch/qa/smoke-test-plugins-ssl/src/test/java/org/elasticsearch/smoketest/SmokeTestMonitoringWithShieldIT.java b/elasticsearch/qa/smoke-test-plugins-ssl/src/test/java/org/elasticsearch/smoketest/SmokeTestMonitoringWithShieldIT.java index a64fb1f6667..dd1aa46f8fd 100644 --- a/elasticsearch/qa/smoke-test-plugins-ssl/src/test/java/org/elasticsearch/smoketest/SmokeTestMonitoringWithShieldIT.java +++ b/elasticsearch/qa/smoke-test-plugins-ssl/src/test/java/org/elasticsearch/smoketest/SmokeTestMonitoringWithShieldIT.java @@ -32,6 +32,7 @@ import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; /** * This test checks that a Monitoring's HTTP exporter correctly exports to a monitoring cluster @@ -91,9 +92,21 @@ public class SmokeTestMonitoringWithShieldIT extends ESIntegTestCase { // Checks that the monitoring index templates have been installed assertBusy(() -> { GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(MONITORING_PATTERN).get(); - assertThat(response.getIndexTemplates().size(), equalTo(2)); + assertThat(response.getIndexTemplates().size(), greaterThanOrEqualTo(2)); }); + // Waits for monitoring indices to be created + assertBusy(() -> { + try { + assertThat(client().admin().indices().prepareExists(MONITORING_PATTERN).get().isExists(), equalTo(true)); + } catch (Exception e) { + fail("exception when checking for monitoring documents: " + e.getMessage()); + } + }); + + // Waits for indices to be ready + ensureYellow(MONITORING_PATTERN); + // Checks that the HTTP exporter has successfully exported some data assertBusy(() -> { try { diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java index 908bf3745d3..b7dfbc57193 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/AgentService.java @@ -120,6 +120,8 @@ public class AgentService extends AbstractLifecycleComponent { @Override protected void doStart() { + logger.info("monitoring service started"); + for (Collector collector : collectors) { collector.start(); } @@ -153,7 +155,11 @@ public class AgentService extends AbstractLifecycleComponent { } for (Exporter exporter : exporters) { - exporter.close(); + try { + exporter.close(); + } catch (Exception e) { + logger.error("failed to close exporter [{}]", e, exporter.name()); + } } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java index f02a11ac95e..feb3a1b3007 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java @@ -98,5 +98,23 @@ public abstract class ExportBulk { throw exception; } } + + @Override + protected void onClose() throws Exception { + ExportException exception = null; + for (ExportBulk bulk : bulks) { + try { + bulk.onClose(); + } catch (ExportException e) { + if (exception == null) { + exception = new ExportException("failed to close export bulks"); + } + exception.addExportException(e); + } + } + if (exception != null) { + throw exception; + } + } } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java index ffce9f91cdd..aa0fc7785cf 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporter.java @@ -12,9 +12,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.marvel.MarvelSettings; -import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; -public abstract class Exporter { +public abstract class Exporter implements AutoCloseable { public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format"; public static final String BULK_TIMEOUT_SETTING = "bulk.timeout"; @@ -24,6 +24,7 @@ public abstract class Exporter { protected final ESLogger logger; protected final @Nullable TimeValue bulkTimeout; + private AtomicBoolean closed = new AtomicBoolean(false); public Exporter(String type, Config config) { this.type = type; @@ -50,14 +51,18 @@ public abstract class Exporter { */ public abstract ExportBulk openBulk(); - public void export(Collection monitoringDocs) throws Exception { - ExportBulk bulk = openBulk(); - if (bulk != null) { - bulk.add(monitoringDocs).flush(); + protected final boolean isClosed() { + return closed.get(); + } + + @Override + public void close() throws Exception { + if (closed.compareAndSet(false, true)) { + doClose(); } } - public abstract void close(); + protected abstract void doClose(); protected String settingFQN(String setting) { return MarvelSettings.EXPORTERS_SETTINGS.getKey() + config.name + "." + setting; diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java index 3db429fa5dd..a63393dc8f8 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/Exporters.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.marvel.agent.exporter; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; @@ -19,13 +18,15 @@ import org.elasticsearch.marvel.agent.exporter.local.LocalExporter; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyMap; /** * @@ -35,8 +36,7 @@ public class Exporters extends AbstractLifecycleComponent implements private final Map factories; private final ClusterService clusterService; - private volatile CurrentExporters exporters = CurrentExporters.EMPTY; - private volatile Settings exporterSettings; + private final AtomicReference> exporters; @Inject public Exporters(Settings settings, Map factories, @@ -46,51 +46,28 @@ public class Exporters extends AbstractLifecycleComponent implements super(settings); this.factories = factories; this.clusterService = clusterService; - exporterSettings = MarvelSettings.EXPORTERS_SETTINGS.get(settings); + this.exporters = new AtomicReference<>(emptyMap()); clusterSettings.addSettingsUpdateConsumer(MarvelSettings.EXPORTERS_SETTINGS, this::setExportersSetting); } - private synchronized void setExportersSetting(Settings exportersSetting) { - this.exporterSettings = exportersSetting; + private void setExportersSetting(Settings exportersSetting) { if (this.lifecycleState() == Lifecycle.State.STARTED) { - - CurrentExporters existing = exporters; - Settings updatedSettings = exportersSetting; - if (updatedSettings.names().isEmpty()) { + if (exportersSetting.names().isEmpty()) { return; } - this.exporters = initExporters(Settings.builder() - .put(existing.settings) - .put(updatedSettings) - .build()); - existing.close(logger); + Map updated = initExporters(exportersSetting); + closeExporters(logger, this.exporters.getAndSet(updated)); } } @Override protected void doStart() { - synchronized (this) { - exporters = initExporters(exporterSettings); - } + exporters.set(initExporters(MarvelSettings.EXPORTERS_SETTINGS.get(settings))); } @Override protected void doStop() { - ElasticsearchException exception = null; - for (Exporter exporter : exporters) { - try { - exporter.close(); - } catch (Exception e) { - logger.error("exporter [{}] failed to close cleanly", e, exporter.name()); - if (exception == null) { - exception = new ElasticsearchException("failed to cleanly close exporters"); - } - exception.addSuppressed(e); - } - } - if (exception != null) { - throw exception; - } + closeExporters(logger, exporters.get()); } @Override @@ -98,18 +75,28 @@ public class Exporters extends AbstractLifecycleComponent implements } public Exporter getExporter(String name) { - return exporters.get(name); + return exporters.get().get(name); } @Override public Iterator iterator() { - return exporters.iterator(); + return exporters.get().values().iterator(); + } + + static void closeExporters(ESLogger logger, Map exporters) { + for (Exporter exporter : exporters.values()) { + try { + exporter.close(); + } catch (Exception e) { + logger.error("failed to close exporter [{}]", e, exporter.name()); + } + } } ExportBulk openBulk() { List bulks = new ArrayList<>(); - for (Exporter exporter : exporters) { - if (exporter.masterOnly() && !clusterService.localNode().isMasterNode()) { + for (Exporter exporter : this) { + if (exporter.masterOnly() && clusterService.state().nodes().isLocalNodeElectedMaster() == false) { // the exporter is supposed to only run on the master node, but we're not // the master node... so skipping continue; @@ -128,8 +115,7 @@ public class Exporters extends AbstractLifecycleComponent implements return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks); } - // TODO only rebuild the exporters that need to be updated according to settings - CurrentExporters initExporters(Settings settings) { + Map initExporters(Settings settings) { Set singletons = new HashSet<>(); Map exporters = new HashMap<>(); boolean hasDisabled = false; @@ -173,21 +159,20 @@ public class Exporters extends AbstractLifecycleComponent implements exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config)); } - return new CurrentExporters(settings, exporters); + return exporters; } /** * Exports a collection of monitoring documents using the configured exporters */ - public synchronized void export(Collection docs) throws ExportException { + public void export(Collection docs) throws ExportException { if (this.lifecycleState() != Lifecycle.State.STARTED) { throw new ExportException("Export service is not started"); } if (docs != null && docs.size() > 0) { ExportBulk bulk = openBulk(); if (bulk == null) { - logger.debug("exporters are either not ready or faulty"); - return; + throw new ExportException("exporters are either not ready or faulty"); } try { @@ -197,36 +182,4 @@ public class Exporters extends AbstractLifecycleComponent implements } } } - - static class CurrentExporters implements Iterable { - - static final CurrentExporters EMPTY = new CurrentExporters(Settings.EMPTY, Collections.emptyMap()); - - final Settings settings; - final Map exporters; - - public CurrentExporters(Settings settings, Map exporters) { - this.settings = settings; - this.exporters = exporters; - } - - @Override - public Iterator iterator() { - return exporters.values().iterator(); - } - - public Exporter get(String name) { - return exporters.get(name); - } - - void close(ESLogger logger) { - for (Exporter exporter : exporters.values()) { - try { - exporter.close(); - } catch (Exception e) { - logger.error("failed to close exporter [{}]", e, exporter.name()); - } - } - } - } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java index 5f92489d0bd..2ed15395501 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java @@ -166,7 +166,7 @@ public class HttpExporter extends Exporter { } @Override - public void close() { + public void doClose() { if (keepAliveThread != null && keepAliveThread.isAlive()) { keepAliveWorker.closed = true; keepAliveThread.interrupt(); diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java index 5dbe433b082..2b739d324ec 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java @@ -20,34 +20,36 @@ import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy; import java.util.Arrays; import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; /** - * + * LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the + * {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#onClose()} methods are not synchronized. */ public class LocalBulk extends ExportBulk { private final ESLogger logger; private final MonitoringClientProxy client; private final ResolversRegistry resolvers; + private final AtomicBoolean closed; + + private BulkRequestBuilder requestBuilder; - BulkRequestBuilder requestBuilder; - AtomicReference state = new AtomicReference<>(); public LocalBulk(String name, ESLogger logger, MonitoringClientProxy client, ResolversRegistry resolvers) { super(name); this.logger = logger; this.client = client; this.resolvers = resolvers; - state.set(State.ACTIVE); + this.closed = new AtomicBoolean(false); } @Override - public synchronized ExportBulk add(Collection docs) throws ExportException { + public ExportBulk add(Collection docs) throws ExportException { ExportException exception = null; for (MonitoringDoc doc : docs) { - if (state.get() != State.ACTIVE) { + if (closed.get()) { return this; } if (requestBuilder == null) { @@ -68,7 +70,7 @@ public class LocalBulk extends ExportBulk { if (exception == null) { exception = new ExportException("failed to add documents to export bulk [{}]", name); } - exception.addExportException(new ExportException("failed to add document [{}]", e, doc, name)); + exception.addExportException(new ExportException("failed to add document [{}]", e, doc, name)); } } @@ -81,7 +83,7 @@ public class LocalBulk extends ExportBulk { @Override public void flush() throws ExportException { - if (state.get() != State.ACTIVE || requestBuilder == null || requestBuilder.numberOfActions() == 0) { + if (closed.get() || requestBuilder == null || requestBuilder.numberOfActions() == 0) { return; } try { @@ -111,17 +113,10 @@ public class LocalBulk extends ExportBulk { } } - void terminate() { - state.set(State.TERMINATING); - synchronized (this) { + @Override + protected void onClose() throws Exception { + if (closed.compareAndSet(false, true)) { requestBuilder = null; - state.compareAndSet(State.TERMINATING, State.TERMINATED); } } - - enum State { - ACTIVE, - TERMINATING, - TERMINATED - } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java index e012c9203eb..1e38b8dfd7e 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporter.java @@ -13,11 +13,11 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; @@ -36,6 +36,7 @@ import org.joda.time.DateTimeZone; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -53,8 +54,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle private final ResolversRegistry resolvers; private final CleanerService cleanerService; - private volatile LocalBulk bulk; - private volatile boolean active = true; + private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); /** Version number of built-in templates **/ private final Integer templateVersion; @@ -73,66 +73,41 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle } resolvers = new ResolversRegistry(config.settings()); - bulk = resolveBulk(clusterService.state(), bulk); clusterService.add(this); cleanerService.add(this); } - LocalBulk getBulk() { - return bulk; - } - ResolversRegistry getResolvers() { return resolvers; } @Override public void clusterChanged(ClusterChangedEvent event) { - LocalBulk currentBulk = bulk; - LocalBulk newBulk = resolveBulk(event.state(), currentBulk); - - // yes, this method will always be called by the cluster event loop thread - // but we need to sync with the {@code #close()} mechanism - synchronized (this) { - if (active) { - bulk = newBulk; - } else if (newBulk != null) { - newBulk.terminate(); - } - if (currentBulk == null && bulk != null) { - logger.debug("local exporter [{}] - started!", name()); - } - if (bulk != currentBulk && currentBulk != null) { - logger.debug("local exporter [{}] - stopped!", name()); - currentBulk.terminate(); - } + if (state.get() == State.INITIALIZED) { + resolveBulk(event.state()); } } @Override public ExportBulk openBulk() { - return bulk; + if (state.get() != State.RUNNING) { + return null; + } + return resolveBulk(clusterService.state()); } - // requires synchronization due to cluster state update events (see above) @Override - public synchronized void close() { - active = false; - clusterService.remove(this); - cleanerService.remove(this); - if (bulk != null) { - try { - bulk.terminate(); - bulk = null; - } catch (Exception e) { - logger.error("local exporter [{}] - failed to cleanly close bulk", e, name()); - } + public void doClose() { + if (state.getAndSet(State.TERMINATED) != State.TERMINATED) { + logger.debug("local exporter [{}] - stopped", name()); + clusterService.remove(this); + cleanerService.remove(this); } } - LocalBulk resolveBulk(ClusterState clusterState, LocalBulk currentBulk) { + LocalBulk resolveBulk(ClusterState clusterState) { if (clusterService.localNode() == null || clusterState == null) { - return currentBulk; + return null; } if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { @@ -148,47 +123,52 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle // if this is not the master, we'll just look to see if the monitoring timestamped template is already // installed and if so, if it has a compatible version. If it is (installed and compatible) // we'll be able to start this exporter. Otherwise, we'll just wait for a new cluster state. - if (!clusterService.localNode().isMasterNode()) { + if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) { // We only need to check the index template for timestamped indices - if (!templateInstalled) { + if (templateInstalled == false) { // the template for timestamped indices is not yet installed in the given cluster state, we'll wait. logger.debug("local exporter [{}] - monitoring index template does not exist, so service cannot start", name()); return null; } - // ok.. we have a compatible template... we can start + logger.debug("local exporter [{}] - monitoring index template found, service can start", name()); + + } else { + + // we are on master + // + // Check that there is nothing that could block metadata updates + if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) { + logger.debug("local exporter [{}] - waiting until metadata writes are unblocked", name()); + return null; + } + + // Install the index template for timestamped indices first, so that other nodes can ship data + if (templateInstalled == false) { + logger.debug("local exporter [{}] - could not find existing monitoring template for timestamped indices, " + + "installing a new one", name()); + putTemplate(templateName, MarvelTemplateUtils.loadTimestampedIndexTemplate()); + // we'll get that template on the next cluster state update + return null; + } + + // Install the index template for data index + templateName = MarvelTemplateUtils.dataTemplateName(templateVersion); + if (hasTemplate(templateName, clusterState) == false) { + logger.debug("local exporter [{}] - could not find existing monitoring template for data index, " + + "installing a new one", name()); + putTemplate(templateName, MarvelTemplateUtils.loadDataIndexTemplate()); + // we'll get that template on the next cluster state update + return null; + } + + logger.debug("local exporter [{}] - monitoring index template found on master node, service can start", name()); + } + + if (state.compareAndSet(State.INITIALIZED, State.RUNNING)) { logger.debug("local exporter [{}] - started!", name()); - return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, resolvers); } - - // we are on master - // - // Check that there is nothing that could block metadata updates - if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) { - logger.debug("local exporter [{}] - waiting until metadata writes are unblocked", name()); - return null; - } - - // Install the index template for timestamped indices first, so that other nodes can ship data - if (!templateInstalled) { - logger.debug("local exporter [{}] - could not find existing monitoring template for timestamped indices, installing a new one", - name()); - putTemplate(templateName, MarvelTemplateUtils.loadTimestampedIndexTemplate()); - // we'll get that template on the next cluster state update - return null; - } - - // Install the index template for data index - templateName = MarvelTemplateUtils.dataTemplateName(templateVersion); - if (!hasTemplate(templateName, clusterState)) { - logger.debug("local exporter [{}] - could not find existing monitoring template for data index, installing a new one", name()); - putTemplate(templateName, MarvelTemplateUtils.loadDataIndexTemplate()); - // we'll get that template on the next cluster state update - return null; - } - - // ok.. we have a compatible templates... we can start - return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, resolvers); + return new LocalBulk(name(), logger, client, resolvers); } /** @@ -245,12 +225,12 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle @Override public void onCleanUpIndices(TimeValue retention) { - if (bulk == null) { + if (state.get() != State.RUNNING) { logger.debug("local exporter [{}] - not ready yet", name()); return; } - if (clusterService.localNode().isMasterNode()) { + if (clusterService.state().nodes().isLocalNodeElectedMaster()) { // Reference date time will be compared to index.creation_date settings, // that's why it must be in UTC DateTime expiration = new DateTime(DateTimeZone.UTC).minus(retention.millis()); @@ -345,4 +325,10 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle return new LocalExporter(config, client, clusterService, cleanerService); } } + + enum State { + INITIALIZED, + RUNNING, + TERMINATED + } } diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java index 26a8a95aea3..68624cfd994 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/MonitoringBulkTests.java @@ -19,11 +19,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -67,16 +68,16 @@ public class MonitoringBulkTests extends MarvelIntegTestCase { * This test creates N threads that execute a random number of monitoring bulk requests. */ public void testConcurrentRequests() throws Exception { - final int numberThreads = randomIntBetween(3, 10); + final int numberThreads = randomIntBetween(3, 5); final Thread[] threads = new Thread[numberThreads]; final CountDownLatch latch = new CountDownLatch(numberThreads + 1); final List exceptions = new CopyOnWriteArrayList<>(); - AtomicInteger total = new AtomicInteger(0); + AtomicLong total = new AtomicLong(0); logger.info("--> using {} concurrent clients to execute requests", threads.length); for (int i = 0; i < threads.length; i++) { - final int nbRequests = randomIntBetween(3, 10); + final int nbRequests = randomIntBetween(1, 5); threads[i] = new Thread(new AbstractRunnable() { @Override @@ -92,7 +93,7 @@ public class MonitoringBulkTests extends MarvelIntegTestCase { for (int j = 0; j < nbRequests; j++) { MonitoringBulkRequestBuilder requestBuilder = monitoringClient().prepareMonitoringBulk(); - int numDocs = scaledRandomIntBetween(10, 1000); + int numDocs = scaledRandomIntBetween(10, 50); for (int k = 0; k < numDocs; k++) { MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString()); doc.setType("concurrent"); @@ -102,7 +103,7 @@ public class MonitoringBulkTests extends MarvelIntegTestCase { total.addAndGet(numDocs); MonitoringBulkResponse response = requestBuilder.get(); - assertThat(response.getError(), is(nullValue())); + assertNull (response.getError()); } } }, "export_thread_" + i); @@ -119,10 +120,7 @@ public class MonitoringBulkTests extends MarvelIntegTestCase { } assertThat(exceptions, empty()); - refresh(); - - SearchResponse countResponse = client().prepareSearch().setTypes("concurrent").setSize(0).get(); - assertHitCount(countResponse, total.get()); + awaitMarvelDocsCount(greaterThanOrEqualTo(total.get()), "concurrent"); } public void testUnsupportedSystem() throws Exception { diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/AbstractExporterTemplateTestCase.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/AbstractExporterTemplateTestCase.java index d7f82e074de..4c21eab7b74 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/AbstractExporterTemplateTestCase.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/AbstractExporterTemplateTestCase.java @@ -22,6 +22,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.dataTemplateName; import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.indexTemplateName; import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; +import static org.hamcrest.Matchers.notNullValue; @ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCase { @@ -156,12 +157,14 @@ public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCa protected void doExporting() throws Exception { Collector collector = internalCluster().getInstance(ClusterStatsCollector.class); - exporter().export(collector.collect()); - } + assertNotNull(collector); - private Exporter exporter() { Exporters exporters = internalCluster().getInstance(Exporters.class); - return exporters.iterator().next(); + assertNotNull(exporters); + + // Wait for exporting bulks to be ready to export + assertBusy(() -> assertThat(exporters.openBulk(), notNullValue())); + exporters.export(collector.collect()); } private String currentDataIndexName() { diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java index fdcdbc50730..9a46c3e727d 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java @@ -7,8 +7,10 @@ package org.elasticsearch.marvel.agent.exporter; import org.elasticsearch.Version; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; @@ -77,48 +79,40 @@ public class ExportersTests extends ESTestCase { public void testInitExportersDefault() throws Exception { Exporter.Factory factory = new TestFactory("_type", true); factories.put("_type", factory); - Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder() - .build()); + Map internalExporters = exporters.initExporters(Settings.builder().build()); assertThat(internalExporters, notNullValue()); - assertThat(internalExporters.settings.getAsMap().size(), is(0)); - assertThat(internalExporters.exporters.size(), is(1)); - assertThat(internalExporters.exporters, hasKey("default_" + LocalExporter.TYPE)); - assertThat(internalExporters.exporters.get("default_" + LocalExporter.TYPE), instanceOf(LocalExporter.class)); + assertThat(internalExporters.size(), is(1)); + assertThat(internalExporters, hasKey("default_" + LocalExporter.TYPE)); + assertThat(internalExporters.get("default_" + LocalExporter.TYPE), instanceOf(LocalExporter.class)); } public void testInitExportersSingle() throws Exception { Exporter.Factory factory = new TestFactory("_type", true); factories.put("_type", factory); - Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder() + Map internalExporters = exporters.initExporters(Settings.builder() .put("_name.type", "_type") .build()); assertThat(internalExporters, notNullValue()); - assertThat(internalExporters.settings.getAsMap().size(), is(1)); - assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type")); - assertThat(internalExporters.exporters.size(), is(1)); - assertThat(internalExporters.exporters, hasKey("_name")); - assertThat(internalExporters.exporters.get("_name"), instanceOf(TestFactory.TestExporter.class)); - assertThat(internalExporters.exporters.get("_name").type, is("_type")); + assertThat(internalExporters.size(), is(1)); + assertThat(internalExporters, hasKey("_name")); + assertThat(internalExporters.get("_name"), instanceOf(TestFactory.TestExporter.class)); + assertThat(internalExporters.get("_name").type, is("_type")); } public void testInitExportersSingleDisabled() throws Exception { Exporter.Factory factory = new TestFactory("_type", true); factories.put("_type", factory); - Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder() + Map internalExporters = exporters.initExporters(Settings.builder() .put("_name.type", "_type") .put("_name.enabled", false) .build()); assertThat(internalExporters, notNullValue()); - assertThat(internalExporters.settings.getAsMap().size(), is(2)); - assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.type", "_type")); - assertThat(internalExporters.settings.getAsMap(), hasEntry("_name.enabled", "false")); // the only configured exporter is disabled... yet we intentionally don't fallback on the default - - assertThat(internalExporters.exporters.size(), is(0)); + assertThat(internalExporters.size(), is(0)); } public void testInitExportersSingleUnknownType() throws Exception { @@ -146,22 +140,19 @@ public class ExportersTests extends ESTestCase { public void testInitExportersMultipleSameType() throws Exception { Exporter.Factory factory = new TestFactory("_type", false); factories.put("_type", factory); - Exporters.CurrentExporters internalExporters = exporters.initExporters(Settings.builder() + Map internalExporters = exporters.initExporters(Settings.builder() .put("_name0.type", "_type") .put("_name1.type", "_type") .build()); assertThat(internalExporters, notNullValue()); - assertThat(internalExporters.settings.getAsMap().size(), is(2)); - assertThat(internalExporters.settings.getAsMap(), hasEntry("_name0.type", "_type")); - assertThat(internalExporters.settings.getAsMap(), hasEntry("_name1.type", "_type")); - assertThat(internalExporters.exporters.size(), is(2)); - assertThat(internalExporters.exporters, hasKey("_name0")); - assertThat(internalExporters.exporters.get("_name0"), instanceOf(TestFactory.TestExporter.class)); - assertThat(internalExporters.exporters.get("_name0").type, is("_type")); - assertThat(internalExporters.exporters, hasKey("_name1")); - assertThat(internalExporters.exporters.get("_name1"), instanceOf(TestFactory.TestExporter.class)); - assertThat(internalExporters.exporters.get("_name1").type, is("_type")); + assertThat(internalExporters.size(), is(2)); + assertThat(internalExporters, hasKey("_name0")); + assertThat(internalExporters.get("_name0"), instanceOf(TestFactory.TestExporter.class)); + assertThat(internalExporters.get("_name0").type, is("_type")); + assertThat(internalExporters, hasKey("_name1")); + assertThat(internalExporters.get("_name1"), instanceOf(TestFactory.TestExporter.class)); + assertThat(internalExporters.get("_name1").type, is("_type")); } public void testInitExportersMultipleSameTypeSingletons() throws Exception { @@ -184,12 +175,15 @@ public class ExportersTests extends ESTestCase { final AtomicReference settingsHolder = new AtomicReference<>(); - exporters = new Exporters(Settings.builder() + Settings nodeSettings = Settings.builder() .put("xpack.monitoring.agent.exporters._name0.type", "_type") .put("xpack.monitoring.agent.exporters._name1.type", "_type") - .build(), factories, clusterService, clusterSettings) { + .build(); + clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Arrays.asList(MarvelSettings.EXPORTERS_SETTINGS))); + + exporters = new Exporters(nodeSettings, factories, clusterService, clusterSettings) { @Override - CurrentExporters initExporters(Settings settings) { + Map initExporters(Settings settings) { settingsHolder.set(settings); return super.initExporters(settings); } @@ -227,9 +221,9 @@ public class ExportersTests extends ESTestCase { .build(), factories, clusterService, clusterSettings); exporters.start(); - DiscoveryNode localNode = mock(DiscoveryNode.class); - when(localNode.isMasterNode()).thenReturn(true); - when(clusterService.localNode()).thenReturn(localNode); + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + when(nodes.isLocalNodeElectedMaster()).thenReturn(true); + when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build()); ExportBulk bulk = exporters.openBulk(); assertThat(bulk, notNullValue()); @@ -251,9 +245,9 @@ public class ExportersTests extends ESTestCase { .build(), factories, clusterService, clusterSettings); exporters.start(); - DiscoveryNode localNode = mock(DiscoveryNode.class); - when(localNode.isMasterNode()).thenReturn(false); - when(clusterService.localNode()).thenReturn(localNode); + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + when(nodes.isLocalNodeElectedMaster()).thenReturn(false); + when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build()); ExportBulk bulk = exporters.openBulk(); assertThat(bulk, notNullValue()); @@ -342,17 +336,13 @@ public class ExportersTests extends ESTestCase { super(type, config); } - @Override - public void export(Collection monitoringDocs) throws Exception { - } - @Override public ExportBulk openBulk() { return mock(ExportBulk.class); } @Override - public void close() { + public void doClose() { } } } @@ -406,7 +396,7 @@ public class ExportersTests extends ESTestCase { } @Override - public void close() { + public void doClose() { } public int getExportedCount() { diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java index e975df6336b..eae01dfa73e 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporterTests.java @@ -42,6 +42,7 @@ import org.junit.Before; import java.io.IOException; import java.net.BindException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -55,6 +56,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) public class HttpExporterTests extends MarvelIntegTestCase { @@ -102,11 +104,10 @@ public class HttpExporterTests extends MarvelIntegTestCase { .put("xpack.monitoring.agent.exporters._http.connection.keep_alive", false) .put("xpack.monitoring.agent.exporters._http.update_mappings", false); - String agentNode = internalCluster().startNode(builder); - HttpExporter exporter = getExporter(agentNode); + internalCluster().startNode(builder); final int nbDocs = randomIntBetween(1, 25); - exporter.export(newRandomMarvelDocs(nbDocs)); + export(newRandomMarvelDocs(nbDocs)); assertThat(webServer.getRequestCount(), equalTo(6)); @@ -186,7 +187,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { logger.info("--> exporting data"); HttpExporter exporter = getExporter(agentNode); assertThat(exporter.supportedClusterVersion, is(false)); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); + export(Collections.singletonList(newRandomMarvelDoc())); assertThat(exporter.supportedClusterVersion, is(true)); assertThat(webServer.getRequestCount(), equalTo(6)); @@ -251,7 +252,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { enqueueResponse(secondWebServer, 200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); logger.info("--> exporting a second event"); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); + export(Collections.singletonList(newRandomMarvelDoc())); assertThat(secondWebServer.getRequestCount(), equalTo(5)); @@ -301,7 +302,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { logger.info("--> exporting data"); HttpExporter exporter = getExporter(agentNode); assertThat(exporter.supportedClusterVersion, is(false)); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); + assertNull(exporter.openBulk()); assertThat(exporter.supportedClusterVersion, is(false)); assertThat(webServer.getRequestCount(), equalTo(1)); @@ -333,7 +334,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { HttpExporter exporter = getExporter(agentNode); MonitoringDoc doc = newRandomMarvelDoc(); - exporter.export(Collections.singletonList(doc)); + export(Collections.singletonList(doc)); assertThat(webServer.getRequestCount(), equalTo(6)); @@ -385,8 +386,7 @@ public class HttpExporterTests extends MarvelIntegTestCase { enqueueResponse(200, "{\"errors\": false, \"msg\": \"successful bulk request\"}"); doc = newRandomMarvelDoc(); - exporter = getExporter(agentNode); - exporter.export(Collections.singletonList(doc)); + export(Collections.singletonList(doc)); String expectedMarvelIndex = ".monitoring-es-" + MarvelTemplateUtils.TEMPLATE_VERSION + "-" + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.getTimestamp()); @@ -440,6 +440,15 @@ public class HttpExporterTests extends MarvelIntegTestCase { assertTrue(resolved.equals(expected)); } + private void export(Collection docs) throws Exception { + Exporters exporters = internalCluster().getInstance(Exporters.class); + assertThat(exporters, notNullValue()); + + // Wait for exporting bulks to be ready to export + assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue()))); + exporters.export(docs); + } + private HttpExporter getExporter(String nodeName) { Exporters exporters = internalCluster().getInstance(Exporters.class, nodeName); return (HttpExporter) exporters.iterator().next(); diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java index 0bdeb52d75a..cf41c1996d9 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/local/LocalExporterTests.java @@ -31,6 +31,7 @@ import org.joda.time.format.DateTimeFormat; import org.junit.After; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -73,10 +74,8 @@ public class LocalExporterTests extends MarvelIntegTestCase { .build()); securedEnsureGreen(); - Exporter exporter = getLocalExporter("_local"); - logger.debug("--> exporting a single monitoring doc"); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); + export(Collections.singletonList(newRandomMarvelDoc())); awaitMarvelDocsCount(is(1L)); deleteMarvelIndices(); @@ -87,7 +86,7 @@ public class LocalExporterTests extends MarvelIntegTestCase { } logger.debug("--> exporting {} monitoring docs", monitoringDocs.size()); - exporter.export(monitoringDocs); + export(monitoringDocs); awaitMarvelDocsCount(is((long) monitoringDocs.size())); SearchResponse response = client().prepareSearch(MONITORING_INDICES_PREFIX + "*").get(); @@ -131,7 +130,7 @@ public class LocalExporterTests extends MarvelIntegTestCase { assertThat(exporter.getResolvers().getResolver(doc).index(doc), equalTo(indexName)); logger.debug("--> exporting a random monitoring document"); - exporter.export(Collections.singletonList(doc)); + export(Collections.singletonList(doc)); awaitIndexExists(indexName); logger.debug("--> updates the timestamp"); @@ -144,7 +143,7 @@ public class LocalExporterTests extends MarvelIntegTestCase { logger.debug("--> exporting the document again (this time with the the new index name time format [{}], expecting index name [{}]", timeFormat, indexName); - exporter.export(Collections.singletonList(doc)); + export(Collections.singletonList(doc)); awaitIndexExists(indexName); } @@ -155,19 +154,21 @@ public class LocalExporterTests extends MarvelIntegTestCase { .build()); securedEnsureGreen(); - LocalExporter exporter = getLocalExporter("_local"); - logger.debug("--> exporting a single monitoring doc"); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); + export(Collections.singletonList(newRandomMarvelDoc())); awaitMarvelDocsCount(is(1L)); - assertNull(exporter.getBulk().requestBuilder); logger.debug("--> closing monitoring indices"); assertAcked(client().admin().indices().prepareClose(MONITORING_INDICES_PREFIX + "*").get()); try { logger.debug("--> exporting a second monitoring doc"); - exporter.export(Collections.singletonList(newRandomMarvelDoc())); + LocalExporter exporter = getLocalExporter("_local"); + + LocalBulk bulk = (LocalBulk) exporter.openBulk(); + bulk.add(Collections.singletonList(newRandomMarvelDoc())); + bulk.close(true); + } catch (ElasticsearchException e) { assertThat(e.getMessage(), containsString("failed to flush export bulk [_local]")); assertThat(e.getCause(), instanceOf(ExportException.class)); @@ -177,10 +178,18 @@ public class LocalExporterTests extends MarvelIntegTestCase { for (ExportException c : cause) { assertThat(c.getMessage(), containsString("IndexClosedException[closed]")); } - assertNull(exporter.getBulk().requestBuilder); } } + private void export(Collection docs) throws Exception { + Exporters exporters = internalCluster().getInstance(Exporters.class); + assertThat(exporters, notNullValue()); + + // Wait for exporting bulks to be ready to export + assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue()))); + exporters.export(docs); + } + private LocalExporter getLocalExporter(String name) throws Exception { final Exporter exporter = internalCluster().getInstance(Exporters.class).getExporter(name); assertThat(exporter, notNullValue()); diff --git a/elasticsearch/x-pack/marvel/src/test/resources/rest-api-spec/test/monitoring/bulk/10_basic.yaml b/elasticsearch/x-pack/marvel/src/test/resources/rest-api-spec/test/monitoring/bulk/10_basic.yaml index 1e1f472cf9a..09177132bee 100644 --- a/elasticsearch/x-pack/marvel/src/test/resources/rest-api-spec/test/monitoring/bulk/10_basic.yaml +++ b/elasticsearch/x-pack/marvel/src/test/resources/rest-api-spec/test/monitoring/bulk/10_basic.yaml @@ -1,3 +1,20 @@ +--- +setup: + - do: + # Reduce the interval time so that monitoring + # indices are created faster + cluster.put_settings: + body: + transient: + xpack.monitoring.agent.interval: 1s + flat_settings: true + - do: + # Waits for the monitoring data index to be available: + # it indicates that the local exporter is ready + cluster.health: + index: ".monitoring-data-*" + wait_for_active_shards: 1 + --- "Bulk indexing of monitoring data": # Get the current version @@ -53,7 +70,7 @@ body: - '{"index": {}}' - '{"field_1": "value_1"}' - - '{"index": {"_type": "test_type"}}' + - '{"index": {"_type": "custom_type"}}' - '{"field_1": "value_2"}' - '{"index": {}}' - '{"field_1": "value_3"}' @@ -73,6 +90,6 @@ - do: search: index: .monitoring-kibana-* - type: test_type + type: custom_type - match: { hits.total: 1 }