Add tests for ingesting CBOR data attachments (#49715)

Our docs specifically mention that CBOR is supported when ingesting attachments. However, this is not tested anywhere.

This adds a test that specifically uses the CBOR format in its IndexRequest, and another one in the ingest attachment unit tests that behaves like CBOR.
This commit is contained in:
Alexander Reelsen 2019-12-06 14:33:02 +01:00
parent e60837aa3b
commit d299bf5760
2 changed files with 41 additions and 3 deletions

View File

@ -284,7 +284,7 @@ public class AttachmentProcessorTests extends ESTestCase {
private Map<String, Object> parseDocument(String file, AttachmentProcessor processor, Map<String, Object> optionalFields)
throws Exception {
Map<String, Object> document = new HashMap<>();
document.put("source_field", getAsBase64(file));
document.put("source_field", getAsBinaryOrBase64(file));
document.putAll(optionalFields);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
@ -335,11 +335,16 @@ public class AttachmentProcessorTests extends ESTestCase {
assertThat(attachmentData.get("content_length"), is(56L));
}
// Test helper: loads the named sample file from the classpath directory
// /org/elasticsearch/ingest/attachment/test/sample-files/ and returns its content either as the
// raw byte array (rarely) or as a Base64-encoded String (otherwise).
// NOTE(review): this span is rendered from a diff without +/- markers. The first signature and the
// first unconditional return look like the pre-change lines, the second signature and the
// if/else like their replacements - confirm against the actual file.
private String getAsBase64(String filename) throws Exception {
private Object getAsBinaryOrBase64(String filename) throws Exception {
String path = "/org/elasticsearch/ingest/attachment/test/sample-files/" + filename;
// try-with-resources closes the resource stream once the bytes have been read
try (InputStream is = AttachmentProcessorTests.class.getResourceAsStream(path)) {
// read the complete resource into memory
byte bytes[] = IOUtils.toByteArray(is);
return Base64.getEncoder().encodeToString(bytes);
// behave like CBOR from time to time
// i.e. hand back the unencoded bytes instead of a Base64 string; presumably this mirrors a
// binary field arriving from a CBOR-encoded source - TODO confirm against the processor
if (rarely()) {
return bytes;
} else {
return Base64.getEncoder().encodeToString(bytes);
}
}
}
}

View File

@ -46,7 +46,9 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.cbor.CborXContent;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.MockScriptEngine;
@ -61,6 +63,7 @@ import org.hamcrest.CustomTypeSafeMatcher;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@ -70,6 +73,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
@ -1227,6 +1231,35 @@ public class IngestServiceTests extends ESTestCase {
assertThat(counter.get(), equalTo(2));
}
/**
 * Checks that a binary field of a CBOR-encoded document source is handed to an ingest processor
 * as bytes and with its content intact.
 *
 * The test registers pipeline "_id" with a single fake "foo" processor whose consumer records the
 * result of {@code getFieldValueAsBytes("data")}, indexes a CBOR document whose "data" field is a
 * byte array, and then asserts on the recorded value.
 */
public void testCBORParsing() throws Exception {
    // the binary payload written into the CBOR document and expected back from the processor
    final byte[] payload = "This is my data".getBytes(StandardCharsets.UTF_8);

    AtomicReference<Object> reference = new AtomicReference<>();
    Consumer<IngestDocument> executor = doc -> reference.set(doc.getFieldValueAsBytes("data"));
    final IngestService ingestService = createWithProcessors(Collections.singletonMap("foo",
        (factories, tag, config) -> new FakeProcessor("foo", tag, executor)));

    // install the pipeline through a cluster state update so the service knows about it
    ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
    ClusterState previousClusterState = clusterState;
    PutPipelineRequest putRequest = new PutPipelineRequest("_id",
        new BytesArray("{\"processors\": [{\"foo\" : {}}]}"), XContentType.JSON);
    clusterState = IngestService.innerPut(putRequest, clusterState);
    ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
    assertThat(ingestService.getPipeline("_id"), notNullValue());

    try (XContentBuilder builder = CborXContent.contentBuilder()) {
        builder.startObject();
        builder.field("data", payload);
        builder.endObject();

        IndexRequest indexRequest =
            new IndexRequest("_index").id("_doc-id").source(builder).setPipeline("_id").setFinalPipeline("_none");

        ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest),
            (integer, e) -> {}, (thread, e) -> {}, indexReq -> {});
    }

    assertThat(reference.get(), is(instanceOf(byte[].class)));
    // a type check alone would not notice a truncated or otherwise altered payload,
    // so also compare the bytes element by element
    assertThat((byte[]) reference.get(), equalTo(payload));
}
/**
 * Creates an {@code IngestDocumentMatcher} from the fixed arguments "_index", "_type", "_id",
 * -3L and {@code VersionType.INTERNAL} together with the supplied source map, and passes it
 * to {@code argThat}.
 *
 * @param source the source map handed to the matcher
 * @return whatever {@code argThat} returns for the matcher
 */
private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
    final IngestDocumentMatcher expectedDocument =
        new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source);
    return argThat(expectedDocument);
}