Add BulkRequest support to High Level Rest client (#23312)
This commit adds support for BulkRequest execution in the High Level Rest client.
This commit is contained in:
parent
a4afc22df6
commit
7e3c06c55d
|
@ -28,19 +28,26 @@ import org.apache.http.entity.ByteArrayEntity;
|
||||||
import org.apache.http.entity.ContentType;
|
import org.apache.http.entity.ContentType;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.get.GetRequest;
|
import org.elasticsearch.action.get.GetRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.lucene.uid.Versions;
|
import org.elasticsearch.common.lucene.uid.Versions;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -77,6 +84,127 @@ final class Request {
|
||||||
return new Request("HEAD", "/", Collections.emptyMap(), null);
|
return new Request("HEAD", "/", Collections.emptyMap(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Request bulk(BulkRequest bulkRequest) throws IOException {
|
||||||
|
Params parameters = Params.builder();
|
||||||
|
parameters.withTimeout(bulkRequest.timeout());
|
||||||
|
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
|
||||||
|
|
||||||
|
// Bulk API only supports newline delimited JSON or Smile. Before executing
|
||||||
|
// the bulk, we need to check that all requests have the same content-type
|
||||||
|
// and this content-type is supported by the Bulk API.
|
||||||
|
XContentType bulkContentType = null;
|
||||||
|
for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
|
||||||
|
DocWriteRequest<?> request = bulkRequest.requests().get(i);
|
||||||
|
|
||||||
|
DocWriteRequest.OpType opType = request.opType();
|
||||||
|
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
|
||||||
|
bulkContentType = enforceSameContentType((IndexRequest) request, bulkContentType);
|
||||||
|
|
||||||
|
} else if (opType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
UpdateRequest updateRequest = (UpdateRequest) request;
|
||||||
|
if (updateRequest.doc() != null) {
|
||||||
|
bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType);
|
||||||
|
}
|
||||||
|
if (updateRequest.upsertRequest() != null) {
|
||||||
|
bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bulkContentType == null) {
|
||||||
|
bulkContentType = XContentType.JSON;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte separator = bulkContentType.xContent().streamSeparator();
|
||||||
|
ContentType requestContentType = ContentType.create(bulkContentType.mediaType());
|
||||||
|
|
||||||
|
ByteArrayOutputStream content = new ByteArrayOutputStream();
|
||||||
|
for (DocWriteRequest<?> request : bulkRequest.requests()) {
|
||||||
|
DocWriteRequest.OpType opType = request.opType();
|
||||||
|
|
||||||
|
try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) {
|
||||||
|
metadata.startObject();
|
||||||
|
{
|
||||||
|
metadata.startObject(opType.getLowercase());
|
||||||
|
if (Strings.hasLength(request.index())) {
|
||||||
|
metadata.field("_index", request.index());
|
||||||
|
}
|
||||||
|
if (Strings.hasLength(request.type())) {
|
||||||
|
metadata.field("_type", request.type());
|
||||||
|
}
|
||||||
|
if (Strings.hasLength(request.id())) {
|
||||||
|
metadata.field("_id", request.id());
|
||||||
|
}
|
||||||
|
if (Strings.hasLength(request.routing())) {
|
||||||
|
metadata.field("_routing", request.routing());
|
||||||
|
}
|
||||||
|
if (Strings.hasLength(request.parent())) {
|
||||||
|
metadata.field("_parent", request.parent());
|
||||||
|
}
|
||||||
|
if (request.version() != Versions.MATCH_ANY) {
|
||||||
|
metadata.field("_version", request.version());
|
||||||
|
}
|
||||||
|
|
||||||
|
VersionType versionType = request.versionType();
|
||||||
|
if (versionType != VersionType.INTERNAL) {
|
||||||
|
if (versionType == VersionType.EXTERNAL) {
|
||||||
|
metadata.field("_version_type", "external");
|
||||||
|
} else if (versionType == VersionType.EXTERNAL_GTE) {
|
||||||
|
metadata.field("_version_type", "external_gte");
|
||||||
|
} else if (versionType == VersionType.FORCE) {
|
||||||
|
metadata.field("_version_type", "force");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
|
||||||
|
IndexRequest indexRequest = (IndexRequest) request;
|
||||||
|
if (Strings.hasLength(indexRequest.getPipeline())) {
|
||||||
|
metadata.field("pipeline", indexRequest.getPipeline());
|
||||||
|
}
|
||||||
|
} else if (opType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
UpdateRequest updateRequest = (UpdateRequest) request;
|
||||||
|
if (updateRequest.retryOnConflict() > 0) {
|
||||||
|
metadata.field("_retry_on_conflict", updateRequest.retryOnConflict());
|
||||||
|
}
|
||||||
|
if (updateRequest.fetchSource() != null) {
|
||||||
|
metadata.field("_source", updateRequest.fetchSource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
metadata.endObject();
|
||||||
|
}
|
||||||
|
metadata.endObject();
|
||||||
|
|
||||||
|
BytesRef metadataSource = metadata.bytes().toBytesRef();
|
||||||
|
content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length);
|
||||||
|
content.write(separator);
|
||||||
|
}
|
||||||
|
|
||||||
|
BytesRef source = null;
|
||||||
|
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
|
||||||
|
IndexRequest indexRequest = (IndexRequest) request;
|
||||||
|
BytesReference indexSource = indexRequest.source();
|
||||||
|
XContentType indexXContentType = indexRequest.getContentType();
|
||||||
|
|
||||||
|
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, indexSource, indexXContentType)) {
|
||||||
|
try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) {
|
||||||
|
builder.copyCurrentStructure(parser);
|
||||||
|
source = builder.bytes().toBytesRef();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (opType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
source = XContentHelper.toXContent((UpdateRequest) request, bulkContentType, false).toBytesRef();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (source != null) {
|
||||||
|
content.write(source.bytes, source.offset, source.length);
|
||||||
|
content.write(separator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HttpEntity entity = new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType);
|
||||||
|
return new Request(HttpPost.METHOD_NAME, "/_bulk", parameters.getParams(), entity);
|
||||||
|
}
|
||||||
|
|
||||||
static Request exists(GetRequest getRequest) {
|
static Request exists(GetRequest getRequest) {
|
||||||
Request request = get(getRequest);
|
Request request = get(getRequest);
|
||||||
return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null);
|
return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null);
|
||||||
|
@ -312,4 +440,26 @@ final class Request {
|
||||||
return new Params();
|
return new Params();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms
|
||||||
|
* to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called).
|
||||||
|
*
|
||||||
|
* @return the {@link IndexRequest}'s content type
|
||||||
|
*/
|
||||||
|
static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
|
||||||
|
XContentType requestContentType = indexRequest.getContentType();
|
||||||
|
if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) {
|
||||||
|
throw new IllegalArgumentException("Unsupported content-type found for request with content-type [" + requestContentType
|
||||||
|
+ "], only JSON and SMILE are supported");
|
||||||
|
}
|
||||||
|
if (xContentType == null) {
|
||||||
|
return requestContentType;
|
||||||
|
}
|
||||||
|
if (requestContentType != xContentType) {
|
||||||
|
throw new IllegalArgumentException("Mismatching content-type found for request with content-type [" + requestContentType
|
||||||
|
+ "], previous requests have content-type [" + xContentType + "]");
|
||||||
|
}
|
||||||
|
return xContentType;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.get.GetRequest;
|
import org.elasticsearch.action.get.GetRequest;
|
||||||
import org.elasticsearch.action.get.GetResponse;
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
|
@ -59,6 +61,24 @@ public class RestHighLevelClient {
|
||||||
this.client = Objects.requireNonNull(client);
|
this.client = Objects.requireNonNull(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes a bulk request using the Bulk API
|
||||||
|
*
|
||||||
|
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
|
||||||
|
*/
|
||||||
|
public BulkResponse bulk(BulkRequest bulkRequest, Header... headers) throws IOException {
|
||||||
|
return performRequestAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, emptySet(), headers);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asynchronously executes a bulk request using the Bulk API
|
||||||
|
*
|
||||||
|
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
|
||||||
|
*/
|
||||||
|
public void bulkAsync(BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Header... headers) {
|
||||||
|
performRequestAsyncAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, listener, emptySet(), headers);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
|
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -25,6 +25,10 @@ import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchStatusException;
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.get.GetRequest;
|
import org.elasticsearch.action.get.GetRequest;
|
||||||
import org.elasticsearch.action.get.GetResponse;
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
|
@ -32,6 +36,7 @@ import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
import org.elasticsearch.action.update.UpdateResponse;
|
import org.elasticsearch.action.update.UpdateResponse;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
|
@ -440,4 +445,80 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||||
exception.getMessage());
|
exception.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testBulk() throws IOException {
|
||||||
|
int nbItems = randomIntBetween(10, 100);
|
||||||
|
boolean[] errors = new boolean[nbItems];
|
||||||
|
|
||||||
|
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
for (int i = 0; i < nbItems; i++) {
|
||||||
|
String id = String.valueOf(i);
|
||||||
|
boolean erroneous = randomBoolean();
|
||||||
|
errors[i] = erroneous;
|
||||||
|
|
||||||
|
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
|
||||||
|
if (opType == DocWriteRequest.OpType.DELETE) {
|
||||||
|
if (erroneous == false) {
|
||||||
|
assertEquals(RestStatus.CREATED,
|
||||||
|
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
|
||||||
|
}
|
||||||
|
DeleteRequest deleteRequest = new DeleteRequest("index", "test", id);
|
||||||
|
bulkRequest.add(deleteRequest);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes();
|
||||||
|
if (opType == DocWriteRequest.OpType.INDEX) {
|
||||||
|
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType);
|
||||||
|
if (erroneous) {
|
||||||
|
indexRequest.version(12L);
|
||||||
|
}
|
||||||
|
bulkRequest.add(indexRequest);
|
||||||
|
|
||||||
|
} else if (opType == DocWriteRequest.OpType.CREATE) {
|
||||||
|
IndexRequest createRequest = new IndexRequest("index", "test", id).source(source, xContentType).create(true);
|
||||||
|
if (erroneous) {
|
||||||
|
assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status());
|
||||||
|
}
|
||||||
|
bulkRequest.add(createRequest);
|
||||||
|
|
||||||
|
} else if (opType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest("index", "test", id)
|
||||||
|
.doc(new IndexRequest().source(source, xContentType));
|
||||||
|
if (erroneous == false) {
|
||||||
|
assertEquals(RestStatus.CREATED,
|
||||||
|
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
|
||||||
|
}
|
||||||
|
bulkRequest.add(updateRequest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkResponse bulkResponse = execute(bulkRequest, highLevelClient()::bulk, highLevelClient()::bulkAsync);
|
||||||
|
assertEquals(RestStatus.OK, bulkResponse.status());
|
||||||
|
assertTrue(bulkResponse.getTookInMillis() > 0);
|
||||||
|
assertEquals(nbItems, bulkResponse.getItems().length);
|
||||||
|
|
||||||
|
for (int i = 0; i < nbItems; i++) {
|
||||||
|
BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
|
||||||
|
|
||||||
|
assertEquals(i, bulkItemResponse.getItemId());
|
||||||
|
assertEquals("index", bulkItemResponse.getIndex());
|
||||||
|
assertEquals("test", bulkItemResponse.getType());
|
||||||
|
assertEquals(String.valueOf(i), bulkItemResponse.getId());
|
||||||
|
|
||||||
|
DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType();
|
||||||
|
if (requestOpType == DocWriteRequest.OpType.INDEX || requestOpType == DocWriteRequest.OpType.CREATE) {
|
||||||
|
assertEquals(errors[i], bulkItemResponse.isFailed());
|
||||||
|
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status());
|
||||||
|
} else if (requestOpType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
assertEquals(errors[i], bulkItemResponse.isFailed());
|
||||||
|
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status());
|
||||||
|
} else if (requestOpType == DocWriteRequest.OpType.DELETE) {
|
||||||
|
assertFalse(bulkItemResponse.isFailed());
|
||||||
|
assertEquals(errors[i] ? RestStatus.NOT_FOUND : RestStatus.OK, bulkItemResponse.status());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,9 @@ package org.elasticsearch.client;
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
import org.apache.http.entity.ByteArrayEntity;
|
import org.apache.http.entity.ByteArrayEntity;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||||
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.get.GetRequest;
|
import org.elasticsearch.action.get.GetRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
|
@ -29,6 +32,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.lucene.uid.Versions;
|
import org.elasticsearch.common.lucene.uid.Versions;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
@ -40,13 +44,15 @@ import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.RandomObjects;
|
import org.elasticsearch.test.RandomObjects;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.io.InputStream;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static java.util.Collections.singletonMap;
|
||||||
|
import static org.elasticsearch.client.Request.enforceSameContentType;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
|
||||||
|
|
||||||
public class RequestTests extends ESTestCase {
|
public class RequestTests extends ESTestCase {
|
||||||
|
@ -361,14 +367,207 @@ public class RequestTests extends ESTestCase {
|
||||||
public void testUpdateWithDifferentContentTypes() throws IOException {
|
public void testUpdateWithDifferentContentTypes() throws IOException {
|
||||||
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> {
|
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> {
|
||||||
UpdateRequest updateRequest = new UpdateRequest();
|
UpdateRequest updateRequest = new UpdateRequest();
|
||||||
updateRequest.doc(new IndexRequest().source(Collections.singletonMap("field", "doc"), XContentType.JSON));
|
updateRequest.doc(new IndexRequest().source(singletonMap("field", "doc"), XContentType.JSON));
|
||||||
updateRequest.upsert(new IndexRequest().source(Collections.singletonMap("field", "upsert"), XContentType.YAML));
|
updateRequest.upsert(new IndexRequest().source(singletonMap("field", "upsert"), XContentType.YAML));
|
||||||
Request.update(updateRequest);
|
Request.update(updateRequest);
|
||||||
});
|
});
|
||||||
assertEquals("Update request cannot have different content types for doc [JSON] and upsert [YAML] documents",
|
assertEquals("Update request cannot have different content types for doc [JSON] and upsert [YAML] documents",
|
||||||
exception.getMessage());
|
exception.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testBulk() throws IOException {
|
||||||
|
Map<String, String> expectedParams = new HashMap<>();
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
if (randomBoolean()) {
|
||||||
|
String timeout = randomTimeValue();
|
||||||
|
bulkRequest.timeout(timeout);
|
||||||
|
expectedParams.put("timeout", timeout);
|
||||||
|
} else {
|
||||||
|
expectedParams.put("timeout", BulkShardRequest.DEFAULT_TIMEOUT.getStringRep());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
|
||||||
|
bulkRequest.setRefreshPolicy(refreshPolicy);
|
||||||
|
if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
|
||||||
|
expectedParams.put("refresh", refreshPolicy.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
|
||||||
|
|
||||||
|
int nbItems = randomIntBetween(10, 100);
|
||||||
|
for (int i = 0; i < nbItems; i++) {
|
||||||
|
String index = randomAsciiOfLength(5);
|
||||||
|
String type = randomAsciiOfLength(5);
|
||||||
|
String id = randomAsciiOfLength(5);
|
||||||
|
|
||||||
|
BytesReference source = RandomObjects.randomSource(random(), xContentType);
|
||||||
|
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
|
||||||
|
|
||||||
|
DocWriteRequest<?> docWriteRequest = null;
|
||||||
|
if (opType == DocWriteRequest.OpType.INDEX) {
|
||||||
|
IndexRequest indexRequest = new IndexRequest(index, type, id).source(source, xContentType);
|
||||||
|
docWriteRequest = indexRequest;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
indexRequest.setPipeline(randomAsciiOfLength(5));
|
||||||
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
|
indexRequest.parent(randomAsciiOfLength(5));
|
||||||
|
}
|
||||||
|
} else if (opType == DocWriteRequest.OpType.CREATE) {
|
||||||
|
IndexRequest createRequest = new IndexRequest(index, type, id).source(source, xContentType).create(true);
|
||||||
|
docWriteRequest = createRequest;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
createRequest.parent(randomAsciiOfLength(5));
|
||||||
|
}
|
||||||
|
} else if (opType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
final UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(new IndexRequest().source(source, xContentType));
|
||||||
|
docWriteRequest = updateRequest;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
updateRequest.retryOnConflict(randomIntBetween(1, 5));
|
||||||
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
|
randomizeFetchSourceContextParams(updateRequest::fetchSource, new HashMap<>());
|
||||||
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
|
updateRequest.parent(randomAsciiOfLength(5));
|
||||||
|
}
|
||||||
|
} else if (opType == DocWriteRequest.OpType.DELETE) {
|
||||||
|
docWriteRequest = new DeleteRequest(index, type, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
docWriteRequest.routing(randomAsciiOfLength(10));
|
||||||
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
|
docWriteRequest.version(randomNonNegativeLong());
|
||||||
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
|
docWriteRequest.versionType(randomFrom(VersionType.values()));
|
||||||
|
}
|
||||||
|
bulkRequest.add(docWriteRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
Request request = Request.bulk(bulkRequest);
|
||||||
|
assertEquals("/_bulk", request.endpoint);
|
||||||
|
assertEquals(expectedParams, request.params);
|
||||||
|
assertEquals("POST", request.method);
|
||||||
|
|
||||||
|
byte[] content = new byte[(int) request.entity.getContentLength()];
|
||||||
|
try (InputStream inputStream = request.entity.getContent()) {
|
||||||
|
Streams.readFully(inputStream, content);
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest parsedBulkRequest = new BulkRequest();
|
||||||
|
parsedBulkRequest.add(content, 0, content.length, xContentType);
|
||||||
|
assertEquals(bulkRequest.numberOfActions(), parsedBulkRequest.numberOfActions());
|
||||||
|
|
||||||
|
for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
|
||||||
|
DocWriteRequest<?> originalRequest = bulkRequest.requests().get(i);
|
||||||
|
DocWriteRequest<?> parsedRequest = parsedBulkRequest.requests().get(i);
|
||||||
|
|
||||||
|
assertEquals(originalRequest.opType(), parsedRequest.opType());
|
||||||
|
assertEquals(originalRequest.index(), parsedRequest.index());
|
||||||
|
assertEquals(originalRequest.type(), parsedRequest.type());
|
||||||
|
assertEquals(originalRequest.id(), parsedRequest.id());
|
||||||
|
assertEquals(originalRequest.routing(), parsedRequest.routing());
|
||||||
|
assertEquals(originalRequest.parent(), parsedRequest.parent());
|
||||||
|
assertEquals(originalRequest.version(), parsedRequest.version());
|
||||||
|
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
|
||||||
|
|
||||||
|
DocWriteRequest.OpType opType = originalRequest.opType();
|
||||||
|
if (opType == DocWriteRequest.OpType.INDEX) {
|
||||||
|
IndexRequest indexRequest = (IndexRequest) originalRequest;
|
||||||
|
IndexRequest parsedIndexRequest = (IndexRequest) parsedRequest;
|
||||||
|
|
||||||
|
assertEquals(indexRequest.getPipeline(), parsedIndexRequest.getPipeline());
|
||||||
|
assertToXContentEquivalent(indexRequest.source(), parsedIndexRequest.source(), xContentType);
|
||||||
|
} else if (opType == DocWriteRequest.OpType.UPDATE) {
|
||||||
|
UpdateRequest updateRequest = (UpdateRequest) originalRequest;
|
||||||
|
UpdateRequest parsedUpdateRequest = (UpdateRequest) parsedRequest;
|
||||||
|
|
||||||
|
assertEquals(updateRequest.retryOnConflict(), parsedUpdateRequest.retryOnConflict());
|
||||||
|
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
|
||||||
|
if (updateRequest.doc() != null) {
|
||||||
|
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
|
||||||
|
} else {
|
||||||
|
assertNull(parsedUpdateRequest.doc());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBulkWithDifferentContentTypes() throws IOException {
|
||||||
|
{
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "0"));
|
||||||
|
bulkRequest.add(new UpdateRequest("index", "type", "1").script(new Script("test")));
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "2"));
|
||||||
|
|
||||||
|
Request request = Request.bulk(bulkRequest);
|
||||||
|
assertEquals(XContentType.JSON.mediaType(), request.entity.getContentType().getValue());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "0"));
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), xContentType));
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "2"));
|
||||||
|
|
||||||
|
Request request = Request.bulk(bulkRequest);
|
||||||
|
assertEquals(xContentType.mediaType(), request.entity.getContentType().getValue());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest("index", "type", "0");
|
||||||
|
if (randomBoolean()) {
|
||||||
|
updateRequest.doc(new IndexRequest().source(singletonMap("field", "value"), xContentType));
|
||||||
|
} else {
|
||||||
|
updateRequest.upsert(new IndexRequest().source(singletonMap("field", "value"), xContentType));
|
||||||
|
}
|
||||||
|
|
||||||
|
Request request = Request.bulk(new BulkRequest().add(updateRequest));
|
||||||
|
assertEquals(xContentType.mediaType(), request.entity.getContentType().getValue());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), XContentType.SMILE));
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON));
|
||||||
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest));
|
||||||
|
assertEquals("Mismatching content-type found for request with content-type [JSON], " +
|
||||||
|
"previous requests have content-type [SMILE]", exception.getMessage());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "0")
|
||||||
|
.source(singletonMap("field", "value"), XContentType.JSON));
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "1")
|
||||||
|
.source(singletonMap("field", "value"), XContentType.JSON));
|
||||||
|
bulkRequest.add(new UpdateRequest("index", "type", "2")
|
||||||
|
.doc(new IndexRequest().source(singletonMap("field", "value"), XContentType.JSON))
|
||||||
|
.upsert(new IndexRequest().source(singletonMap("field", "value"), XContentType.SMILE))
|
||||||
|
);
|
||||||
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest));
|
||||||
|
assertEquals("Mismatching content-type found for request with content-type [SMILE], " +
|
||||||
|
"previous requests have content-type [JSON]", exception.getMessage());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
XContentType xContentType = randomFrom(XContentType.CBOR, XContentType.YAML);
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "0"));
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON));
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "2"));
|
||||||
|
bulkRequest.add(new DeleteRequest("index", "type", "3"));
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "4").source(singletonMap("field", "value"), XContentType.JSON));
|
||||||
|
bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), xContentType));
|
||||||
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest));
|
||||||
|
assertEquals("Unsupported content-type found for request with content-type [" + xContentType
|
||||||
|
+ "], only JSON and SMILE are supported", exception.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testParams() {
|
public void testParams() {
|
||||||
final int nbParams = randomIntBetween(0, 10);
|
final int nbParams = randomIntBetween(0, 10);
|
||||||
Request.Params params = Request.Params.builder();
|
Request.Params params = Request.Params.builder();
|
||||||
|
@ -404,6 +603,33 @@ public class RequestTests extends ESTestCase {
|
||||||
assertEquals("/a/b", Request.endpoint("a", "b"));
|
assertEquals("/a/b", Request.endpoint("a", "b"));
|
||||||
assertEquals("/a/b/_create", Request.endpoint("a", "b", "_create"));
|
assertEquals("/a/b/_create", Request.endpoint("a", "b", "_create"));
|
||||||
assertEquals("/a/b/c/_create", Request.endpoint("a", "b", "c", "_create"));
|
assertEquals("/a/b/c/_create", Request.endpoint("a", "b", "c", "_create"));
|
||||||
|
assertEquals("/a/_create", Request.endpoint("a", null, null, "_create"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEnforceSameContentType() {
|
||||||
|
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
|
||||||
|
IndexRequest indexRequest = new IndexRequest().source(singletonMap("field", "value"), xContentType);
|
||||||
|
assertEquals(xContentType, enforceSameContentType(indexRequest, null));
|
||||||
|
assertEquals(xContentType, enforceSameContentType(indexRequest, xContentType));
|
||||||
|
|
||||||
|
XContentType bulkContentType = randomBoolean() ? xContentType : null;
|
||||||
|
|
||||||
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () ->
|
||||||
|
enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.CBOR), bulkContentType));
|
||||||
|
assertEquals("Unsupported content-type found for request with content-type [CBOR], only JSON and SMILE are supported",
|
||||||
|
exception.getMessage());
|
||||||
|
|
||||||
|
exception = expectThrows(IllegalArgumentException.class, () ->
|
||||||
|
enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.YAML), bulkContentType));
|
||||||
|
assertEquals("Unsupported content-type found for request with content-type [YAML], only JSON and SMILE are supported",
|
||||||
|
exception.getMessage());
|
||||||
|
|
||||||
|
XContentType requestContentType = xContentType == XContentType.JSON ? XContentType.SMILE : XContentType.JSON;
|
||||||
|
|
||||||
|
exception = expectThrows(IllegalArgumentException.class, () ->
|
||||||
|
enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), requestContentType), xContentType));
|
||||||
|
assertEquals("Mismatching content-type found for request with content-type [" + requestContentType + "], "
|
||||||
|
+ "previous requests have content-type [" + xContentType + "]", exception.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue