Bulk Api support for global parameters (#34528)

Bulk Request in High level rest client should be consistent with what is
possible in Rest API, therefore should support global parameters. Global
parameters are passed in URL in Rest API.

Some parameters are mandatory - index, type - and would fail validation
if not provided before the bulk is executed.
Optional parameters - routing, pipeline.

The usage of these should be consistent across sync/async execution,
bulk processor and BulkRequestBuilder

closes #26026
This commit is contained in:
Przemyslaw Gomulka 2018-10-30 09:08:12 +01:00 committed by GitHub
parent d67c88fa12
commit 995bf0ee66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 718 additions and 47 deletions

View File

@ -118,7 +118,8 @@ final class RequestConverters {
Params parameters = new Params(request);
parameters.withTimeout(bulkRequest.timeout());
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
parameters.withPipeline(bulkRequest.pipeline());
parameters.withRouting(bulkRequest.routing());
// Bulk API only supports newline delimited JSON or Smile. Before executing
// the bulk, we need to check that all requests have the same content-type
// and this content-type is supported by the Bulk API.

View File

@ -28,14 +28,18 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@ -44,10 +48,19 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
@ -268,23 +281,124 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
}
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
/**
 * Indexes one request that relies on the processor's global index, type, routing and pipeline and one
 * request that carries its own index, type and id, then checks that the documents found in each index
 * contain the field added by the global pipeline.
 */
@SuppressWarnings("unchecked")
public void testGlobalParametersAndSingleRequest() throws Exception {
    createIndexWithMultipleShards("test");

    final CountDownLatch latch = new CountDownLatch(1);
    BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
    createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");

    // The tag/end markers below delimit a snippet included by the docs; both must carry the same name.
    // tag::bulk-processor-mix-parameters
    try (BulkProcessor processor = initBulkProcessorBuilder(listener)
            .setGlobalIndex("tweets")
            .setGlobalType("_doc")
            .setGlobalRouting("routing")
            .setGlobalPipeline("pipeline_id")
            .build()) {

        processor.add(new IndexRequest() // <1>
            .source(XContentType.JSON, "user", "some user"));
        processor.add(new IndexRequest("blogs", "post_type", "1") // <2>
            .source(XContentType.JSON, "title", "some title"));
    }
    // end::bulk-processor-mix-parameters

    // block until the listener's latch is released before searching
    latch.await();

    Iterable<SearchHit> hits = searchAll(new SearchRequest("tweets").routing("routing"));
    assertThat(hits, everyItem(hasProperty(fieldFromSource("user"), equalTo("some user"))));
    assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));

    Iterable<SearchHit> blogs = searchAll(new SearchRequest("blogs").routing("routing"));
    assertThat(blogs, everyItem(hasProperty(fieldFromSource("title"), equalTo("some title"))));
    assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
}
/**
 * Configures a processor with global index, type, routing and pipeline, feeds it {@code numDocs} index
 * operations that carry no local index or type, and checks that a single bulk execution happened and that
 * every hit found in index "test" has the global index and type, the pipeline-added field and an expected id.
 */
@SuppressWarnings("unchecked")
public void testGlobalParametersAndBulkProcessor() throws Exception {
createIndexWithMultipleShards("test");
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
// NOTE(review): the range is degenerate, so numDocs is always 10 - presumably pinned so that a search
// with default paging can return every document; confirm before widening the range.
int numDocs = randomIntBetween(10, 10);
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType("test")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
// local index and type are null here; "test"/"test"/"pipeline_id" are passed as the global arguments
indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id");
latch.await();
// exactly one bulk execution is expected, with no failures
assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test"))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
/**
 * Builds one id matcher per document, for the ids {@code "1"} through {@code numDocs} inclusive.
 *
 * @param numDocs the highest document id; values below 1 yield an empty array
 * @return matchers in ascending id order
 */
@SuppressWarnings("unchecked")
private Matcher<SearchHit>[] expectedIds(int numDocs) {
    return IntStream.rangeClosed(1, numDocs)
        .mapToObj(docId -> hasId(Integer.toString(docId)))
        .<Matcher<SearchHit>>toArray(Matcher[]::new);
}
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
processor.add(new IndexRequest("test", "test", Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
+ Strings.toString(JsonXContent.contentBuilder()
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n";
processor.add(new BytesArray(source), null, null, XContentType.JSON);
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
}
multiGetRequest.add("test", "test", Integer.toString(i));
multiGetRequest.add(localIndex, localType, Integer.toString(i));
}
return multiGetRequest;
}
/**
 * Serializes a single index operation as two newline-terminated JSON lines: an action line naming the
 * target index, type and id, followed by a source line holding a random {@code field} value.
 *
 * @param localIndex value written as {@code _index} on the action line
 * @param localType  value written as {@code _type} on the action line
 * @param id         document id, written as a string under {@code _id}
 * @return the bytes of the two lines
 */
private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
    final String actionLine = Strings.toString(
        jsonBuilder().startObject()
            .startObject("index")
                .field("_index", localIndex)
                .field("_type", localType)
                .field("_id", Integer.toString(id))
            .endObject()
        .endObject());

    final String sourceLine = Strings.toString(
        jsonBuilder().startObject()
            .field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
        .endObject());

    // the trailing empty element makes the last line newline-terminated as well
    return new BytesArray(String.join("\n", actionLine, sourceLine, ""));
}
/**
 * Convenience overload: indexes {@code numDocs} documents with local index and type "test" and no global
 * index, type or pipeline arguments.
 */
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", "test", null, null, null);
}
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
@ -343,4 +457,5 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
}
}
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.function.Function;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
/**
 * Integration tests for the global parameters (index, type, routing and pipeline) of a {@link BulkRequest}
 * sent through the high level REST client, and for how they combine with values set on individual sub requests.
 * <p>
 * Tests whose only content assertion is {@code everyItem(...)} also assert the expected document ids,
 * because {@code everyItem} is satisfied by an empty result.
 */
public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTestCase {

    /** A pipeline set only on the bulk request adds its field to every indexed document. */
    @SuppressWarnings("unchecked")
    public void testGlobalPipelineOnBulkRequest() throws IOException {
        createFieldAddingPipleine("xyz", "fieldNameXYZ", "valueXYZ");

        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("test", "doc", "1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest("test", "doc", "2")
            .source(XContentType.JSON, "field", "bulk2"));
        request.pipeline("xyz");

        bulk(request);

        Iterable<SearchHit> hits = searchAll("test");
        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
        assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
    }

    /** A pipeline set on every sub request is used instead of the one set on the bulk request. */
    @SuppressWarnings("unchecked")
    public void testPipelineOnRequestOverridesGlobalPipeline() throws IOException {
        createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ");
        createFieldAddingPipleine("perIndexId", "someNewField", "someValue");

        BulkRequest request = new BulkRequest();
        request.pipeline("globalId");
        request.add(new IndexRequest("test", "doc", "1")
            .source(XContentType.JSON, "field", "bulk1")
            .setPipeline("perIndexId"));
        request.add(new IndexRequest("test", "doc", "2")
            .source(XContentType.JSON, "field", "bulk2")
            .setPipeline("perIndexId"));

        bulk(request);

        Iterable<SearchHit> hits = searchAll("test");
        // guard against a vacuous pass: both documents must actually be found
        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
        assertThat(hits, everyItem(hasProperty(fieldFromSource("someNewField"), equalTo("someValue"))));
        // global pipeline was not applied
        assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldXYZ"), nullValue())));
    }

    /** One sub request uses its own pipeline while the other falls back to the bulk request's pipeline. */
    @SuppressWarnings("unchecked")
    public void testMixPipelineOnRequestAndGlobal() throws IOException {
        createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ");
        createFieldAddingPipleine("perIndexId", "someNewField", "someValue");

        // tag::bulk-request-mix-pipeline
        BulkRequest request = new BulkRequest();
        request.pipeline("globalId");

        request.add(new IndexRequest("test", "doc", "1")
            .source(XContentType.JSON, "field", "bulk1")
            .setPipeline("perIndexId")); // <1>

        request.add(new IndexRequest("test", "doc", "2")
            .source(XContentType.JSON, "field", "bulk2")); // <2>
        // end::bulk-request-mix-pipeline

        bulk(request);

        Iterable<SearchHit> hits = searchAll("test");
        assertThat(hits, containsInAnyOrder(
            both(hasId("1"))
                .and(hasProperty(fieldFromSource("someNewField"), equalTo("someValue"))),
            both(hasId("2"))
                .and(hasProperty(fieldFromSource("fieldXYZ"), equalTo("valueXYZ")))));
    }

    /** Sub requests without an index are indexed into the bulk request's global index. */
    @SuppressWarnings("unchecked")
    public void testGlobalIndex() throws IOException {
        BulkRequest request = new BulkRequest("global_index", null);
        request.add(new IndexRequest().type("doc").id("1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest().type("doc").id("2")
            .source(XContentType.JSON, "field", "bulk2"));

        bulk(request);

        Iterable<SearchHit> hits = searchAll("global_index");
        // guard against a vacuous pass: both documents must actually be found
        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
        assertThat(hits, everyItem(hasIndex("global_index")));
    }

    /** A sub request with its own index keeps it; one without an index takes the global index. */
    @SuppressWarnings("unchecked")
    public void testIndexGlobalAndPerRequest() throws IOException {
        BulkRequest request = new BulkRequest("global_index", null);
        request.add(new IndexRequest("local_index", "doc", "1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest().type("doc").id("2") // will take global index
            .source(XContentType.JSON, "field", "bulk2"));

        bulk(request);

        Iterable<SearchHit> hits = searchAll("local_index", "global_index");
        assertThat(hits, containsInAnyOrder(
            both(hasId("1"))
                .and(hasIndex("local_index")),
            both(hasId("2"))
                .and(hasIndex("global_index"))));
    }

    /** Sub requests without a type are indexed with the bulk request's global type. */
    @SuppressWarnings("unchecked")
    public void testGlobalType() throws IOException {
        BulkRequest request = new BulkRequest(null, "global_type");
        request.add(new IndexRequest("index").id("1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest("index").id("2")
            .source(XContentType.JSON, "field", "bulk2"));

        bulk(request);

        Iterable<SearchHit> hits = searchAll("index");
        // guard against a vacuous pass: both documents must actually be found
        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
        assertThat(hits, everyItem(hasType("global_type")));
    }

    /** A sub request with its own type keeps it; one without a type takes the global type. */
    @SuppressWarnings("unchecked")
    public void testTypeGlobalAndPerRequest() throws IOException {
        BulkRequest request = new BulkRequest(null, "global_type");
        request.add(new IndexRequest("index1", "local_type", "1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest("index2").id("2") // will take global type
            .source(XContentType.JSON, "field", "bulk2"));

        bulk(request);

        Iterable<SearchHit> hits = searchAll("index1", "index2");
        assertThat(hits, containsInAnyOrder(
            both(hasId("1"))
                .and(hasType("local_type")),
            both(hasId("2"))
                .and(hasType("global_type"))));
    }

    /** Documents indexed with a global routing are found with that routing and not with another one. */
    @SuppressWarnings("unchecked")
    public void testGlobalRouting() throws IOException {
        createIndexWithMultipleShards("index");
        BulkRequest request = new BulkRequest(null, null);
        request.add(new IndexRequest("index", "type", "1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest("index", "type", "2")
            .source(XContentType.JSON, "field", "bulk1"));
        request.routing("1");
        bulk(request);

        Iterable<SearchHit> emptyHits = searchAll(new SearchRequest("index").routing("xxx"));
        assertThat(emptyHits, is(emptyIterable()));

        Iterable<SearchHit> hits = searchAll(new SearchRequest("index").routing("1"));
        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
    }

    /** Searching with both the global and the local routing value finds both documents. */
    @SuppressWarnings("unchecked")
    public void testMixLocalAndGlobalRouting() throws IOException {
        BulkRequest request = new BulkRequest(null, null);
        request.routing("globalRouting");
        request.add(new IndexRequest("index", "type", "1")
            .source(XContentType.JSON, "field", "bulk1"));
        request.add(new IndexRequest("index", "type", "2")
            .routing("localRouting")
            .source(XContentType.JSON, "field", "bulk1"));

        bulk(request);

        Iterable<SearchHit> hits = searchAll(new SearchRequest("index").routing("globalRouting", "localRouting"));
        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
    }

    /**
     * Executes the bulk request through the high level client and fails the test if any item failed.
     *
     * @return the bulk response, already checked for failures
     */
    private BulkResponse bulk(BulkRequest request) throws IOException {
        BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync);
        assertFalse(bulkResponse.hasFailures());
        return bulkResponse;
    }

    /**
     * Returns a function that reads {@code fieldName} from a hit's source map; the result is {@code null}
     * when the map has no value for that field.
     */
    @SuppressWarnings("unchecked")
    private static <T> Function<SearchHit, T> fieldFromSource(String fieldName) {
        return (response) -> (T) response.getSourceAsMap().get(fieldName);
    }
}

View File

@ -21,7 +21,10 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -30,15 +33,20 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
@ -125,6 +133,22 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
return buildRandomXContentPipeline(pipelineBuilder);
}
/**
 * Registers an ingest pipeline consisting of a single {@code set} processor.
 *
 * @param id        id under which the pipeline is stored
 * @param fieldName value of the processor's {@code field} setting
 * @param value     value of the processor's {@code value} setting
 */
protected static void createFieldAddingPipleine(String id, String fieldName, String value) throws IOException {
    XContentBuilder setProcessorPipeline = jsonBuilder()
        .startObject()
            .startArray("processors")
                .startObject()
                    .startObject("set")
                        .field("field", fieldName)
                        .field("value", value)
                    .endObject()
                .endObject()
            .endArray()
        .endObject();

    PutPipelineRequest putRequest =
        new PutPipelineRequest(id, BytesReference.bytes(setProcessorPipeline), XContentType.JSON);
    createPipeline(putRequest);
}
protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
@ -154,4 +178,32 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
/**
 * Refreshes and then searches the given indices with an otherwise default search request.
 *
 * @param indices names passed to the {@link SearchRequest} constructor
 * @return the hits of the search response
 */
protected Iterable<SearchHit> searchAll(String... indices) throws IOException {
    return searchAll(new SearchRequest(indices));
}
/**
 * Refreshes the indices named by the request, executes it with default request options and returns its hits.
 *
 * @param searchRequest the search to run; its indices are refreshed first
 * @return the hits of the search response
 */
protected Iterable<SearchHit> searchAll(SearchRequest searchRequest) throws IOException {
    // refresh first so that documents indexed just before this call are visible to the search
    refreshIndexes(searchRequest.indices());
    SearchResponse response = highLevelClient().search(searchRequest, RequestOptions.DEFAULT);
    return response.getHits();
}
/**
 * Issues a refresh for the given indices through the low level client and asserts a 200 response.
 * <p>
 * When no index is given the request goes to {@code /_refresh}; joining an empty list would otherwise
 * produce the malformed endpoint {@code //_refresh}.
 *
 * @param indices index names, joined with commas into the request path; may be empty
 */
protected void refreshIndexes(String... indices) throws IOException {
    String joinedIndices = Arrays.stream(indices)
        .collect(Collectors.joining(","));
    String endpoint = joinedIndices.isEmpty() ? "/_refresh" : "/" + joinedIndices + "/_refresh";
    Response refreshResponse = client().performRequest(new Request("POST", endpoint));
    assertEquals(200, refreshResponse.getStatusLine().getStatusCode());
}
/**
 * Creates an index with a random shard count between 8 and 10 and no replicas.
 *
 * @param index name of the index to create
 */
protected void createIndexWithMultipleShards(String index) throws IOException {
    CreateIndexRequest createRequest = new CreateIndexRequest(index);
    final int shardCount = randomIntBetween(8,10);
    Settings.Builder indexSettings = Settings.builder()
        .put("index.number_of_shards", shardCount)
        .put("index.number_of_replicas", 0);
    createRequest.settings(indexSettings);
    highLevelClient().indices().create(createRequest, RequestOptions.DEFAULT);
}
}

View File

@ -100,6 +100,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.io.InputStream;
@ -860,6 +861,21 @@ public class RequestConvertersTests extends ESTestCase {
}
}
/**
 * Checks that a pipeline set on the bulk request ends up as the {@code pipeline} parameter of the
 * converted low level request.
 */
public void testGlobalPipelineOnBulkRequest() throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.pipeline("xyz");
    // three index operations: ids 11..13 with field values bulk1..bulk3
    for (int i = 1; i <= 3; i++) {
        bulkRequest.add(new IndexRequest("test", "doc", Integer.toString(10 + i))
            .source(XContentType.JSON, "field", "bulk" + i));
    }

    Request converted = RequestConverters.bulk(bulkRequest);

    assertThat(converted.getParameters(), Matchers.hasEntry("pipeline","xyz"));
}
public void testSearchNullSource() throws IOException {
SearchRequest searchRequest = new SearchRequest();
Request request = RequestConverters.search(searchRequest);

View File

@ -749,6 +749,16 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
request.waitForActiveShards(2); // <1>
request.waitForActiveShards(ActiveShardCount.ALL); // <2>
// end::bulk-request-active-shards
// tag::bulk-request-pipeline
request.pipeline("pipelineId"); // <1>
// end::bulk-request-pipeline
// tag::bulk-request-routing
request.routing("routingId"); // <1>
// end::bulk-request-routing
// tag::bulk-request-index-type
BulkRequest defaulted = new BulkRequest("posts","_doc"); // <1>
// end::bulk-request-index-type
// tag::bulk-execute-listener
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {

View File

@ -165,3 +165,26 @@ client.admin().indices().prepareRefresh().get();
client.prepareSearch().get();
--------------------------------------------------
[[java-docs-bulk-global-parameters]]
==== Global Parameters
Global parameters can be specified on the BulkRequest as well as BulkProcessor, similar to the REST API. These global
parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters
have to be set before any sub request is added - index, type - and you have to specify them during BulkRequest or
BulkProcessor creation. Some are optional - pipeline, routing - and can be specified at any point before the bulk is sent.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters]
--------------------------------------------------
<1> global parameters from the BulkProcessor will be applied on a sub request
<2> local index and type parameters on a sub request will override the global index and type from the BulkProcessor
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline]
--------------------------------------------------
<1> local pipeline parameter on a sub request will override global pipeline from the BulkRequest
<2> global parameter from the BulkRequest will be applied on a sub request

View File

@ -70,6 +70,25 @@ the index/update/delete operations.
`ActiveShardCount.ALL`, `ActiveShardCount.ONE` or
`ActiveShardCount.DEFAULT` (default)
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-pipeline]
--------------------------------------------------
<1> Global pipelineId used on all sub requests, unless overridden on a sub request
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-routing]
--------------------------------------------------
<1> Global routingId used on all sub requests, unless overridden on a sub request
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-index-type]
--------------------------------------------------
<1> A bulk request with global index and type used on all sub requests, unless overridden on a sub request.
Both parameters are @Nullable and can only be set during BulkRequest creation.
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]

View File

@ -35,12 +35,25 @@ import java.util.Locale;
*/
public interface DocWriteRequest<T> extends IndicesRequest {
/**
* Set the index for this request
* @return the Request
*/
T index(String index);
/**
* Get the index that this request operates on
* @return the index
*/
String index();
/**
* Set the type for this request
* @return the Request
*/
T type(String type);
/**
* Get the type that this request operates on
* @return the type

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
/**
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
@ -88,6 +89,10 @@ public class BulkProcessor implements Closeable {
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();
private String globalIndex;
private String globalType;
private String globalRouting;
private String globalPipeline;
private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
Scheduler scheduler, Runnable onClose) {
@ -136,6 +141,26 @@ public class BulkProcessor implements Closeable {
return this;
}
/**
 * Sets the index handed to every {@link BulkRequest} this processor creates; the bulk request uses it
 * for sub requests whose own index is null or empty.
 */
public Builder setGlobalIndex(String globalIndex) {
this.globalIndex = globalIndex;
return this;
}
/**
 * Sets the type handed to every {@link BulkRequest} this processor creates; the bulk request uses it
 * for sub requests whose own type is null or empty.
 */
public Builder setGlobalType(String globalType) {
this.globalType = globalType;
return this;
}
/**
 * Sets the routing value set as the global routing on every {@link BulkRequest} this processor creates.
 */
public Builder setGlobalRouting(String globalRouting) {
this.globalRouting = globalRouting;
return this;
}
/**
 * Sets the pipeline id set as the global pipeline on every {@link BulkRequest} this processor creates.
 */
public Builder setGlobalPipeline(String globalPipeline) {
this.globalPipeline = globalPipeline;
return this;
}
/**
* Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally
* in case they have failed due to resource constraints (i.e. a thread pool was full).
@ -156,8 +181,14 @@ public class BulkProcessor implements Closeable {
* Builds a new bulk processor.
*/
public BulkProcessor build() {
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
scheduler, onClose);
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
}
/**
 * Creates the factory for the bulk requests of a processor, pre-populated with the global index, type,
 * pipeline and routing.
 * <p>
 * The builder's values are copied into locals before the supplier is created, so every request is built
 * from the values present when this method ran, even if the builder is modified afterwards.
 *
 * @return a supplier producing a fresh {@link BulkRequest} on each call
 */
private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
    final String index = globalIndex;
    final String type = globalType;
    final String pipeline = globalPipeline;
    final String routing = globalRouting;
    return () -> new BulkRequest(index, type)
        .pipeline(pipeline)
        .routing(routing);
}
}
@ -184,6 +215,7 @@ public class BulkProcessor implements Closeable {
private final AtomicLong executionIdGen = new AtomicLong();
private BulkRequest bulkRequest;
private final Supplier<BulkRequest> bulkRequestSupplier;
private final BulkRequestHandler bulkRequestHandler;
private final Runnable onClose;
@ -191,10 +223,11 @@ public class BulkProcessor implements Closeable {
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Scheduler scheduler, Runnable onClose) {
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequest = bulkRequestSupplier.get();
this.bulkRequestSupplier = bulkRequestSupplier;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
@ -335,7 +368,7 @@ public class BulkProcessor implements Closeable {
final BulkRequest bulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = new BulkRequest();
this.bulkRequest = bulkRequestSupplier.get();
this.bulkRequestHandler.execute(bulkRequest, executionId);
}

View File

@ -90,12 +90,21 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private String globalPipeline;
private String globalRouting;
private String globalIndex;
private String globalType;
private long sizeInBytes = 0;
public BulkRequest() {
}
/**
 * Creates a bulk request with a global index and type.
 *
 * @param globalIndex index stored as the global index; may be null
 * @param globalType  type stored as the global type; may be null
 */
public BulkRequest(@Nullable String globalIndex, @Nullable String globalType) {
this.globalIndex = globalIndex;
this.globalType = globalType;
}
/**
* Adds a list of requests to be executed. Either index or delete requests.
*/
@ -154,6 +163,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) {
Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request);
requests.add(request);
addPayload(payload);
// lack of source is validated in validate() method
@ -175,6 +186,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) {
Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request);
requests.add(request);
addPayload(payload);
if (request.doc() != null) {
@ -199,6 +212,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
public BulkRequest add(DeleteRequest request, @Nullable Object payload) {
Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request);
requests.add(request);
addPayload(payload);
sizeInBytes += REQUEST_OVERHEAD;
@ -327,13 +342,13 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
String index = defaultIndex;
String type = defaultType;
String id = null;
String routing = defaultRouting;
String routing = valueOrDefault(defaultRouting, globalRouting);
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
int retryOnConflict = 0;
String pipeline = defaultPipeline;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
@ -503,6 +518,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return this;
}
/**
 * Sets the global pipeline of this bulk request.
 *
 * @param globalPipeline the pipeline id to store
 * @return this request
 */
public final BulkRequest pipeline(String globalPipeline) {
this.globalPipeline = globalPipeline;
return this;
}
/**
 * Sets the global routing of this bulk request.
 *
 * @param globalRouting the routing value to store
 * @return this request
 */
public final BulkRequest routing(String globalRouting){
this.globalRouting = globalRouting;
return this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}.
*/
@ -514,6 +538,14 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return timeout;
}
/**
 * Returns the bulk-level (global) pipeline of this request.
 *
 * @return the value last passed to {@link #pipeline(String)}
 */
public String pipeline() {
    return this.globalPipeline;
}
/**
 * Returns the bulk-level (global) routing of this request.
 *
 * @return the value last passed to {@link #routing(String)}
 */
public String routing() {
    return this.globalRouting;
}
private int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {
@ -579,4 +611,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]";
}
/**
 * Applies the bulk-level index and type to a single request. For each of the two
 * parameters the request keeps its own value unless that value is null or empty and a
 * non-empty global value exists. The setters are always invoked, even when the value
 * does not change.
 *
 * @param request the request being added to this bulk
 */
private void applyGlobalMandatoryParameters(DocWriteRequest<?> request) {
    final String effectiveIndex = valueOrDefault(request.index(), globalIndex);
    request.index(effectiveIndex);

    final String effectiveType = valueOrDefault(request.type(), globalType);
    request.type(effectiveType);
}
/**
 * Chooses between a request-level value and a bulk-level fallback.
 *
 * @param value         the value set on the individual request, may be null or empty
 * @param globalDefault the bulk-level fallback, may be null or empty
 * @return {@code globalDefault} only when {@code value} is null or empty and
 *         {@code globalDefault} is not; otherwise {@code value} unchanged (which may
 *         itself be null or empty)
 */
private static String valueOrDefault(String value, String globalDefault) {
    final boolean valueMissing = Strings.isNullOrEmpty(value);
    final boolean fallbackUsable = Strings.isNullOrEmpty(globalDefault) == false;
    return valueMissing && fallbackUsable ? globalDefault : value;
}
}

View File

@ -41,6 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType;
public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkResponse>
implements WriteRequestBuilder<BulkRequestBuilder> {
/**
 * Creates a builder around a new {@link BulkRequest} that is constructed with the given
 * global index and type.
 *
 * @param client      the client passed on to the parent builder
 * @param action      the bulk action passed on to the parent builder
 * @param globalIndex the global index handed to the new bulk request, may be null
 * @param globalType  the global type handed to the new bulk request, may be null
 */
public BulkRequestBuilder(
    ElasticsearchClient client,
    BulkAction action,
    @Nullable String globalIndex,
    @Nullable String globalType
) {
    super(client, action, new BulkRequest(globalIndex, globalType));
}
/**
 * Creates a builder around a new {@link BulkRequest} built with its no-argument constructor.
 *
 * @param client the client passed on to the parent builder
 * @param action the bulk action passed on to the parent builder
 */
public BulkRequestBuilder(ElasticsearchClient client, BulkAction action) {
    super(client, action, new BulkRequest());
}
@ -153,4 +157,14 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
public int numberOfActions() {
    // The builder keeps no count of its own; the wrapped bulk request is the source of truth.
    final int actionCount = request.numberOfActions();
    return actionCount;
}
/**
 * Sets the bulk-level (global) pipeline on the wrapped bulk request.
 *
 * @param globalPipeline the pipeline forwarded to the wrapped request
 * @return this builder, to allow call chaining
 */
public BulkRequestBuilder pipeline(String globalPipeline) {
    // The request's own fluent return value is ignored; the builder is what gets chained.
    this.request.pipeline(globalPipeline);
    return this;
}
/**
 * Sets the bulk-level (global) routing on the wrapped bulk request.
 *
 * @param globalRouting the routing forwarded to the wrapped request
 * @return this builder, to allow call chaining
 */
public BulkRequestBuilder routing(String globalRouting) {
    // The request's own fluent return value is ignored; the builder is what gets chained.
    this.request.routing(globalRouting);
    return this;
}
}

View File

@ -110,6 +110,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
/**
* Sets the type of the document to delete.
*/
@Override
public DeleteRequest type(String type) {
this.type = type;
return this;

View File

@ -214,6 +214,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
/**
* Sets the type of the indexed document.
*/
@Override
public IndexRequest type(String type) {
this.type = type;
return this;

View File

@ -232,6 +232,11 @@ public interface Client extends ElasticsearchClient, Releasable {
*/
BulkRequestBuilder prepareBulk();
/**
 * Prepares a bulk of index / delete operations that share a default (global) index and/or type.
 *
 * @param globalIndex index to use for operations in the bulk that do not specify their own;
 *                    may be {@code null}
 * @param globalType  type to use for operations in the bulk that do not specify their own;
 *                    may be {@code null}
 * @return a builder for the bulk request
 */
BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType);
/**
* Gets the document that was indexed from an index with a type and id.
*

View File

@ -471,6 +471,11 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new BulkRequestBuilder(this, BulkAction.INSTANCE);
}
/**
 * Creates a bulk request builder bound to this client and {@link BulkAction#INSTANCE},
 * passing the global index and type through to the builder.
 */
@Override
public BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType) {
    final BulkRequestBuilder builder = new BulkRequestBuilder(this, BulkAction.INSTANCE, globalIndex, globalType);
    return builder;
}
@Override
public ActionFuture<GetResponse> get(final GetRequest request) {
return execute(GetAction.INSTANCE, request);

View File

@ -20,22 +20,40 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.IngestTestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
public class BulkIntegrationIT extends ESIntegTestCase {
/**
 * Installs {@link IngestTestPlugin} on the test nodes.
 * NOTE(review): presumably needed for the "test" processor used by the sample pipeline - confirm.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> plugins = Arrays.asList(IngestTestPlugin.class);
    return plugins;
}
public void testBulkIndexCreatesMapping() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/bulk-log.json");
BulkRequestBuilder bulkBuilder = client().prepareBulk();
@ -81,4 +99,52 @@ public class BulkIntegrationIT extends ESIntegTestCase {
assertFalse(bulkResponse.hasFailures());
assertFalse(client().prepareGet("index3", "type", "id").setRouting("1").get().isExists());
}
/**
 * Verifies that a bulk body whose actions carry neither index nor type fails validation
 * when no global defaults are set, and succeeds once a global index, type, routing and
 * pipeline are supplied through the builder.
 */
public void testBulkWithGlobalDefaults() throws Exception {
    // None of the actions in this fixture specify "_index" or "_type".
    String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json");
    // Encode once and pass the byte count: the length handed to add() goes with the byte
    // array, and String.length() counts chars, which differs for non-ASCII content.
    byte[] bulkActionBytes = bulkAction.getBytes(StandardCharsets.UTF_8);
    {
        // Without global defaults every one of the three actions lacks both index and type.
        BulkRequestBuilder bulkBuilder = client().prepareBulk();
        bulkBuilder.add(bulkActionBytes, 0, bulkActionBytes.length, null, null, XContentType.JSON);
        ActionRequestValidationException ex = expectThrows(ActionRequestValidationException.class, bulkBuilder::get);

        assertThat(ex.validationErrors(), containsInAnyOrder(
            "index is missing",
            "index is missing",
            "index is missing",
            "type is missing",
            "type is missing",
            "type is missing"));
    }

    {
        // With global index/type (plus routing and an existing pipeline) the same body is accepted.
        createSamplePipeline("pipeline");
        BulkRequestBuilder bulkBuilder = client().prepareBulk("test", "type1")
            .routing("routing")
            .pipeline("pipeline");

        bulkBuilder.add(bulkActionBytes, 0, bulkActionBytes.length, null, null, XContentType.JSON);
        BulkResponse bulkItemResponses = bulkBuilder.get();
        assertFalse(bulkItemResponses.hasFailures());
    }
}
/**
 * Registers an ingest pipeline consisting of a single, unconfigured "test" processor
 * and asserts that the cluster acknowledged it.
 *
 * @param pipelineId the id under which the pipeline is stored
 */
private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
    // Resulting document: {"processors":[{"test":{}}]}
    XContentBuilder definition = jsonBuilder();
    definition.startObject();
    {
        definition.startArray("processors");
        {
            definition.startObject();
            {
                definition.startObject("test");
                definition.endObject();
            }
            definition.endObject();
        }
        definition.endArray();
    }
    definition.endObject();

    PutPipelineRequest putRequest = new PutPipelineRequest(pipelineId, BytesReference.bytes(definition), XContentType.JSON);
    AcknowledgedResponse putResponse = client().admin().cluster().putPipeline(putRequest).get();

    assertTrue(putResponse.isAcknowledged());
}
}

View File

@ -72,19 +72,9 @@ public class BulkProcessorTests extends ESTestCase {
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
threadPool.getThreadContext().putHeader(headerKey, headerValue);
threadPool.getThreadContext().putTransient(transientKey, transientValue);
bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
}
}, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool, () -> {});
bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(),
1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval,
threadPool, () -> {}, BulkRequest::new);
}
assertNull(threadPool.getThreadContext().getHeader(headerKey));
assertNull(threadPool.getThreadContext().getTransient(transientKey));
@ -100,28 +90,32 @@ public class BulkProcessorTests extends ESTestCase {
bulkProcessor.close();
}
/**
 * Verifies that the on-close callback handed to the {@link BulkProcessor} constructor is
 * not run at construction time and has been run once {@code awaitClose} returns.
 */
public void testAwaitOnCloseCallsOnClose() throws Exception {
    final AtomicBoolean called = new AtomicBoolean(false);
    // The consumer never completes the listener; this test only cares about the close hook.
    BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
    // Single declaration of the processor: the no-op listener comes from emptyListener()
    // and new bulk requests are supplied by BulkRequest::new.
    BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(),
        0, 10, new ByteSizeValue(1000), null,
        (delay, executor, command) -> null, () -> called.set(true), BulkRequest::new);

    assertFalse(called.get());
    bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS);
    assertTrue(called.get());
}
/**
 * Builds a {@link BulkProcessor.Listener} whose three callbacks all do nothing, for tests
 * that need a listener but do not observe it.
 *
 * @return a new no-op listener on every call
 */
private BulkProcessor.Listener emptyListener() {
    final BulkProcessor.Listener noOpListener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // intentionally empty
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // intentionally empty
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // intentionally empty
        }
    };
    return noOpListener;
}
}

View File

@ -0,0 +1,5 @@
{ "index":{"_id":"1"} }
{ "field1" : "value1" }
{ "delete" : { "_id" : "2" } }
{ "create" : { "_id" : "3" } }
{ "field1" : "value3" }

View File

@ -58,6 +58,7 @@ import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.CombinableMatcher;
import java.io.IOException;
import java.nio.file.Files;
@ -70,6 +71,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -472,6 +474,14 @@ public class ElasticsearchAssertions {
return new ElasticsearchMatchers.SearchHitHasScoreMatcher(score);
}
/**
 * Convenience entry point that delegates to
 * {@link ElasticsearchMatchers.HasPropertyLambdaMatcher#hasProperty(Function, Matcher)}.
 *
 * @param property     function that extracts the value to examine from the matched object
 * @param valueMatcher matcher applied to the extracted value
 * @return the matcher produced by the delegate
 */
public static <T, V> CombinableMatcher<T> hasProperty(Function<? super T, ? extends V> property, Matcher<V> valueMatcher) {
    final CombinableMatcher<T> propertyMatcher = ElasticsearchMatchers.HasPropertyLambdaMatcher.hasProperty(property, valueMatcher);
    return propertyMatcher;
}
/**
 * Returns a function that looks up {@code fieldName} in the source map of a search hit.
 *
 * @param fieldName the key to read from the hit's source map
 * @return a function yielding the value stored under {@code fieldName}, or {@code null}
 *         when the map has no such key
 */
public static Function<SearchHit, Object> fieldFromSource(String fieldName) {
    return hit -> {
        final Map<String, Object> source = hit.getSourceAsMap();
        return source.get(fieldName);
    };
}
public static <T extends Query> T assertBooleanSubQuery(Query query, Class<T> subqueryType, int i) {
assertThat(query, instanceOf(BooleanQuery.class));
BooleanQuery q = (BooleanQuery) query;

View File

@ -20,7 +20,12 @@ package org.elasticsearch.test.hamcrest;
import org.elasticsearch.search.SearchHit;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.hamcrest.core.CombinableMatcher;
import java.util.function.Function;
public class ElasticsearchMatchers {
@ -115,4 +120,27 @@ public class ElasticsearchMatchers {
description.appendText("searchHit score should be ").appendValue(score);
}
}
/**
 * A {@link FeatureMatcher} that extracts a value from the examined object with a function
 * and hands that value to a sub-matcher.
 *
 * @param <T> type of the examined object
 * @param <V> type of the extracted value
 */
public static class HasPropertyLambdaMatcher<T, V> extends FeatureMatcher<T, V> {

    /** Extracts the value to be matched from the examined object. */
    private final Function<? super T, ? extends V> property;

    private HasPropertyLambdaMatcher(Matcher<? super V> subMatcher, Function<? super T, ? extends V> property) {
        // "object with" / "lambda" are the feature description and feature name passed to FeatureMatcher.
        super(subMatcher, "object with", "lambda");
        this.property = property;
    }

    @Override
    protected V featureValueOf(T actual) {
        final V extracted = property.apply(actual);
        return extracted;
    }

    /**
     * Creates a combinable matcher around a new {@code HasPropertyLambdaMatcher}.
     *
     * @param property     The lambda to fetch property
     * @param valueMatcher The matcher to apply to the property
     */
    public static <T, V> CombinableMatcher<T> hasProperty(Function<? super T, ? extends V> property, Matcher<V> valueMatcher) {
        final HasPropertyLambdaMatcher<T, V> featureMatcher = new HasPropertyLambdaMatcher<>(valueMatcher, property);
        return new CombinableMatcher<>(featureMatcher);
    }
}
}