[TEST] make BulkTest more robust if test infra is slow
This commit is contained in:
parent
0cebbf1624
commit
fd8a6ac382
|
@ -569,21 +569,27 @@ public class BulkTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
@Test
|
||||
public void testThatBulkProcessorCountIsCorrect() throws InterruptedException {
|
||||
final BlockingQueue<BulkResponse> responseQueue = new SynchronousQueue();
|
||||
|
||||
final AtomicReference<BulkResponse> responseRef = new AtomicReference<>();
|
||||
final AtomicReference<Throwable> failureRef = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
responseQueue.add(response);
|
||||
responseRef.set(response);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
failureRef.set(failure);
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(5)
|
||||
.setConcurrentRequests(1).setName("foo").build()) {
|
||||
Map<String, Object> data = Maps.newHashMap();
|
||||
|
@ -595,8 +601,11 @@ public class BulkTests extends ElasticsearchIntegrationTest {
|
|||
processor.add(new IndexRequest("test", "test", "4").source(data));
|
||||
processor.add(new IndexRequest("test", "test", "5").source(data));
|
||||
|
||||
BulkResponse response = responseQueue.poll(5, TimeUnit.SECONDS);
|
||||
assertThat("Could not get a bulk response in 5 seconds", response, is(notNullValue()));
|
||||
latch.await();
|
||||
BulkResponse response = responseRef.get();
|
||||
Throwable error = failureRef.get();
|
||||
assertThat(error, nullValue());
|
||||
assertThat("Could not get a bulk response even after an explicit flush.", response, notNullValue());
|
||||
assertThat(response.getItems().length, is(5));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue