diff --git a/src/test/java/org/elasticsearch/document/BulkTests.java b/src/test/java/org/elasticsearch/document/BulkTests.java index a44de738675..b017f7a7614 100644 --- a/src/test/java/org/elasticsearch/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/document/BulkTests.java @@ -40,12 +40,9 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; -import java.util.Arrays; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; @@ -624,19 +621,24 @@ public class BulkTests extends ElasticsearchIntegrationTest { @Test public void testBulkProcessorFlush() throws InterruptedException { - final BlockingQueue responseQueue = new SynchronousQueue(); - + final AtomicReference responseRef = new AtomicReference<>(); + final AtomicReference failureRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - responseQueue.add(response); + responseRef.set(response); + latch.countDown(); } @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + failureRef.set(failure); + latch.countDown(); + } }; try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(6) @@ -651,10 +653,13 @@ public class BulkTests extends ElasticsearchIntegrationTest { processor.add(new IndexRequest("test", "test", "5").source(data)); processor.flush(); - - BulkResponse response = responseQueue.poll(1, TimeUnit.SECONDS); - assertThat("Could not get a bulk response even after an explicit flush.", response, is(notNullValue())); + latch.await(); + BulkResponse response = responseRef.get(); + Throwable error = failureRef.get(); + assertThat(error, nullValue()); + assertThat("Could not get a bulk response even after an explicit flush.", response, notNullValue()); assertThat(response.getItems().length, is(5)); } } } +