Fix IngestService to respect original document content type (#45799) (#45984)

Backport of #45799

This PR modifies the logic in IngestService to preserve the original content type
on the IndexRequest, so that when a document with a content type such as SMILE
is submitted to a pipeline, the document that is ultimately persisted remains in
the original content type (SMILE in this case).
This commit is contained in:
James Baiera 2019-08-26 14:33:33 -04:00 committed by GitHub
parent ff7fd9b9e2
commit 5535ff0a44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 6 deletions

View File

@@ -438,7 +438,7 @@ public class IngestService implements ClusterStateApplier {
if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
} }
indexRequest.source(ingestDocument.getSourceAndMetadata()); indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
} }
} catch (Exception e) { } catch (Exception e) {
totalMetrics.ingestFailed(); totalMetrics.ingestFailed();

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -71,6 +72,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@@ -909,20 +911,33 @@ public class IngestServiceTests extends ESTestCase {
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(null);
} }
public void testBulkRequestExecution() { public void testBulkRequestExecution() throws Exception {
BulkRequest bulkRequest = new BulkRequest(); BulkRequest bulkRequest = new BulkRequest();
String pipelineId = "_id"; String pipelineId = "_id";
// Test to make sure that ingest respects content types other than the default index content type
XContentType xContentType = randomFrom(Arrays.stream(XContentType.values())
.filter(t -> Requests.INDEX_CONTENT_TYPE.equals(t) == false)
.collect(Collectors.toList()));
logger.info("Using [{}], not randomly determined default [{}]", xContentType, Requests.INDEX_CONTENT_TYPE);
int numRequest = scaledRandomIntBetween(8, 64); int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) { for (int i = 0; i < numRequest; i++) {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId);
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); indexRequest.source(xContentType, "field1", "value1");
bulkRequest.add(indexRequest); bulkRequest.add(indexRequest);
} }
IngestService ingestService = createWithProcessors(emptyMap()); final Processor processor = mock(Processor.class);
PutPipelineRequest putRequest = when(processor.getType()).thenReturn("mock");
new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); when(processor.getTag()).thenReturn("mockTag");
when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, config) -> processor);
IngestService ingestService = createWithProcessors(map);
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"mock\": {}}], \"description\": \"_description\"}"), XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
ClusterState previousClusterState = clusterState; ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
@@ -936,6 +951,11 @@ public class IngestServiceTests extends ESTestCase {
verify(requestItemErrorHandler, never()).accept(any(), any()); verify(requestItemErrorHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(null); verify(completionHandler, times(1)).accept(null);
for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests()) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest);
assertThat(indexRequest, notNullValue());
assertThat(indexRequest.getContentType(), equalTo(xContentType));
}
} }
public void testStats() throws Exception { public void testStats() throws Exception {