From 5535ff0a441c7c73d3118e9fd512234a3121f62b Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 26 Aug 2019 14:33:33 -0400 Subject: [PATCH] Fix IngestService to respect original document content type (#45799) (#45984) Backport of #45799 This PR modifies the logic in IngestService to preserve the original content type on the IndexRequest, such that when a document with a content type like SMILE is submitted to a pipeline, the resulting document that is persisted will remain in the original content type (SMILE in this case). --- .../elasticsearch/ingest/IngestService.java | 2 +- .../ingest/IngestServiceTests.java | 30 +++++++++++++++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 439e0bea094..312f2967144 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -438,7 +438,7 @@ public class IngestService implements ClusterStateApplier { if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); } - indexRequest.source(ingestDocument.getSourceAndMetadata()); + indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType()); } } catch (Exception e) { totalMetrics.ingestFailed(); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index e10b1cd154d..c2f03081584 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; @@ -71,6 +72,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -909,20 +911,33 @@ public class IngestServiceTests extends ESTestCase { verify(completionHandler, times(1)).accept(null); } - public void testBulkRequestExecution() { + public void testBulkRequestExecution() throws Exception { BulkRequest bulkRequest = new BulkRequest(); String pipelineId = "_id"; + // Test to make sure that ingest respects content types other than the default index content type + XContentType xContentType = randomFrom(Arrays.stream(XContentType.values()) + .filter(t -> Requests.INDEX_CONTENT_TYPE.equals(t) == false) + .collect(Collectors.toList())); + + logger.info("Using [{}], not randomly determined default [{}]", xContentType, Requests.INDEX_CONTENT_TYPE); int numRequest = scaledRandomIntBetween(8, 64); for (int i = 0; i < numRequest; i++) { IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + indexRequest.source(xContentType, "field1", "value1"); bulkRequest.add(indexRequest); } - IngestService ingestService = createWithProcessors(emptyMap()); - PutPipelineRequest putRequest = - new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + final Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + when(processor.getTag()).thenReturn("mockTag"); + when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); + Map map = new HashMap<>(2); + map.put("mock", (factories, tag, config) -> processor); + + IngestService ingestService = createWithProcessors(map); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\": {}}], \"description\": \"_description\"}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); @@ -936,6 +951,11 @@ public class IngestServiceTests extends ESTestCase { verify(requestItemErrorHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); + for (DocWriteRequest docWriteRequest : bulkRequest.requests()) { + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest); + assertThat(indexRequest, notNullValue()); + assertThat(indexRequest.getContentType(), equalTo(xContentType)); + } } public void testStats() throws Exception {