Don't emit deprecation warnings on calls to the monitoring bulk API. (#39805) (#39838)

The monitoring bulk API accepts the same format as the bulk API, yet its concept
of types is different from "mapping types" and the deprecation warning is only
emitted as a side-effect of this API reusing the parsing logic of bulk requests.

This commit extracts the parsing logic from `_bulk` into its own class with a
new flag that allows to configure whether usage of `_type` should emit a warning
or not. Support for payloads has been removed for simplicity since they were
unused.

@jakelandis has a separate change that removes this notion of type from the
monitoring bulk API that we are considering bringing to 8.0.
This commit is contained in:
Adrien Grand 2019-03-11 07:58:28 +01:00 committed by GitHub
parent 2bbef67770
commit b841de2e38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 473 additions and 337 deletions

View File

@ -76,7 +76,7 @@ public class RestNoopBulkAction extends BaseRestHandler {
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting, bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
null, defaultPipeline, null, true, request.getXContentType()); null, defaultPipeline, true, request.getXContentType());
// short circuit the call to the transport layer // short circuit the call to the transport layer
return channel -> { return channel -> {

View File

@ -447,7 +447,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else { } else {
BytesArray data = bytesBulkRequest(localIndex, localType, i); BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON); processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);
if (localType != null) { if (localType != null) {
// If the payload contains types, parsing it into a bulk request results in a warning. // If the payload contains types, parsing it into a bulk request results in a warning.

View File

@ -301,11 +301,7 @@ public class BulkProcessor implements Closeable {
* Adds either a delete or an index request. * Adds either a delete or an index request.
*/ */
public BulkProcessor add(DocWriteRequest<?> request) { public BulkProcessor add(DocWriteRequest<?> request) {
return add(request, null); internalAdd(request);
}
public BulkProcessor add(DocWriteRequest<?> request, @Nullable Object payload) {
internalAdd(request, payload);
return this; return this;
} }
@ -319,9 +315,9 @@ public class BulkProcessor implements Closeable {
} }
} }
private synchronized void internalAdd(DocWriteRequest<?> request, @Nullable Object payload) { private synchronized void internalAdd(DocWriteRequest<?> request) {
ensureOpen(); ensureOpen();
bulkRequest.add(request, payload); bulkRequest.add(request);
executeIfNeeded(); executeIfNeeded();
} }
@ -330,16 +326,16 @@ public class BulkProcessor implements Closeable {
*/ */
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
XContentType xContentType) throws Exception { XContentType xContentType) throws Exception {
return add(data, defaultIndex, defaultType, null, null, xContentType); return add(data, defaultIndex, defaultType, null, xContentType);
} }
/** /**
* Adds the data from the bytes to be processed by the bulk processor * Adds the data from the bytes to be processed by the bulk processor
*/ */
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload, @Nullable String defaultPipeline,
XContentType xContentType) throws Exception { XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, payload, true, xContentType); bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType);
executeIfNeeded(); executeIfNeeded();
return this; return this;
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.CompositeIndicesRequest;
@ -31,28 +30,17 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -60,7 +48,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
/** /**
* A bulk request holds an ordered {@link IndexRequest}s, {@link DeleteRequest}s and {@link UpdateRequest}s * A bulk request holds an ordered {@link IndexRequest}s, {@link DeleteRequest}s and {@link UpdateRequest}s
@ -72,19 +59,6 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_T
public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> { public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
private static final int REQUEST_OVERHEAD = 50; private static final int REQUEST_OVERHEAD = 50;
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(BulkRequest.class));
private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
private static final ParseField VERSION = new ParseField("version");
private static final ParseField VERSION_TYPE = new ParseField("version_type");
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
/** /**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and * Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
@ -93,7 +67,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
*/ */
final List<DocWriteRequest<?>> requests = new ArrayList<>(); final List<DocWriteRequest<?>> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>(); private final Set<String> indices = new HashSet<>();
List<Object> payloads = null;
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
@ -131,23 +104,18 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return this; return this;
} }
public BulkRequest add(DocWriteRequest<?> request) {
return add(request, null);
}
/** /**
* Add a request to the current BulkRequest. * Add a request to the current BulkRequest.
* @param request Request to add * @param request Request to add
* @param payload Optional payload
* @return the current bulk request * @return the current bulk request
*/ */
public BulkRequest add(DocWriteRequest<?> request, @Nullable Object payload) { public BulkRequest add(DocWriteRequest<?> request) {
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {
add((IndexRequest) request, payload); add((IndexRequest) request);
} else if (request instanceof DeleteRequest) { } else if (request instanceof DeleteRequest) {
add((DeleteRequest) request, payload); add((DeleteRequest) request);
} else if (request instanceof UpdateRequest) { } else if (request instanceof UpdateRequest) {
add((UpdateRequest) request, payload); add((UpdateRequest) request);
} else { } else {
throw new IllegalArgumentException("No support for request [" + request + "]"); throw new IllegalArgumentException("No support for request [" + request + "]");
} }
@ -170,19 +138,14 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
* (for example, if no id is provided, one will be generated, or usage of the create flag). * (for example, if no id is provided, one will be generated, or usage of the create flag).
*/ */
public BulkRequest add(IndexRequest request) { public BulkRequest add(IndexRequest request) {
return internalAdd(request, null); return internalAdd(request);
} }
public BulkRequest add(IndexRequest request, @Nullable Object payload) { BulkRequest internalAdd(IndexRequest request) {
return internalAdd(request, payload);
}
BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) {
Objects.requireNonNull(request, "'request' must not be null"); Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request); applyGlobalMandatoryParameters(request);
requests.add(request); requests.add(request);
addPayload(payload);
// lack of source is validated in validate() method // lack of source is validated in validate() method
sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD; sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD;
indices.add(request.index()); indices.add(request.index());
@ -193,19 +156,14 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
* Adds an {@link UpdateRequest} to the list of actions to execute. * Adds an {@link UpdateRequest} to the list of actions to execute.
*/ */
public BulkRequest add(UpdateRequest request) { public BulkRequest add(UpdateRequest request) {
return internalAdd(request, null); return internalAdd(request);
} }
public BulkRequest add(UpdateRequest request, @Nullable Object payload) { BulkRequest internalAdd(UpdateRequest request) {
return internalAdd(request, payload);
}
BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) {
Objects.requireNonNull(request, "'request' must not be null"); Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request); applyGlobalMandatoryParameters(request);
requests.add(request); requests.add(request);
addPayload(payload);
if (request.doc() != null) { if (request.doc() != null) {
sizeInBytes += request.doc().source().length(); sizeInBytes += request.doc().source().length();
} }
@ -223,34 +181,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
* Adds an {@link DeleteRequest} to the list of actions to execute. * Adds an {@link DeleteRequest} to the list of actions to execute.
*/ */
public BulkRequest add(DeleteRequest request) { public BulkRequest add(DeleteRequest request) {
return add(request, null);
}
public BulkRequest add(DeleteRequest request, @Nullable Object payload) {
Objects.requireNonNull(request, "'request' must not be null"); Objects.requireNonNull(request, "'request' must not be null");
applyGlobalMandatoryParameters(request); applyGlobalMandatoryParameters(request);
requests.add(request); requests.add(request);
addPayload(payload);
sizeInBytes += REQUEST_OVERHEAD; sizeInBytes += REQUEST_OVERHEAD;
indices.add(request.index()); indices.add(request.index());
return this; return this;
} }
private void addPayload(Object payload) {
if (payloads == null) {
if (payload == null) {
return;
}
payloads = new ArrayList<>(requests.size() + 10);
// add requests#size-1 elements to the payloads if it null (we add for an *existing* request)
for (int i = 1; i < requests.size(); i++) {
payloads.add(null);
}
}
payloads.add(payload);
}
/** /**
* The list of requests in this bulk request. * The list of requests in this bulk request.
*/ */
@ -258,17 +197,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return this.requests; return this.requests;
} }
/**
* The list of optional payloads associated with requests in the same order as the requests. Note, elements within
* it might be null if no payload has been provided.
* <p>
* Note, if no payloads have been provided, this method will return null (as to conserve memory overhead).
*/
@Nullable
public List<Object> payloads() {
return this.payloads;
}
/** /**
* The number of actions in the bulk request. * The number of actions in the bulk request.
*/ */
@ -316,7 +244,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Deprecated @Deprecated
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
XContentType xContentType) throws IOException { XContentType xContentType) throws IOException {
return add(data, defaultIndex, defaultType, null, null, null, null, true, xContentType); return add(data, defaultIndex, defaultType, null, null, null, true, xContentType);
} }
/** /**
@ -324,7 +252,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
*/ */
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
XContentType xContentType) throws IOException { XContentType xContentType) throws IOException {
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, true, xContentType); return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, true, xContentType);
} }
/** /**
@ -334,7 +262,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Deprecated @Deprecated
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex, public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex,
XContentType xContentType) throws IOException { XContentType xContentType) throws IOException {
return add(data, defaultIndex, defaultType, null, null, null, null, allowExplicitIndex, xContentType); return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex, xContentType);
} }
/** /**
@ -342,209 +270,32 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
*/ */
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex,
XContentType xContentType) throws IOException { XContentType xContentType) throws IOException {
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, allowExplicitIndex, xContentType); return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, allowExplicitIndex, xContentType);
} }
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, @Nullable Object payload, boolean allowExplicitIndex, @Nullable String defaultPipeline, boolean allowExplicitIndex,
XContentType xContentType) throws IOException { XContentType xContentType) throws IOException {
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, defaultRouting, defaultFetchSourceContext, return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, defaultRouting, defaultFetchSourceContext,
defaultPipeline, payload, allowExplicitIndex, xContentType); defaultPipeline, allowExplicitIndex, xContentType);
} }
/** /**
* @deprecated use {@link #add(BytesReference, String, String, FetchSourceContext, String, Object, boolean, XContentType)} instead * @deprecated use {@link #add(BytesReference, String, String, FetchSourceContext, String, boolean, XContentType)} instead
*/ */
@Deprecated @Deprecated
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, @Nullable Object payload, boolean allowExplicitIndex, @Nullable String defaultPipeline, boolean allowExplicitIndex,
XContentType xContentType) throws IOException { XContentType xContentType) throws IOException {
XContent xContent = xContentType.xContent();
int line = 0;
int from = 0;
byte marker = xContent.streamSeparator();
boolean typesDeprecationLogged = false;
while (true) {
int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
break;
}
line++;
// now parse the action
// EMPTY is safe here because we never call namedObject
try (InputStream stream = data.slice(from, nextMarker - from).streamInput();
XContentParser parser = xContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
// move pointers
from = nextMarker + 1;
// Move to START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token == null) {
continue;
}
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.START_OBJECT + " but found [" + token + "]");
}
// Move to FIELD_NAME, that's the action
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.FIELD_NAME + " but found [" + token + "]");
}
String action = parser.currentName();
String index = defaultIndex;
String type = defaultType;
String id = null;
String routing = valueOrDefault(defaultRouting, globalRouting); String routing = valueOrDefault(defaultRouting, globalRouting);
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline); String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
new BulkRequestParser(true).parse(data, defaultIndex, defaultType, routing, defaultFetchSourceContext, pipeline,
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) allowExplicitIndex, xContentType, this::internalAdd, this::internalAdd, this::add);
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = parser.text();
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (typesDeprecationLogged == false) {
deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
typesDeprecationLogged = true;
}
type = parser.text();
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
routing = parser.text();
} else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
opType = parser.text();
} else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNo = parser.longValue();
} else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTerm = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
pipeline = parser.text();
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line +
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
} else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName,
parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (token != XContentParser.Token.VALUE_NULL) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line
+ "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
}
}
} else if (token != XContentParser.Token.END_OBJECT) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.START_OBJECT + " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]");
}
if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).routing(routing)
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
} else {
nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
break;
}
line++;
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request.
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.create("create".equals(opType)).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException("Update requests do not support versioning. " +
"Please use `if_seq_no` and `if_primary_term` instead");
}
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.routing(routing);
// EMPTY is safe here because we never call namedObject
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, dataStream)) {
updateRequest.fromXContent(sliceParser);
}
if (fetchSourceContext != null) {
updateRequest.fetchSource(fetchSourceContext);
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.setPipeline(defaultPipeline);
}
internalAdd(updateRequest, payload);
}
// move pointers
from = nextMarker + 1;
}
}
}
return this; return this;
} }
/**
* Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see
* if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
*/
private BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker, XContentType xContentType) {
final int length;
if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
length = nextMarker - from - 1;
} else {
length = nextMarker - from;
}
return bytesReference.slice(from, length);
}
/** /**
* Sets the number of shard copies that must be active before proceeding with the write. * Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
@ -614,18 +365,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return globalRouting; return globalRouting;
} }
private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
assert res >= 0;
return res;
}
if (from != data.length()) {
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]");
}
return res;
}
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; ActionRequestValidationException validationException = null;

View File

@ -0,0 +1,309 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
/**
* Helper to parse bulk requests. This should be considered an internal class.
*/
public final class BulkRequestParser {
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(BulkRequestParser.class));
private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
private static final ParseField VERSION = new ParseField("version");
private static final ParseField VERSION_TYPE = new ParseField("version_type");
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private final boolean warnOnTypeUsage;
/**
* Create a new parser.
* @param warnOnTypeUsage whether it warns upon types being explicitly specified
*/
public BulkRequestParser(boolean warnOnTypeUsage) {
this.warnOnTypeUsage = warnOnTypeUsage;
}
private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
assert res >= 0;
return res;
}
if (from != data.length()) {
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
}
return res;
}
/**
* Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see
* if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
*/
private static BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker,
XContentType xContentType) {
final int length;
if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
length = nextMarker - from - 1;
} else {
length = nextMarker - from;
}
return bytesReference.slice(from, length);
}
/**
* Parse the provided {@code data} assuming the provided default values. Index requests
* will be passed to the {@code indexRequestConsumer}, update requests to the
* {@code updateRequestConsumer} and delete requests to the {@code deleteRequestConsumer}.
*/
public void parse(
BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, boolean allowExplicitIndex,
XContentType xContentType,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer) throws IOException {
parse(data, defaultIndex, null, defaultRouting, defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, xContentType,
indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer);
}
/**
* Parse the provided {@code data} assuming the provided default values. Index requests
* will be passed to the {@code indexRequestConsumer}, update requests to the
* {@code updateRequestConsumer} and delete requests to the {@code deleteRequestConsumer}.
* @deprecated Use {@link #parse(BytesReference, String, String, FetchSourceContext, String, boolean, XContentType,
* Consumer, Consumer, Consumer)} instead.
*/
@Deprecated
public void parse(
BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, boolean allowExplicitIndex,
XContentType xContentType,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer) throws IOException {
XContent xContent = xContentType.xContent();
int line = 0;
int from = 0;
byte marker = xContent.streamSeparator();
boolean typesDeprecationLogged = false;
while (true) {
int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
break;
}
line++;
// now parse the action
// EMPTY is safe here because we never call namedObject
try (InputStream stream = data.slice(from, nextMarker - from).streamInput();
XContentParser parser = xContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
// move pointers
from = nextMarker + 1;
// Move to START_OBJECT
XContentParser.Token token = parser.nextToken();
if (token == null) {
continue;
}
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.START_OBJECT + " but found [" + token + "]");
}
// Move to FIELD_NAME, that's the action
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.FIELD_NAME + " but found [" + token + "]");
}
String action = parser.currentName();
String index = defaultIndex;
String type = defaultType;
String id = null;
String routing = defaultRouting;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = defaultPipeline;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = parser.text();
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (warnOnTypeUsage && typesDeprecationLogged == false) {
deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
typesDeprecationLogged = true;
}
type = parser.text();
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
routing = parser.text();
} else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
opType = parser.text();
} else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNo = parser.longValue();
} else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTerm = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
pipeline = parser.text();
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line +
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
} else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName,
parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (token != XContentParser.Token.VALUE_NULL) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line
+ "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
}
}
} else if (token != XContentParser.Token.END_OBJECT) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.START_OBJECT + " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]");
}
if ("delete".equals(action)) {
deleteRequestConsumer.accept(new DeleteRequest(index, type, id).routing(routing)
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
} else {
nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
break;
}
line++;
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request.
if ("index".equals(action)) {
if (opType == null) {
indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing)
.version(version).versionType(versionType)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType));
} else {
indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing)
.version(version).versionType(versionType)
.create("create".equals(opType)).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType));
}
} else if ("create".equals(action)) {
indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing)
.version(version).versionType(versionType)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType));
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException("Update requests do not support versioning. " +
"Please use `if_seq_no` and `if_primary_term` instead");
}
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.routing(routing);
// EMPTY is safe here because we never call namedObject
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, dataStream)) {
updateRequest.fromXContent(sliceParser);
}
if (fetchSourceContext != null) {
updateRequest.fetchSource(fetchSourceContext);
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.setPipeline(defaultPipeline);
}
updateRequestConsumer.accept(updateRequest);
}
// move pointers
from = nextMarker + 1;
}
}
}
}
}

View File

@ -95,7 +95,7 @@ public class RestBulkAction extends BaseRestHandler {
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting, bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
defaultFetchSourceContext, defaultPipeline, null, allowExplicitIndex, request.getXContentType()); defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, request.getXContentType());
return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel)); return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
} }

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class BulkRequestParserTests extends ESTestCase {
public void testIndexRequest() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
parsed.set(true);
},
req -> fail(), req -> fail());
assertTrue(parsed.get());
}
public void testDeleteRequest() throws IOException {
BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
req -> fail(), req -> fail(),
deleteRequest -> {
assertFalse(parsed.get());
assertEquals("foo", deleteRequest.index());
assertEquals("bar", deleteRequest.id());
parsed.set(true);
});
assertTrue(parsed.get());
}
public void testUpdateRequest() throws IOException {
BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
req -> fail(),
updateRequest -> {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
parsed.set(true);
},
req -> fail());
assertTrue(parsed.get());
}
public void testBarfOnLackOfTrailingNewline() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> fail(), req -> fail(), req -> fail()));
assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage());
}
public void testFailOnExplicitIndex() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON,
req -> fail(), req -> fail(), req -> fail()));
assertEquals("explicit index in bulk is not allowed", ex.getMessage());
}
public void testTypeWarning() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(true);
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
parsed.set(true);
},
req -> fail(), req -> fail());
assertTrue(parsed.get());
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}

View File

@ -352,7 +352,7 @@ public class BulkRequestTests extends ESTestCase {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk11.json"); String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk11.json");
IllegalArgumentException expectThrows = expectThrows(IllegalArgumentException.class, () -> new BulkRequest() IllegalArgumentException expectThrows = expectThrows(IllegalArgumentException.class, () -> new BulkRequest()
.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON)); .add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON));
assertEquals("The bulk request must be terminated by a newline [\n]", expectThrows.getMessage()); assertEquals("The bulk request must be terminated by a newline [\\n]", expectThrows.getMessage());
String bulkActionWithNewLine = bulkAction + "\n"; String bulkActionWithNewLine = bulkAction + "\n";
BulkRequest bulkRequestWithNewLine = new BulkRequest(); BulkRequest bulkRequestWithNewLine = new BulkRequest();

View File

@ -7,10 +7,7 @@ package org.elasticsearch.xpack.core.monitoring.action;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequestParser;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -75,20 +72,14 @@ public class MonitoringBulkRequest extends ActionRequest {
final long timestamp, final long timestamp,
final long intervalMillis) throws IOException { final long intervalMillis) throws IOException {
// MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest: // MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest
// instead of duplicating the parsing logic here we use a new BulkRequest instance to parse the content. new BulkRequestParser(false).parse(content, null, defaultType, null, null, null, true, xContentType,
final BulkRequest bulkRequest = Requests.bulkRequest().add(content, null, defaultType, xContentType); indexRequest -> {
for (DocWriteRequest request : bulkRequest.requests()) {
if (request instanceof IndexRequest) {
final IndexRequest indexRequest = (IndexRequest) request;
// we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data // we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data
// and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it // and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it
if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) { if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) {
continue; return;
} }
final BytesReference source = indexRequest.source(); final BytesReference source = indexRequest.source();
if (source.length() == 0) { if (source.length() == 0) {
throw new IllegalArgumentException("source is missing for monitoring document [" throw new IllegalArgumentException("source is missing for monitoring document ["
@ -96,11 +87,12 @@ public class MonitoringBulkRequest extends ActionRequest {
} }
// builds a new monitoring document based on the index request // builds a new monitoring document based on the index request
add(new MonitoringBulkDoc(system, indexRequest.type(), indexRequest.id(), timestamp, intervalMillis, source, xContentType)); add(new MonitoringBulkDoc(system, indexRequest.type(), indexRequest.id(), timestamp, intervalMillis, source,
} else { xContentType));
throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); },
} updateRequest -> { throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); },
} deleteRequest -> { throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); });
return this; return this;
} }

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects; import org.elasticsearch.test.RandomObjects;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
@ -142,8 +141,6 @@ public class MonitoringBulkRequestTests extends ESTestCase {
assertThat(bulkDoc.getXContentType(), equalTo(xContentType)); assertThat(bulkDoc.getXContentType(), equalTo(xContentType));
++count; ++count;
} }
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
} }
public void testAddRequestContentWithEmptySource() throws IOException { public void testAddRequestContentWithEmptySource() throws IOException {
@ -191,8 +188,6 @@ public class MonitoringBulkRequestTests extends ESTestCase {
); );
assertThat(e.getMessage(), containsString("source is missing for monitoring document [][doc][" + nbDocs + "]")); assertThat(e.getMessage(), containsString("source is missing for monitoring document [][doc][" + nbDocs + "]"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
} }
public void testAddRequestContentWithUnrecognizedIndexName() throws IOException { public void testAddRequestContentWithUnrecognizedIndexName() throws IOException {
@ -230,8 +225,6 @@ public class MonitoringBulkRequestTests extends ESTestCase {
); );
assertThat(e.getMessage(), containsString("unrecognized index name [" + indexName + "]")); assertThat(e.getMessage(), containsString("unrecognized index name [" + indexName + "]"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
} }
public void testSerialization() throws IOException { public void testSerialization() throws IOException {

View File

@ -23,7 +23,6 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.core.XPackClient; import org.elasticsearch.xpack.core.XPackClient;
@ -122,8 +121,6 @@ public class RestMonitoringBulkActionTests extends ESTestCase {
assertThat(restResponse.status(), is(RestStatus.OK)); assertThat(restResponse.status(), is(RestStatus.OK));
assertThat(restResponse.content().utf8ToString(), assertThat(restResponse.content().utf8ToString(),
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":false}")); is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":false}"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
} }
public void testNoErrorsButIgnored() throws Exception { public void testNoErrorsButIgnored() throws Exception {
@ -134,8 +131,6 @@ public class RestMonitoringBulkActionTests extends ESTestCase {
assertThat(restResponse.status(), is(RestStatus.OK)); assertThat(restResponse.status(), is(RestStatus.OK));
assertThat(restResponse.content().utf8ToString(), assertThat(restResponse.content().utf8ToString(),
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":true,\"errors\":false}")); is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":true,\"errors\":false}"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
} }
public void testWithErrors() throws Exception { public void testWithErrors() throws Exception {
@ -155,8 +150,6 @@ public class RestMonitoringBulkActionTests extends ESTestCase {
assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR)); assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(restResponse.content().utf8ToString(), assertThat(restResponse.content().utf8ToString(),
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":true,\"error\":" + errorJson + "}")); is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":true,\"error\":" + errorJson + "}"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
} }
/** /**