Monitoring: Update exporter & bulk in ExportersTests

Since elastic/elasticsearch#1832 exporters are created once, but the inner exporting bulks must be instanciated for each export. The CountingExporter and CountingBulk have not been updated to reflect this change.

Original commit: elastic/x-pack-elasticsearch@bbbde22363
This commit is contained in:
Tanguy Leroux 2016-04-06 11:35:39 +02:00
parent 1e3c56ce97
commit 3c65f38fbe
1 changed files with 25 additions and 9 deletions

View File

@ -288,7 +288,10 @@ public class ExportersTests extends ESTestCase {
int nbDocs = randomIntBetween(10, 50); int nbDocs = randomIntBetween(10, 50);
total += nbDocs; total += nbDocs;
logger.debug("--> exporting thread [{}] exports {} documents", i, nbDocs); final int threadNum = i;
final int threadDocs = nbDocs;
logger.debug("--> exporting thread [{}] exports {} documents", threadNum, threadDocs);
threads[i] = new Thread(new AbstractRunnable() { threads[i] = new Thread(new AbstractRunnable() {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
@ -299,11 +302,17 @@ public class ExportersTests extends ESTestCase {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
List<MonitoringDoc> docs = new ArrayList<>(); List<MonitoringDoc> docs = new ArrayList<>();
for (int n = 0; n < nbDocs; n++) { for (int n = 0; n < threadDocs; n++) {
docs.add(new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString())); docs.add(new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString()));
} }
barrier.await(10, TimeUnit.SECONDS); barrier.await(10, TimeUnit.SECONDS);
try {
exporters.export(docs); exporters.export(docs);
logger.debug("--> thread [{}] successfully exported {} documents", threadNum, threadDocs);
} catch (Exception e) {
logger.debug("--> thread [{}] failed to export {} documents", threadNum, threadDocs);
}
} }
}, "export_thread_" + i); }, "export_thread_" + i);
threads[i].start(); threads[i].start();
@ -319,6 +328,8 @@ public class ExportersTests extends ESTestCase {
assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class)); assertThat(exporter, instanceOf(CountingExportFactory.CountingExporter.class));
assertThat(((CountingExportFactory.CountingExporter) exporter).getExportedCount(), equalTo(total)); assertThat(((CountingExportFactory.CountingExporter) exporter).getExportedCount(), equalTo(total));
} }
exporters.close();
} }
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> { static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
@ -383,15 +394,16 @@ public class ExportersTests extends ESTestCase {
static class CountingExporter extends Exporter { static class CountingExporter extends Exporter {
private static final AtomicInteger count = new AtomicInteger(0); private static final AtomicInteger count = new AtomicInteger(0);
private final CountingBulk bulk; private List<CountingBulk> bulks = new CopyOnWriteArrayList<>();
public CountingExporter(String type, Config config) { public CountingExporter(String type, Config config) {
super(type, config); super(type, config);
this.bulk = new CountingBulk(type + "#" + count.getAndIncrement());
} }
@Override @Override
public ExportBulk openBulk() { public ExportBulk openBulk() {
CountingBulk bulk = new CountingBulk(type + "#" + count.getAndIncrement());
bulks.add(bulk);
return bulk; return bulk;
} }
@ -400,7 +412,11 @@ public class ExportersTests extends ESTestCase {
} }
public int getExportedCount() { public int getExportedCount() {
return bulk.getCount().get(); int exported = 0;
for (CountingBulk bulk : bulks) {
exported += bulk.getCount();
}
return exported;
} }
} }
@ -425,8 +441,8 @@ public class ExportersTests extends ESTestCase {
protected void doClose() throws ExportException { protected void doClose() throws ExportException {
} }
AtomicInteger getCount() { int getCount() {
return count; return count.get();
} }
} }
} }