diff --git a/src/test/java/org/elasticsearch/document/BulkTests.java b/src/test/java/org/elasticsearch/document/BulkTests.java index b017f7a7614..bc78a614744 100644 --- a/src/test/java/org/elasticsearch/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/document/BulkTests.java @@ -569,21 +569,27 @@ public class BulkTests extends ElasticsearchIntegrationTest { @Test public void testThatBulkProcessorCountIsCorrect() throws InterruptedException { - final BlockingQueue responseQueue = new SynchronousQueue(); - + final AtomicReference responseRef = new AtomicReference<>(); + final AtomicReference failureRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - responseQueue.add(response); + responseRef.set(response); + latch.countDown(); } @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + failureRef.set(failure); + latch.countDown(); + } }; + try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(5) .setConcurrentRequests(1).setName("foo").build()) { Map data = Maps.newHashMap(); @@ -595,8 +601,11 @@ public class BulkTests extends ElasticsearchIntegrationTest { processor.add(new IndexRequest("test", "test", "4").source(data)); processor.add(new IndexRequest("test", "test", "5").source(data)); - BulkResponse response = responseQueue.poll(5, TimeUnit.SECONDS); - assertThat("Could not get a bulk response in 5 seconds", response, is(notNullValue())); + latch.await(); + BulkResponse response = responseRef.get(); + Throwable error = failureRef.get(); + assertThat(error, nullValue()); + assertThat("Could not get a bulk response even after an explicit flush.", response, notNullValue()); assertThat(response.getItems().length, is(5)); } }