diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java index 39d2b631bb0..aa894898e3c 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java @@ -288,7 +288,10 @@ public class ExportersTests extends ESTestCase { int nbDocs = randomIntBetween(10, 50); total += nbDocs; - logger.debug("--> exporting thread [{}] exports {} documents", i, nbDocs); + final int threadNum = i; + final int threadDocs = nbDocs; + + logger.debug("--> exporting thread [{}] exports {} documents", threadNum, threadDocs); threads[i] = new Thread(new AbstractRunnable() { @Override public void onFailure(Throwable t) { @@ -299,11 +302,17 @@ public class ExportersTests extends ESTestCase { @Override protected void doRun() throws Exception { List docs = new ArrayList<>(); - for (int n = 0; n < nbDocs; n++) { + for (int n = 0; n < threadDocs; n++) { docs.add(new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString())); } barrier.await(10, TimeUnit.SECONDS); - exporters.export(docs); + try { + exporters.export(docs); + logger.debug("--> thread [{}] successfully exported {} documents", threadNum, threadDocs); + } catch (Exception e) { + logger.debug("--> thread [{}] failed to export {} documents", threadNum, threadDocs); + } + } }, "export_thread_" + i); threads[i].start(); @@ -317,8 +326,10 @@ public class ExportersTests extends ESTestCase { assertThat(exceptions, empty()); for (Exporter exporter : exporters) { assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class)); - assertThat(((CountingExportFactory.CountingExporter)exporter).getExportedCount(), equalTo(total)); + assertThat(((CountingExportFactory.CountingExporter) exporter).getExportedCount(), equalTo(total)); } + + exporters.close(); } static class TestFactory extends Exporter.Factory { @@ -383,15 +394,16 @@ public class ExportersTests extends ESTestCase { static class CountingExporter extends Exporter { private static final AtomicInteger count = new AtomicInteger(0); - private final CountingBulk bulk; + private List bulks = new CopyOnWriteArrayList<>(); public CountingExporter(String type, Config config) { super(type, config); - this.bulk = new CountingBulk(type + "#" + count.getAndIncrement()); } @Override public ExportBulk openBulk() { + CountingBulk bulk = new CountingBulk(type + "#" + count.getAndIncrement()); + bulks.add(bulk); return bulk; } @@ -400,7 +412,11 @@ public class ExportersTests extends ESTestCase { } public int getExportedCount() { - return bulk.getCount().get(); + int exported = 0; + for (CountingBulk bulk : bulks) { + exported += bulk.getCount(); + } + return exported; } } @@ -425,8 +441,8 @@ public class ExportersTests extends ESTestCase { protected void doClose() throws ExportException { } - AtomicInteger getCount() { - return count; + int getCount() { + return count.get(); } } }