[TEST] Make BulkTests#testBulkProcessorFlush more robust

This commit is contained in:
Simon Willnauer 2014-04-03 13:32:41 +02:00
parent 6f5b7fa086
commit 697432390d
1 changed files with 17 additions and 12 deletions

View File

@ -40,12 +40,9 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@ -624,19 +621,24 @@ public class BulkTests extends ElasticsearchIntegrationTest {
@Test
public void testBulkProcessorFlush() throws InterruptedException {
final BlockingQueue<BulkResponse> responseQueue = new SynchronousQueue();
final AtomicReference<BulkResponse> responseRef = new AtomicReference<>();
final AtomicReference<Throwable> failureRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
responseQueue.add(response);
responseRef.set(response);
latch.countDown();
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
failureRef.set(failure);
latch.countDown();
}
};
try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(6)
@ -651,10 +653,13 @@ public class BulkTests extends ElasticsearchIntegrationTest {
processor.add(new IndexRequest("test", "test", "5").source(data));
processor.flush();
BulkResponse response = responseQueue.poll(1, TimeUnit.SECONDS);
assertThat("Could not get a bulk response even after an explicit flush.", response, is(notNullValue()));
latch.await();
BulkResponse response = responseRef.get();
Throwable error = failureRef.get();
assertThat(error, nullValue());
assertThat("Could not get a bulk response even after an explicit flush.", response, notNullValue());
assertThat(response.getItems().length, is(5));
}
}
}