diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java new file mode 100644 index 00000000000..7f59fcc8312 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java @@ -0,0 +1,350 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class BulkProcessorIT extends ESRestHighLevelClientTestCase { + + private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { + return BulkProcessor.builder(highLevelClient()::bulkAsync, listener); + } + + public void testThatBulkProcessorCountIsCorrect() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); + + int numDocs = randomIntBetween(10, 100); + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) + .build()) { + + MultiGetRequest multiGetRequest = indexDocs(processor, numDocs); + + latch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(1)); + assertThat(listener.afterCounts.get(), equalTo(1)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs); + } + } + + public void testBulkProcessorFlush() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); + + int numDocs = randomIntBetween(10, 100); + + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + //let's make sure that this bulk won't be automatically flushed + .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + + MultiGetRequest multiGetRequest = indexDocs(processor, numDocs); + + assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false)); + //we really need an explicit flush as none of the bulk thresholds was reached + processor.flush(); + latch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(1)); + assertThat(listener.afterCounts.get(), equalTo(1)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs); + } + } + + public void testBulkProcessorConcurrentRequests() throws Exception { + int bulkActions = randomIntBetween(10, 100); + int numDocs = randomIntBetween(bulkActions, bulkActions + 100); + int concurrentRequests = randomIntBetween(0, 7); + + int expectedBulkActions = numDocs / bulkActions; + + final CountDownLatch latch = new CountDownLatch(expectedBulkActions); + int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1; + final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions); + + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); + + MultiGetRequest multiGetRequest; + + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) + //set interval and size to high values + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + + multiGetRequest = indexDocs(processor, numDocs); + + latch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions)); + assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions)); + } + + closeLatch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions)); + assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertThat(listener.bulkItems.size(), equalTo(numDocs)); + + Set ids = new HashSet<>(); + for (BulkItemResponse bulkItemResponse : listener.bulkItems) { + assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false)); + assertThat(bulkItemResponse.getIndex(), equalTo("test")); + assertThat(bulkItemResponse.getType(), equalTo("test")); + //with concurrent requests > 1 we can't rely on the order of the bulk requests + assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs))); + //we do want to check that we don't get duplicate ids back + assertThat(ids.add(bulkItemResponse.getId()), equalTo(true)); + } + + assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs); + } + + public void testBulkProcessorWaitOnClose() throws Exception { + BulkProcessorTestListener listener = new BulkProcessorTestListener(); + + int numDocs = randomIntBetween(10, 100); + BulkProcessor processor = initBulkProcessorBuilder(listener) + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), + RandomPicks.randomFrom(random(), ByteSizeUnit.values()))) + .build(); + + MultiGetRequest multiGetRequest = indexDocs(processor, numDocs); + assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); + if (randomBoolean()) { // check if we can call it multiple times + if (randomBoolean()) { + assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); + } else { + processor.close(); + } + } + + assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1)); + assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1)); + for (Throwable bulkFailure : listener.bulkFailures) { + logger.error("bulk failure", bulkFailure); + } + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs); + } + + public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception { + + String createIndexBody = "{\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"blocks.write\" : true\n" + + " }\n" + + " }\n" + + " \n" + + "}"; + + NStringEntity entity = new NStringEntity(createIndexBody, ContentType.APPLICATION_JSON); + Response response = client().performRequest("PUT", "/test-ro", Collections.emptyMap(), entity); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + + int bulkActions = randomIntBetween(10, 100); + int numDocs = randomIntBetween(bulkActions, bulkActions + 100); + int concurrentRequests = randomIntBetween(0, 10); + + int expectedBulkActions = numDocs / bulkActions; + + final CountDownLatch latch = new CountDownLatch(expectedBulkActions); + int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1; + final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions); + + int testDocs = 0; + int testReadOnlyDocs = 0; + MultiGetRequest multiGetRequest = new MultiGetRequest(); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); + + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) + //set interval and size to high values + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + + for (int i = 1; i <= numDocs; i++) { + if (randomBoolean()) { + testDocs++; + processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)) + .source(XContentType.JSON, "field", "value")); + multiGetRequest.add("test", "test", Integer.toString(testDocs)); + } else { + testReadOnlyDocs++; + processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)) + .source(XContentType.JSON, "field", "value")); + } + } + } + + closeLatch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions)); + assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs)); + + Set ids = new HashSet<>(); + Set readOnlyIds = new HashSet<>(); + for (BulkItemResponse bulkItemResponse : listener.bulkItems) { + assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro"))); + assertThat(bulkItemResponse.getType(), equalTo("test")); + if (bulkItemResponse.getIndex().equals("test")) { + assertThat(bulkItemResponse.isFailed(), equalTo(false)); + //with concurrent requests > 1 we can't rely on the order of the bulk requests + assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs))); + //we do want to check that we don't get duplicate ids back + assertThat(ids.add(bulkItemResponse.getId()), equalTo(true)); + } else { + assertThat(bulkItemResponse.isFailed(), equalTo(true)); + //with concurrent requests > 1 we can't rely on the order of the bulk requests + assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs))); + //we do want to check that we don't get duplicate ids back + assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true)); + } + } + + assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), testDocs); + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { + MultiGetRequest multiGetRequest = new MultiGetRequest(); + for (int i = 1; i <= numDocs; i++) { + if (randomBoolean()) { + processor.add(new IndexRequest("test", "test", Integer.toString(i)) + .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); + } else { + final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n" + + Strings.toString(JsonXContent.contentBuilder() + .startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n"; + processor.add(new BytesArray(source), null, null, XContentType.JSON); + } + multiGetRequest.add("test", "test", Integer.toString(i)); + } + return multiGetRequest; + } + + private static void assertResponseItems(List bulkItemResponses, int numDocs) { + assertThat(bulkItemResponses.size(), is(numDocs)); + int i = 1; + for (BulkItemResponse bulkItemResponse : bulkItemResponses) { + assertThat(bulkItemResponse.getIndex(), equalTo("test")); + assertThat(bulkItemResponse.getType(), equalTo("test")); + assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++))); + assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(), + bulkItemResponse.isFailed(), equalTo(false)); + } + } + + private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) { + assertThat(multiGetResponse.getResponses().length, equalTo(numDocs)); + int i = 1; + for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) { + assertThat(multiGetItemResponse.getIndex(), equalTo("test")); + assertThat(multiGetItemResponse.getType(), equalTo("test")); + assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++))); + } + } + + private static class BulkProcessorTestListener implements BulkProcessor.Listener { + + private final CountDownLatch[] latches; + private final AtomicInteger beforeCounts = new AtomicInteger(); + private final AtomicInteger afterCounts = new AtomicInteger(); + private final List bulkItems = new CopyOnWriteArrayList<>(); + private final List bulkFailures = new CopyOnWriteArrayList<>(); + + private BulkProcessorTestListener(CountDownLatch... latches) { + this.latches = latches; + } + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + beforeCounts.incrementAndGet(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + bulkItems.addAll(Arrays.asList(response.getItems())); + afterCounts.incrementAndGet(); + for (CountDownLatch latch : latches) { + latch.countDown(); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + bulkFailures.add(failure); + afterCounts.incrementAndGet(); + for (CountDownLatch latch : latches) { + latch.countDown(); + } + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 668dd230f60..39a185741db 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -211,7 +211,6 @@ public class BulkProcessor implements Closeable { } catch (InterruptedException exc) { Thread.currentThread().interrupt(); } - onClose.run(); } /** @@ -237,7 +236,11 @@ public class BulkProcessor implements Closeable { if (bulkRequest.numberOfActions() > 0) { execute(); } - return this.bulkRequestHandler.awaitClose(timeout, unit); + try { + return this.bulkRequestHandler.awaitClose(timeout, unit); + } finally { + onClose.run(); + } } /** diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 4ff5b69ad37..3fbfa381ad3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -32,6 +32,8 @@ import org.junit.After; import org.junit.Before; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; public class BulkProcessorTests extends ESTestCase { @@ -97,4 +99,29 @@ public class BulkProcessorTests extends ESTestCase { assertNull(threadPool.getThreadContext().getTransient(transientKey)); bulkProcessor.close(); } + + public void testAwaitOnCloseCallsOnClose() throws Exception { + final AtomicBoolean called = new AtomicBoolean(false); + BulkProcessor bulkProcessor = new BulkProcessor((request, listener) -> { + }, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + + } + }, 0, 10, new ByteSizeValue(1000), null, (delay, executor, command) -> null, () -> called.set(true)); + + assertFalse(called.get()); + bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); + assertTrue(called.get()); + } }