Reject refresh usage in bulk items when using and fix NPE when no source

The REST bulk API rejects use of `refresh` at the item level. But the Java API lets the user setting it.

We need to have the same behavior and don't let think the user he can define `refresh` per bulk item.

Note that the user can still define `refresh` on the bulk itself.

Also a user can create with Java API an IndexRequest without any source which is causing a NPE when evaluating the bulk item size.

Closes #7361.
Closes #15120.
This commit is contained in:
David Pilato 2015-11-28 15:15:52 +01:00
parent 059a675aa5
commit 854099f1d5
2 changed files with 56 additions and 8 deletions

View File

@ -46,9 +46,10 @@ import java.util.List;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
* it in a single batch.
* A bulk request holds an ordered {@link IndexRequest}s, {@link DeleteRequest}s and {@link UpdateRequest}s
* and allows to executes it in a single batch.
*
* Note that we only support refresh on the bulk request not per item.
* @see org.elasticsearch.client.Client#bulk(BulkRequest)
*/
public class BulkRequest extends ActionRequest<BulkRequest> implements CompositeIndicesRequest {
@ -89,6 +90,12 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
return add(request, null);
}
/**
* Add a request to the current BulkRequest.
* @param request Request to add
* @param payload Optional payload
* @return the current bulk request
*/
public BulkRequest add(ActionRequest request, @Nullable Object payload) {
if (request instanceof IndexRequest) {
add((IndexRequest) request, payload);
@ -127,7 +134,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) {
requests.add(request);
addPayload(payload);
sizeInBytes += request.source().length() + REQUEST_OVERHEAD;
// lack of source is validated in validate() method
sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD;
return this;
}
@ -478,8 +486,14 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
if (requests.isEmpty()) {
validationException = addValidationError("no requests added", validationException);
}
for (int i = 0; i < requests.size(); i++) {
ActionRequestValidationException ex = requests.get(i).validate();
for (ActionRequest request : requests) {
// We first check if refresh has been set
if ((request instanceof DeleteRequest && ((DeleteRequest)request).refresh()) ||
(request instanceof UpdateRequest && ((UpdateRequest)request).refresh()) ||
(request instanceof IndexRequest && ((IndexRequest)request).refresh())) {
validationException = addValidationError("Refresh is not supported on an item request, set the refresh flag on the BulkRequest instead.", validationException);
}
ActionRequestValidationException ex = request.validate();
if (ex != null) {
if (validationException == null) {
validationException = new ActionRequestValidationException();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -36,9 +37,7 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
public class BulkRequestTests extends ESTestCase {
public void testSimpleBulk1() throws Exception {
@ -171,4 +170,39 @@ public class BulkRequestTests extends ESTestCase {
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null);
assertThat(bulkRequest.numberOfActions(), equalTo(9));
}
// issue 7361
public void testBulkRequestWithRefresh() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
// We force here a "id is missing" validation error
bulkRequest.add(new DeleteRequest("index", "type", null).refresh(true));
// We force here a "type is missing" validation error
bulkRequest.add(new DeleteRequest("index", null, "id"));
bulkRequest.add(new DeleteRequest("index", "type", "id").refresh(true));
bulkRequest.add(new UpdateRequest("index", "type", "id").doc("{}").refresh(true));
bulkRequest.add(new IndexRequest("index", "type", "id").source("{}").refresh(true));
ActionRequestValidationException validate = bulkRequest.validate();
assertThat(validate, notNullValue());
assertThat(validate.validationErrors(), not(empty()));
assertThat(validate.validationErrors(), contains(
"Refresh is not supported on an item request, set the refresh flag on the BulkRequest instead.",
"id is missing",
"type is missing",
"Refresh is not supported on an item request, set the refresh flag on the BulkRequest instead.",
"Refresh is not supported on an item request, set the refresh flag on the BulkRequest instead.",
"Refresh is not supported on an item request, set the refresh flag on the BulkRequest instead."));
}
// issue 15120
public void testBulkNoSource() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new UpdateRequest("index", "type", "id"));
bulkRequest.add(new IndexRequest("index", "type", "id"));
ActionRequestValidationException validate = bulkRequest.validate();
assertThat(validate, notNullValue());
assertThat(validate.validationErrors(), not(empty()));
assertThat(validate.validationErrors(), contains(
"script or doc is missing",
"source is missing"));
}
}