REST high-level client: add simulate pipeline API (#31158)

relates to #27205
This commit is contained in:
Sohaib Iftikhar 2018-06-22 09:59:04 +02:00 committed by Luca Cavanna
parent 0352d88621
commit eade161894
21 changed files with 1182 additions and 129 deletions

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import java.io.IOException;
@ -125,4 +127,37 @@ public final class IngestClient {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline, options,
WritePipelineResponse::fromXContent, listener, emptySet());
}
/**
* Simulate a pipeline on a set of documents provided in the request
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
* Simulate Pipeline API on elastic.co</a>
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public SimulatePipelineResponse simulatePipeline(SimulatePipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::simulatePipeline, options,
SimulatePipelineResponse::fromXContent, emptySet());
}
/**
* Asynchronously simulate a pipeline on a set of documents provided in the request
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
* Simulate Pipeline API on elastic.co</a>
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void simulatePipelineAsync(SimulatePipelineRequest request,
RequestOptions options,
ActionListener<SimulatePipelineResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::simulatePipeline, options,
SimulatePipelineResponse::fromXContent, listener, emptySet());
}
}

View File

@ -71,6 +71,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -886,6 +887,20 @@ final class RequestConverters {
return request;
}
static Request simulatePipeline(SimulatePipelineRequest simulatePipelineRequest) throws IOException {
EndpointBuilder builder = new EndpointBuilder().addPathPartAsIs("_ingest/pipeline");
if (simulatePipelineRequest.getId() != null && !simulatePipelineRequest.getId().isEmpty()) {
builder.addPathPart(simulatePipelineRequest.getId());
}
builder.addPathPartAsIs("_simulate");
String endpoint = builder.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request);
params.putParam("verbose", Boolean.toString(simulatePipelineRequest.isVerbose()));
request.setEntity(createEntity(simulatePipelineRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request getAlias(GetAliasesRequest getAliasesRequest) {
String[] indices = getAliasesRequest.indices() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.indices();
String[] aliases = getAliasesRequest.aliases() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.aliases();

View File

@ -85,9 +85,7 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
}
}
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
protected static XContentBuilder buildRandomXContentPipeline(XContentBuilder pipelineBuilder) throws IOException {
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
@ -114,6 +112,12 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
return pipelineBuilder;
}
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildRandomXContentPipeline(pipelineBuilder);
}
protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));

View File

@ -23,12 +23,22 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.PipelineConfiguration;
import java.io.IOException;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class IngestClientIT extends ESRestHighLevelClientTestCase {
@ -80,4 +90,93 @@ public class IngestClientIT extends ESRestHighLevelClientTestCase {
execute(request, highLevelClient().ingest()::deletePipeline, highLevelClient().ingest()::deletePipelineAsync);
assertTrue(response.isAcknowledged());
}
public void testSimulatePipeline() throws IOException {
testSimulatePipeline(false, false);
}
public void testSimulatePipelineWithFailure() throws IOException {
testSimulatePipeline(false, true);
}
public void testSimulatePipelineVerbose() throws IOException {
testSimulatePipeline(true, false);
}
public void testSimulatePipelineVerboseWithFailure() throws IOException {
testSimulatePipeline(true, true);
}
private void testSimulatePipeline(boolean isVerbose,
boolean isFailure) throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
String rankValue = isFailure ? "non-int" : Integer.toString(1234);
builder.startObject();
{
builder.field("pipeline");
buildRandomXContentPipeline(builder);
builder.startArray("docs");
{
builder.startObject()
.field("_index", "index")
.field("_type", "doc")
.field("_id", "doc_" + 1)
.startObject("_source").field("foo", "rab_" + 1).field("rank", rankValue).endObject()
.endObject();
}
builder.endArray();
}
builder.endObject();
SimulatePipelineRequest request = new SimulatePipelineRequest(
BytesReference.bytes(builder),
builder.contentType()
);
request.setVerbose(isVerbose);
SimulatePipelineResponse response =
execute(request, highLevelClient().ingest()::simulatePipeline, highLevelClient().ingest()::simulatePipelineAsync);
List<SimulateDocumentResult> results = response.getResults();
assertEquals(1, results.size());
if (isVerbose) {
assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) results.get(0);
assertEquals(2, verboseResult.getProcessorResults().size());
if (isFailure) {
assertNotNull(verboseResult.getProcessorResults().get(1).getFailure());
assertThat(verboseResult.getProcessorResults().get(1).getFailure().getMessage(),
containsString("unable to convert [non-int] to integer"));
} else {
assertEquals(
verboseResult.getProcessorResults().get(0).getIngestDocument()
.getFieldValue("foo", String.class),
"bar"
);
assertEquals(
Integer.valueOf(1234),
verboseResult.getProcessorResults().get(1).getIngestDocument()
.getFieldValue("rank", Integer.class)
);
}
} else {
assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)results.get(0);
if (isFailure) {
assertNotNull(baseResult.getFailure());
assertThat(baseResult.getFailure().getMessage(),
containsString("unable to convert [non-int] to integer"));
} else {
assertNotNull(baseResult.getIngestDocument());
assertEquals(
baseResult.getIngestDocument().getFieldValue("foo", String.class),
"bar"
);
assertEquals(
Integer.valueOf(1234),
baseResult.getIngestDocument()
.getFieldValue("rank", Integer.class)
);
}
}
}
}

View File

@ -74,6 +74,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -1534,6 +1535,34 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters());
}
public void testSimulatePipeline() throws IOException {
String pipelineId = randomBoolean() ? "some_pipeline_id" : null;
boolean verbose = randomBoolean();
String json = "{\"pipeline\":{" +
"\"description\":\"_description\"," +
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]}," +
"\"docs\":[{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}]}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
new BytesArray(json.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON
);
request.setId(pipelineId);
request.setVerbose(verbose);
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("verbose", Boolean.toString(verbose));
Request expectedRequest = RequestConverters.simulatePipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
if (pipelineId != null && !pipelineId.isEmpty())
endpoint.add(pipelineId);
endpoint.add("_simulate");
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
assertToXContentBody(request, expectedRequest.getEntity());
}
public void testClusterHealth() {
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
Map<String, String> expectedParams = new HashMap<>();

View File

@ -25,6 +25,12 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
@ -277,4 +283,109 @@ public class IngestClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
public void testSimulatePipeline() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::simulate-pipeline-request
String source =
"{\"" +
"pipeline\":{" +
"\"description\":\"_description\"," +
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
"}," +
"\"docs\":[" +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
"]" +
"}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <1>
XContentType.JSON // <2>
);
// end::simulate-pipeline-request
// tag::simulate-pipeline-request-pipeline-id
request.setId("my-pipeline-id"); // <1>
// end::simulate-pipeline-request-pipeline-id
// For testing we set this back to null
request.setId(null);
// tag::simulate-pipeline-request-verbose
request.setVerbose(true); // <1>
// end::simulate-pipeline-request-verbose
// tag::simulate-pipeline-execute
SimulatePipelineResponse response = client.ingest().simulatePipeline(request, RequestOptions.DEFAULT); // <1>
// end::simulate-pipeline-execute
// tag::simulate-pipeline-response
for (SimulateDocumentResult result: response.getResults()) { // <1>
if (request.isVerbose()) {
assert result instanceof SimulateDocumentVerboseResult;
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult)result; // <2>
for (SimulateProcessorResult processorResult: verboseResult.getProcessorResults()) { // <3>
processorResult.getIngestDocument(); // <4>
processorResult.getFailure(); // <5>
}
} else {
assert result instanceof SimulateDocumentBaseResult;
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)result; // <6>
baseResult.getIngestDocument(); // <7>
baseResult.getFailure(); // <8>
}
}
// end::simulate-pipeline-response
assert(response.getResults().size() > 0);
}
}
public void testSimulatePipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
String source =
"{\"" +
"pipeline\":{" +
"\"description\":\"_description\"," +
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
"}," +
"\"docs\":[" +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
"]" +
"}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON
);
// tag::simulate-pipeline-execute-listener
ActionListener<SimulatePipelineResponse> listener =
new ActionListener<SimulatePipelineResponse>() {
@Override
public void onResponse(SimulatePipelineResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::simulate-pipeline-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::simulate-pipeline-execute-async
client.ingest().simulatePipelineAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::simulate-pipeline-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}

View File

@ -0,0 +1,90 @@
[[java-rest-high-ingest-simulate-pipeline]]
=== Simulate Pipeline API
[[java-rest-high-ingest-simulate-pipeline-request]]
==== Simulate Pipeline Request
A `SimulatePipelineRequest` requires a source and a `XContentType`. The source consists
of the request body. See the https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html[docs]
for more details on the request body.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request]
--------------------------------------------------
<1> The request body as a `ByteArray`.
<2> The XContentType for the request body supplied above.
==== Optional arguments
The following arguments can optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request-pipeline-id]
--------------------------------------------------
<1> You can either specify an existing pipeline to execute against the provided documents, or supply a
pipeline definition in the body of the request. This option sets the id for an existing pipeline.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request-verbose]
--------------------------------------------------
<1> To see the intermediate results of each processor in the simulate request, you can add the verbose parameter
to the request.
[[java-rest-high-ingest-simulate-pipeline-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute]
--------------------------------------------------
<1> Execute the request and get back the response in a `SimulatePipelineResponse` object.
[[java-rest-high-ingest-simulate-pipeline-async]]
==== Asynchronous Execution
The asynchronous execution of a simulate pipeline request requires both the `SimulatePipelineRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute-async]
--------------------------------------------------
<1> The `SimulatePipelineRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `SimulatePipelineResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
[[java-rest-high-ingest-simulate-pipeline-response]]
==== Simulate Pipeline Response
The returned `SimulatePipelineResponse` allows to retrieve information about the executed
operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-response]
--------------------------------------------------
<1> Get results for each of the documents provided as instance of `List<SimulateDocumentResult>`.
<2> If the request was in verbose mode cast the response to `SimulateDocumentVerboseResult`.
<3> Check the result after each processor is applied.
<4> Get the ingest document for the result obtained in 3.
<5> Or get the failure for the result obtained in 3.
<6> Get the result as `SimulateDocumentBaseResult` if the result was not verbose.
<7> Get the ingest document for the result obtained in 6.
<8> Or get the failure for the result obtained in 6.

View File

@ -123,10 +123,12 @@ The Java High Level REST Client supports the following Ingest APIs:
* <<java-rest-high-ingest-put-pipeline>>
* <<java-rest-high-ingest-get-pipeline>>
* <<java-rest-high-ingest-delete-pipeline>>
* <<java-rest-high-ingest-simulate-pipeline>>
include::ingest/put_pipeline.asciidoc[]
include::ingest/get_pipeline.asciidoc[]
include::ingest/delete_pipeline.asciidoc[]
include::ingest/simulate_pipeline.asciidoc[]
== Snapshot APIs

View File

@ -19,13 +19,18 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Holds the end result of what a pipeline did to sample document provided via the simulate api.
*/
@ -33,6 +38,33 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
private final WriteableIngestDocument ingestDocument;
private final Exception failure;
public static final ConstructingObjectParser<SimulateDocumentBaseResult, Void> PARSER =
new ConstructingObjectParser<>(
"simulate_document_base_result",
true,
a -> {
if (a[1] == null) {
assert a[0] != null;
return new SimulateDocumentBaseResult(((WriteableIngestDocument)a[0]).getIngestDocument());
} else {
assert a[0] == null;
return new SimulateDocumentBaseResult((ElasticsearchException)a[1]);
}
}
);
static {
PARSER.declareObject(
optionalConstructorArg(),
WriteableIngestDocument.INGEST_DOC_PARSER,
new ParseField(WriteableIngestDocument.DOC_FIELD)
);
PARSER.declareObject(
optionalConstructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
new ParseField("error")
);
}
public SimulateDocumentBaseResult(IngestDocument ingestDocument) {
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
failure = null;
@ -89,4 +121,8 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
builder.endObject();
return builder;
}
public static SimulateDocumentBaseResult fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

View File

@ -18,21 +18,38 @@
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
/**
* Holds the result of what a pipeline did to a sample document via the simulate api, but instead of {@link SimulateDocumentBaseResult}
* this result class holds the intermediate result each processor did to the sample document.
*/
public final class SimulateDocumentVerboseResult implements SimulateDocumentResult {
public static final String PROCESSOR_RESULT_FIELD = "processor_results";
private final List<SimulateProcessorResult> processorResults;
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<SimulateDocumentVerboseResult, Void> PARSER =
new ConstructingObjectParser<>(
"simulate_document_verbose_result",
true,
a -> new SimulateDocumentVerboseResult((List<SimulateProcessorResult>)a[0])
);
static {
PARSER.declareObjectArray(constructorArg(), SimulateProcessorResult.PARSER, new ParseField(PROCESSOR_RESULT_FIELD));
}
public SimulateDocumentVerboseResult(List<SimulateProcessorResult> processorResults) {
this.processorResults = processorResults;
}
@ -63,7 +80,7 @@ public final class SimulateDocumentVerboseResult implements SimulateDocumentResu
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("processor_results");
builder.startArray(PROCESSOR_RESULT_FIELD);
for (SimulateProcessorResult processorResult : processorResults) {
processorResult.toXContent(builder, params);
}
@ -71,4 +88,8 @@ public final class SimulateDocumentVerboseResult implements SimulateDocumentResu
builder.endObject();
return builder;
}
public static SimulateDocumentVerboseResult fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
@ -42,7 +44,7 @@ import java.util.Objects;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest {
public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject {
private String id;
private boolean verbose;
@ -126,6 +128,12 @@ public class SimulatePipelineRequest extends ActionRequest {
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.rawValue(source.streamInput(), xContentType);
return builder;
}
public static final class Fields {
static final String PIPELINE = "pipeline";
static final String DOCS = "docs";

View File

@ -19,22 +19,90 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class SimulatePipelineResponse extends ActionResponse implements ToXContentObject {
private String pipelineId;
private boolean verbose;
private List<SimulateDocumentResult> results;
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<SimulatePipelineResponse, Void> PARSER =
new ConstructingObjectParser<>(
"simulate_pipeline_response",
true,
a -> {
List<SimulateDocumentResult> results = (List<SimulateDocumentResult>)a[0];
boolean verbose = false;
if (results.size() > 0) {
if (results.get(0) instanceof SimulateDocumentVerboseResult) {
verbose = true;
}
}
return new SimulatePipelineResponse(null, verbose, results);
}
);
static {
PARSER.declareObjectArray(
constructorArg(),
(parser, context) -> {
Token token = parser.currentToken();
ensureExpectedToken(Token.START_OBJECT, token, parser::getTokenLocation);
SimulateDocumentResult result = null;
while ((token = parser.nextToken()) != Token.END_OBJECT) {
ensureExpectedToken(token, Token.FIELD_NAME, parser::getTokenLocation);
String fieldName = parser.currentName();
token = parser.nextToken();
if (token == Token.START_ARRAY) {
if (fieldName.equals(SimulateDocumentVerboseResult.PROCESSOR_RESULT_FIELD)) {
List<SimulateProcessorResult> results = new ArrayList<>();
while ((token = parser.nextToken()) == Token.START_OBJECT) {
results.add(SimulateProcessorResult.fromXContent(parser));
}
ensureExpectedToken(Token.END_ARRAY, token, parser::getTokenLocation);
result = new SimulateDocumentVerboseResult(results);
} else {
parser.skipChildren();
}
} else if (token.equals(Token.START_OBJECT)) {
switch (fieldName) {
case WriteableIngestDocument.DOC_FIELD:
result = new SimulateDocumentBaseResult(
WriteableIngestDocument.INGEST_DOC_PARSER.apply(parser, null).getIngestDocument()
);
break;
case "error":
result = new SimulateDocumentBaseResult(ElasticsearchException.fromXContent(parser));
break;
default:
parser.skipChildren();
break;
}
} // else it is a value skip it
}
assert result != null;
return result;
},
new ParseField(Fields.DOCUMENTS));
}
public SimulatePipelineResponse() {
}
@ -98,6 +166,10 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
return builder;
}
public static SimulatePipelineResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
static final class Fields {
static final String DOCUMENTS = "docs";
}

View File

@ -19,33 +19,91 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;
class SimulateProcessorResult implements Writeable, ToXContentObject {
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class SimulateProcessorResult implements Writeable, ToXContentObject {
private static final String IGNORED_ERROR_FIELD = "ignored_error";
private final String processorTag;
private final WriteableIngestDocument ingestDocument;
private final Exception failure;
SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) {
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<ElasticsearchException, Void> IGNORED_ERROR_PARSER =
new ConstructingObjectParser<>(
"ignored_error_parser",
true,
a -> (ElasticsearchException)a[0]
);
static {
IGNORED_ERROR_PARSER.declareObject(
constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
new ParseField("error")
);
}
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<SimulateProcessorResult, Void> PARSER =
new ConstructingObjectParser<>(
"simulate_processor_result",
true,
a -> {
String processorTag = a[0] == null ? null : (String)a[0];
IngestDocument document = a[1] == null ? null : ((WriteableIngestDocument)a[1]).getIngestDocument();
Exception failure = null;
if (a[2] != null) {
failure = (ElasticsearchException)a[2];
} else if (a[3] != null) {
failure = (ElasticsearchException)a[3];
}
return new SimulateProcessorResult(processorTag, document, failure);
}
);
static {
PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY));
PARSER.declareObject(
optionalConstructorArg(),
WriteableIngestDocument.INGEST_DOC_PARSER,
new ParseField(WriteableIngestDocument.DOC_FIELD)
);
PARSER.declareObject(
optionalConstructorArg(),
IGNORED_ERROR_PARSER,
new ParseField(IGNORED_ERROR_FIELD)
);
PARSER.declareObject(
optionalConstructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
new ParseField("error")
);
}
public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) {
this.processorTag = processorTag;
this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument);
this.failure = failure;
}
SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) {
public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) {
this(processorTag, ingestDocument, null);
}
SimulateProcessorResult(String processorTag, Exception failure) {
public SimulateProcessorResult(String processorTag, Exception failure) {
this(processorTag, null, failure);
}
@ -98,7 +156,7 @@ class SimulateProcessorResult implements Writeable, ToXContentObject {
}
if (failure != null && ingestDocument != null) {
builder.startObject("ignored_error");
builder.startObject(IGNORED_ERROR_FIELD);
ElasticsearchException.generateFailureXContent(builder, params, failure, true);
builder.endObject();
} else if (failure != null) {
@ -112,4 +170,8 @@ class SimulateProcessorResult implements Writeable, ToXContentObject {
builder.endObject();
return builder;
}
public static SimulateProcessorResult fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

View File

@ -20,24 +20,91 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.MetaData;
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
final class WriteableIngestDocument implements Writeable, ToXContentFragment {
static final String SOURCE_FIELD = "_source";
static final String INGEST_FIELD = "_ingest";
static final String DOC_FIELD = "doc";
private final IngestDocument ingestDocument;
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<WriteableIngestDocument, Void> INGEST_DOC_PARSER =
new ConstructingObjectParser<>(
"ingest_document",
true,
a -> {
HashMap<String, Object> sourceAndMetadata = new HashMap<>();
sourceAndMetadata.put(MetaData.INDEX.getFieldName(), a[0]);
sourceAndMetadata.put(MetaData.TYPE.getFieldName(), a[1]);
sourceAndMetadata.put(MetaData.ID.getFieldName(), a[2]);
if (a[3] != null) {
sourceAndMetadata.put(MetaData.ROUTING.getFieldName(), a[3]);
}
if (a[4] != null) {
sourceAndMetadata.put(MetaData.VERSION.getFieldName(), a[4]);
}
if (a[5] != null) {
sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), a[5]);
}
sourceAndMetadata.putAll((Map<String, Object>)a[6]);
return new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, (Map<String, Object>)a[7]));
}
);
static {
INGEST_DOC_PARSER.declareString(constructorArg(), new ParseField(MetaData.INDEX.getFieldName()));
INGEST_DOC_PARSER.declareString(constructorArg(), new ParseField(MetaData.TYPE.getFieldName()));
INGEST_DOC_PARSER.declareString(constructorArg(), new ParseField(MetaData.ID.getFieldName()));
INGEST_DOC_PARSER.declareString(optionalConstructorArg(), new ParseField(MetaData.ROUTING.getFieldName()));
INGEST_DOC_PARSER.declareLong(optionalConstructorArg(), new ParseField(MetaData.VERSION.getFieldName()));
INGEST_DOC_PARSER.declareString(optionalConstructorArg(), new ParseField(MetaData.VERSION_TYPE.getFieldName()));
INGEST_DOC_PARSER.declareObject(constructorArg(), (p, c) -> p.map(), new ParseField(SOURCE_FIELD));
INGEST_DOC_PARSER.declareObject(
constructorArg(),
(p, c) -> {
Map<String, Object> ingestMap = p.map();
ingestMap.computeIfPresent(
"timestamp",
(k, o) -> ZonedDateTime.parse((String)o)
);
return ingestMap;
},
new ParseField(INGEST_FIELD)
);
}
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<WriteableIngestDocument, Void> PARSER =
new ConstructingObjectParser<>(
"writeable_ingest_document",
true,
a -> (WriteableIngestDocument)a[0]
);
static {
PARSER.declareObject(constructorArg(), INGEST_DOC_PARSER, new ParseField(DOC_FIELD));
}
WriteableIngestDocument(IngestDocument ingestDocument) {
assert ingestDocument != null;
this.ingestDocument = ingestDocument;
@ -67,19 +134,25 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("doc");
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
builder.startObject(DOC_FIELD);
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.getMetadata();
for (Map.Entry<IngestDocument.MetaData, Object> metadata : metadataMap.entrySet()) {
if (metadata.getValue() != null) {
builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
}
}
builder.field("_source", ingestDocument.getSourceAndMetadata());
builder.field("_ingest", ingestDocument.getIngestMetadata());
Map<String, Object> source = IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata());
metadataMap.keySet().forEach(mD -> source.remove(mD.getFieldName()));
builder.field(SOURCE_FIELD, source);
builder.field(INGEST_FIELD, ingestDocument.getIngestMetadata());
builder.endObject();
return builder;
}
public static WriteableIngestDocument fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -570,6 +570,17 @@ public final class IngestDocument {
return metadataMap;
}
/**
* Does the same thing as {@link #extractMetadata} but does not mutate the map.
*/
public Map<MetaData, Object> getMetadata() {
Map<MetaData, Object> metadataMap = new EnumMap<>(MetaData.class);
for (MetaData metaData : MetaData.values()) {
metadataMap.put(metaData, sourceAndMetadata.get(metaData.getFieldName()));
}
return metadataMap;
}
/**
* Returns the available ingest metadata fields, by default only timestamp, but it is possible to set additional ones.
* Use only for reading values, modify them instead using {@link #setFieldValue(String, Object)} and {@link #removeField(String)}
@ -588,7 +599,7 @@ public final class IngestDocument {
}
@SuppressWarnings("unchecked")
private static <K, V> Map<K, V> deepCopyMap(Map<K, V> source) {
public static <K, V> Map<K, V> deepCopyMap(Map<K, V> source) {
return (Map<K, V>) deepCopy(source);
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.StringJoiner;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc;
public class SimulateDocumentBaseResultTests extends AbstractXContentTestCase<SimulateDocumentBaseResult> {
public void testSerialization() throws IOException {
boolean isFailure = randomBoolean();
SimulateDocumentBaseResult simulateDocumentBaseResult = createTestInstance(isFailure);
BytesStreamOutput out = new BytesStreamOutput();
simulateDocumentBaseResult.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
SimulateDocumentBaseResult otherSimulateDocumentBaseResult = new SimulateDocumentBaseResult(streamInput);
if (isFailure) {
assertThat(otherSimulateDocumentBaseResult.getIngestDocument(), equalTo(simulateDocumentBaseResult.getIngestDocument()));
assertThat(otherSimulateDocumentBaseResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) otherSimulateDocumentBaseResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
} else {
assertIngestDocument(otherSimulateDocumentBaseResult.getIngestDocument(), simulateDocumentBaseResult.getIngestDocument());
}
}
static SimulateDocumentBaseResult createTestInstance(boolean isFailure) {
SimulateDocumentBaseResult simulateDocumentBaseResult;
if (isFailure) {
simulateDocumentBaseResult = new SimulateDocumentBaseResult(new IllegalArgumentException("test"));
} else {
IngestDocument ingestDocument = createRandomIngestDoc();
simulateDocumentBaseResult = new SimulateDocumentBaseResult(ingestDocument);
}
return simulateDocumentBaseResult;
}
private static SimulateDocumentBaseResult createTestInstanceWithFailures() {
return createTestInstance(randomBoolean());
}
@Override
protected SimulateDocumentBaseResult createTestInstance() {
return createTestInstance(false);
}
@Override
protected SimulateDocumentBaseResult doParseInstance(XContentParser parser) {
return SimulateDocumentBaseResult.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// We cannot have random fields in the _source field and _ingest field
return field ->
field.contains(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.SOURCE_FIELD).toString()
) ||
field.contains(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.INGEST_FIELD).toString()
);
}
public static void assertEqualDocs(SimulateDocumentBaseResult response, SimulateDocumentBaseResult parsedResponse) {
assertEquals(response.getIngestDocument(), parsedResponse.getIngestDocument());
if (response.getFailure() != null) {
assertNotNull(parsedResponse.getFailure());
assertThat(
parsedResponse.getFailure().getMessage(),
containsString(response.getFailure().getMessage())
);
} else {
assertNull(parsedResponse.getFailure());
}
}
@Override
public void assertEqualInstances(SimulateDocumentBaseResult response, SimulateDocumentBaseResult parsedResponse) {
assertEqualDocs(response, parsedResponse);
}
/**
* Test parsing {@link SimulateDocumentBaseResult} with inner failures as they don't support asserting on xcontent
* equivalence, given that exceptions are not parsed back as the same original class. We run the usual
* {@link AbstractXContentTestCase#testFromXContent()} without failures, and this other test with failures where
* we disable asserting on xcontent equivalence at the end.
*/
public void testFromXContentWithFailures() throws IOException {
Supplier<SimulateDocumentBaseResult> instanceSupplier = SimulateDocumentBaseResultTests::createTestInstanceWithFailures;
//exceptions are not of the same type whenever parsed back
boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields(),
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams());
}
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
public class SimulateDocumentSimpleResultTests extends ESTestCase {
public void testSerialization() throws IOException {
boolean isFailure = randomBoolean();
SimulateDocumentBaseResult simulateDocumentBaseResult;
if (isFailure) {
simulateDocumentBaseResult = new SimulateDocumentBaseResult(new IllegalArgumentException("test"));
} else {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
simulateDocumentBaseResult = new SimulateDocumentBaseResult(ingestDocument);
}
BytesStreamOutput out = new BytesStreamOutput();
simulateDocumentBaseResult.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
SimulateDocumentBaseResult otherSimulateDocumentBaseResult = new SimulateDocumentBaseResult(streamInput);
if (isFailure) {
assertThat(otherSimulateDocumentBaseResult.getIngestDocument(), equalTo(simulateDocumentBaseResult.getIngestDocument()));
assertThat(otherSimulateDocumentBaseResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) otherSimulateDocumentBaseResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
} else {
assertIngestDocument(otherSimulateDocumentBaseResult.getIngestDocument(), simulateDocumentBaseResult.getIngestDocument());
}
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Predicate;
import java.util.function.Supplier;
public class SimulateDocumentVerboseResultTests extends AbstractXContentTestCase<SimulateDocumentVerboseResult> {
static SimulateDocumentVerboseResult createTestInstance(boolean withFailures) {
int numDocs = randomIntBetween(0, 10);
List<SimulateProcessorResult> results = new ArrayList<>();
for (int i = 0; i<numDocs; i++) {
boolean isSuccessful = !(withFailures && randomBoolean());
boolean isIgnoredError = withFailures && randomBoolean();
results.add(
SimulateProcessorResultTests
.createTestInstance(isSuccessful, isIgnoredError)
);
}
return new SimulateDocumentVerboseResult(results);
}
private static SimulateDocumentVerboseResult createTestInstanceWithFailures() {
return createTestInstance(true);
}
@Override
protected SimulateDocumentVerboseResult createTestInstance() {
return createTestInstance(false);
}
@Override
protected SimulateDocumentVerboseResult doParseInstance(XContentParser parser) {
return SimulateDocumentVerboseResult.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
static void assertEqualDocs(SimulateDocumentVerboseResult response,
SimulateDocumentVerboseResult parsedResponse) {
assertEquals(response.getProcessorResults().size(), parsedResponse.getProcessorResults().size());
for (int i=0; i < response.getProcessorResults().size(); i++) {
SimulateProcessorResultTests.assertEqualProcessorResults(
response.getProcessorResults().get(i),
parsedResponse.getProcessorResults().get(i)
);
}
}
@Override
protected void assertEqualInstances(SimulateDocumentVerboseResult response,
SimulateDocumentVerboseResult parsedResponse) {
assertEqualDocs(response, parsedResponse);
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// We cannot have random fields in the _source field and _ingest field
return field ->
field.contains(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.SOURCE_FIELD).toString()
) ||
field.contains(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.INGEST_FIELD).toString()
);
}
/**
* Test parsing {@link SimulateDocumentVerboseResult} with inner failures as they don't support asserting on xcontent
* equivalence, given that exceptions are not parsed back as the same original class. We run the usual
* {@link AbstractXContentTestCase#testFromXContent()} without failures, and this other test with failures where we
* disable asserting on xcontent equivalence at the end.
*/
public void testFromXContentWithFailures() throws IOException {
Supplier<SimulateDocumentVerboseResult> instanceSupplier = SimulateDocumentVerboseResultTests::createTestInstanceWithFailures;
//exceptions are not of the same type whenever parsed back
boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields(),
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams());
}
}

View File

@ -21,57 +21,29 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
public class SimulatePipelineResponseTests extends ESTestCase {
public class SimulatePipelineResponseTests extends AbstractXContentTestCase<SimulatePipelineResponse> {
public void testSerialization() throws IOException {
boolean isVerbose = randomBoolean();
String id = randomBoolean() ? randomAlphaOfLengthBetween(1, 10) : null;
int numResults = randomIntBetween(1, 10);
List<SimulateDocumentResult> results = new ArrayList<>(numResults);
for (int i = 0; i < numResults; i++) {
boolean isFailure = randomBoolean();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
if (isVerbose) {
int numProcessors = randomIntBetween(1, 10);
List<SimulateProcessorResult> processorResults = new ArrayList<>(numProcessors);
for (int j = 0; j < numProcessors; j++) {
String processorTag = randomAlphaOfLengthBetween(1, 10);
SimulateProcessorResult processorResult;
if (isFailure) {
processorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
} else {
processorResult = new SimulateProcessorResult(processorTag, ingestDocument);
}
processorResults.add(processorResult);
}
results.add(new SimulateDocumentVerboseResult(processorResults));
} else {
results.add(new SimulateDocumentBaseResult(ingestDocument));
SimulateDocumentBaseResult simulateDocumentBaseResult;
if (isFailure) {
simulateDocumentBaseResult = new SimulateDocumentBaseResult(new IllegalArgumentException("test"));
} else {
simulateDocumentBaseResult = new SimulateDocumentBaseResult(ingestDocument);
}
results.add(simulateDocumentBaseResult);
}
}
SimulatePipelineResponse response = new SimulatePipelineResponse(id, isVerbose, results);
SimulatePipelineResponse response = createInstance(id, isVerbose, true);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
@ -120,4 +92,97 @@ public class SimulatePipelineResponseTests extends ESTestCase {
}
}
}
static SimulatePipelineResponse createInstance(String pipelineId, boolean isVerbose, boolean withFailure) {
int numResults = randomIntBetween(1, 10);
List<SimulateDocumentResult> results = new ArrayList<>(numResults);
for (int i = 0; i < numResults; i++) {
if (isVerbose) {
results.add(
SimulateDocumentVerboseResultTests.createTestInstance(withFailure)
);
} else {
results.add(
SimulateDocumentBaseResultTests.createTestInstance(withFailure && randomBoolean())
);
}
}
return new SimulatePipelineResponse(pipelineId, isVerbose, results);
}
private static SimulatePipelineResponse createTestInstanceWithFailures() {
boolean isVerbose = randomBoolean();
return createInstance(null, isVerbose, false);
}
@Override
protected SimulatePipelineResponse createTestInstance() {
boolean isVerbose = randomBoolean();
// since the pipeline id is not serialized with XContent we set it to null for equality tests.
// we test failures separately since comparing XContent is not possible with failures
return createInstance(null, isVerbose, false);
}
@Override
protected SimulatePipelineResponse doParseInstance(XContentParser parser) {
return SimulatePipelineResponse.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected void assertEqualInstances(SimulatePipelineResponse response,
SimulatePipelineResponse parsedResponse) {
assertEquals(response.getPipelineId(), parsedResponse.getPipelineId());
assertEquals(response.isVerbose(), parsedResponse.isVerbose());
assertEquals(response.getResults().size(), parsedResponse.getResults().size());
for (int i=0; i < response.getResults().size(); i++) {
if (response.isVerbose()) {
assertThat(response.getResults().get(i), instanceOf(SimulateDocumentVerboseResult.class));
assertThat(parsedResponse.getResults().get(i), instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult responseResult = (SimulateDocumentVerboseResult)response.getResults().get(i);
SimulateDocumentVerboseResult parsedResult = (SimulateDocumentVerboseResult)parsedResponse.getResults().get(i);
SimulateDocumentVerboseResultTests.assertEqualDocs(responseResult, parsedResult);
} else {
assertThat(response.getResults().get(i), instanceOf(SimulateDocumentBaseResult.class));
assertThat(parsedResponse.getResults().get(i), instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult responseResult = (SimulateDocumentBaseResult)response.getResults().get(i);
SimulateDocumentBaseResult parsedResult = (SimulateDocumentBaseResult)parsedResponse.getResults().get(i);
SimulateDocumentBaseResultTests.assertEqualDocs(responseResult, parsedResult);
}
}
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// We cannot have random fields in the _source field and _ingest field
return field ->
field.contains(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.SOURCE_FIELD).toString()
) ||
field.contains(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.INGEST_FIELD).toString()
);
}
/**
* Test parsing {@link SimulatePipelineResponse} with inner failures as they don't support asserting on xcontent equivalence, given that
* exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()}
* without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end.
*/
public void testFromXContentWithFailures() throws IOException {
Supplier<SimulatePipelineResponse> instanceSupplier = SimulatePipelineResponseTests::createTestInstanceWithFailures;
//exceptions are not of the same type whenever parsed back
boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields(), getShuffleFieldsExceptions(),
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams());
}
}

View File

@ -21,35 +21,29 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.StringJoiner;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class SimulateProcessorResultTests extends ESTestCase {
public class SimulateProcessorResultTests extends AbstractXContentTestCase<SimulateProcessorResult> {
public void testSerialization() throws IOException {
String processorTag = randomAlphaOfLengthBetween(1, 10);
boolean isSuccessful = randomBoolean();
boolean isIgnoredException = randomBoolean();
SimulateProcessorResult simulateProcessorResult;
if (isSuccessful) {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
if (isIgnoredException) {
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test"));
} else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument);
}
} else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
}
SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException);
BytesStreamOutput out = new BytesStreamOutput();
simulateProcessorResult.writeTo(out);
@ -72,4 +66,96 @@ public class SimulateProcessorResultTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("test"));
}
}
static SimulateProcessorResult createTestInstance(boolean isSuccessful,
boolean isIgnoredException) {
String processorTag = randomAlphaOfLengthBetween(1, 10);
SimulateProcessorResult simulateProcessorResult;
if (isSuccessful) {
IngestDocument ingestDocument = createRandomIngestDoc();
if (isIgnoredException) {
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test"));
} else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument);
}
} else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
}
return simulateProcessorResult;
}
private static SimulateProcessorResult createTestInstanceWithFailures() {
boolean isSuccessful = randomBoolean();
boolean isIgnoredException = randomBoolean();
return createTestInstance(isSuccessful, isIgnoredException);
}
@Override
protected SimulateProcessorResult createTestInstance() {
// we test failures separately since comparing XContent is not possible with failures
return createTestInstance(true, false);
}
@Override
protected SimulateProcessorResult doParseInstance(XContentParser parser) {
return SimulateProcessorResult.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// We cannot have random fields in the _source field and _ingest field
return field ->
field.startsWith(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.SOURCE_FIELD).toString()
) ||
field.startsWith(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.INGEST_FIELD).toString()
);
}
static void assertEqualProcessorResults(SimulateProcessorResult response,
SimulateProcessorResult parsedResponse) {
assertEquals(response.getProcessorTag(), parsedResponse.getProcessorTag());
assertEquals(response.getIngestDocument(), parsedResponse.getIngestDocument());
if (response.getFailure() != null ) {
assertNotNull(parsedResponse.getFailure());
assertThat(
parsedResponse.getFailure().getMessage(),
containsString(response.getFailure().getMessage())
);
} else {
assertNull(parsedResponse.getFailure());
}
}
@Override
protected void assertEqualInstances(SimulateProcessorResult response, SimulateProcessorResult parsedResponse) {
assertEqualProcessorResults(response, parsedResponse);
}
/**
* Test parsing {@link SimulateProcessorResult} with inner failures as they don't support asserting on xcontent equivalence, given that
* exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()}
* without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end.
*/
public void testFromXContentWithFailures() throws IOException {
Supplier<SimulateProcessorResult> instanceSupplier = SimulateProcessorResultTests::createTestInstanceWithFailures;
//with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata,
//but that does not bother our assertions, as we only want to test that we don't break.
boolean supportsUnknownFields = true;
//exceptions are not of the same type whenever parsed back
boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields,
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams());
}
}

View File

@ -25,14 +25,19 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.RandomObjects;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Predicate;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
@ -40,7 +45,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
public class WriteableIngestDocumentTests extends ESTestCase {
public class WriteableIngestDocumentTests extends AbstractXContentTestCase<WriteableIngestDocument> {
public void testEqualsAndHashcode() throws Exception {
Map<String, Object> sourceAndMetadata = RandomDocumentPicks.randomSource(random());
@ -147,4 +152,42 @@ public class WriteableIngestDocumentTests extends ESTestCase {
IngestDocument serializedIngestDocument = new IngestDocument(toXContentSource, toXContentIngestMetadata);
assertThat(serializedIngestDocument, equalTo(serializedIngestDocument));
}
static IngestDocument createRandomIngestDoc() {
XContentType xContentType = randomFrom(XContentType.values());
BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType);
Map<String, Object> randomSource = XContentHelper.convertToMap(sourceBytes, false, xContentType).v2();
return RandomDocumentPicks.randomIngestDocument(random(), randomSource);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected WriteableIngestDocument createTestInstance() {
return new WriteableIngestDocument(createRandomIngestDoc());
}
@Override
protected WriteableIngestDocument doParseInstance(XContentParser parser) {
return WriteableIngestDocument.fromXContent(parser);
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// We cannot have random fields in the _source field and _ingest field
return field ->
field.startsWith(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.SOURCE_FIELD).toString()
) ||
field.startsWith(
new StringJoiner(".")
.add(WriteableIngestDocument.DOC_FIELD)
.add(WriteableIngestDocument.INGEST_FIELD).toString()
);
}
}