diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index 08a2b7beba..99495241ef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -41,6 +41,18 @@ language governing permissions and limitations under the License. -->
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -55,6 +67,11 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
new file mode 100644
index 0000000000..35402d794b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+/**
+ * A domain-specific exception for when a valid Elasticsearch document identifier is expected but not found
+ */
+public class IdentifierNotFoundException extends Exception {
+
+    public IdentifierNotFoundException() {
+    }
+
+    public IdentifierNotFoundException(String message) {
+        super(message);
+    }
+
+    public IdentifierNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public IdentifierNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+    public IdentifierNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
new file mode 100644
index 0000000000..24d0057590
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http", "record"})
+@CapabilityDescription("Writes the records from a FlowFile into Elasticsearch, using the specified parameters such as "
+        + "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to "
+        + "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document "
+        + "which is added to a single HTTP request body. For very large flow files (e.g. files with a large number of records), this could cause memory usage issues.")
+public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("put-es-record-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-id-path")
+            .displayName("Identifier Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", "
+                    + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+                    + "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("put-es-record-index")
+            .displayName("Index")
+            .description("The name of the index to insert into")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .build();
+
+    static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("put-es-record-type")
+            .displayName("Type")
+            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+            .name("put-es-record-index-op")
+            .displayName("Index Operation")
+            .description("The type of the operation used to index (index, update, upsert, delete)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("index")
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    private volatile RecordPathCache recordPathCache;
+
+    private final JsonFactory factory = new JsonFactory();
+
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(RECORD_READER);
+        descriptors.add(ID_RECORD_PATH);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(INDEX_OP);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+        // Since Expression Language is allowed for the index operation, we can't guarantee that we can catch
+        // all invalid configurations, but we should catch them as soon as we can. For example, if the
+        // Identifier Record Path property is empty, the Index Operation must evaluate to "index".
+        String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
+        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
+
+        if (StringUtils.isEmpty(idPath)) {
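+            // For example, leaving Identifier Record Path unset while setting Index Operation to "delete"
+            // is rejected here at validation time rather than failing for each flow file at runtime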
+            switch (indexOp.toLowerCase()) {
+                case "update":
+                case "upsert":
+                case "delete":
+                case "":
+                    problems.add(new ValidationResult.Builder()
+                            .valid(false)
+                            .subject(INDEX_OP.getDisplayName())
+                            .explanation("If Identifier Record Path is not set, Index Operation must evaluate to \"index\"")
+                            .build());
+                    break;
+                default:
+                    break;
+            }
+        }
+        return problems;
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+        recordPathCache = new RecordPathCache(10);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        // Authentication
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+
+        OkHttpClient okHttpClient = getClient();
+        final ComponentLog logger = getLogger();
+
+        final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
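+        // All operations go through the Bulk API endpoint, e.g. http://localhost:9200/_bulk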
+        final URL url;
+        try {
+            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
+        } catch (MalformedURLException mue) {
+            // Since we have a URL validator, something has gone very wrong, throw a ProcessException
+            context.yield();
+            throw new ProcessException(mue);
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isEmpty(index)) {
+            logger.error("No value for index for {}, transferring to failure", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+        String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isEmpty(indexOp)) {
+            logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        switch (indexOp.toLowerCase()) {
+            case "index":
+            case "update":
+            case "upsert":
+            case "delete":
+                break;
+            default:
+                logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+        }
+
+        final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
+        final StringBuilder sb = new StringBuilder();
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+
+                final String id;
+                if (recordPath != null) {
+                    Optional<FieldValue> idPathValue = recordPath.evaluate(record).getSelectedFields().findFirst();
+                    if (!idPathValue.isPresent() || idPathValue.get().getValue() == null) {
+                        throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
+                    }
+                    id = idPathValue.get().getValue().toString();
+                } else {
+                    id = null;
+                }
+
+                // The ID must be valid for all operations except "index". For that case,
+                // a missing ID indicates one is to be auto-generated by Elasticsearch
+                if (id == null && !indexOp.equalsIgnoreCase("index")) {
+                    throw new IdentifierNotFoundException("Index operation " + indexOp + " requires a valid identifier value from the record(s), transferring {} to failure.");
+                }
+
+                final StringBuilder json = new StringBuilder();
+
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                JsonGenerator generator = factory.createJsonGenerator(out);
+                writeRecord(record, record.getSchema(), generator);
+                generator.flush();
+                generator.close();
+                json.append(out.toString());
+
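+                // Each record contributes an action line followed (except for deletes) by a source line, roughly:
+                // {"index": {"_index": "doc", "_type": "status", "_id": "1"}}\n{"id": 1, "name": "rec1"}\n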
+                if (indexOp.equalsIgnoreCase("index")) {
+                    sb.append("{\"index\": { \"_index\": \"");
+                    sb.append(index);
+                    sb.append("\", \"_type\": \"");
+                    sb.append(docType);
+                    sb.append("\"");
+                    if (!StringUtils.isEmpty(id)) {
+                        sb.append(", \"_id\": \"");
+                        sb.append(id);
+                        sb.append("\"");
+                    }
+                    sb.append("}}\n");
+                    sb.append(json);
+                    sb.append("\n");
+                } else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
+                    sb.append("{\"update\": { \"_index\": \"");
+                    sb.append(index);
+                    sb.append("\", \"_type\": \"");
+                    sb.append(docType);
+                    sb.append("\", \"_id\": \"");
+                    sb.append(id);
+                    sb.append("\" } }\n");
+                    sb.append("{\"doc\": ");
+                    sb.append(json);
+                    sb.append(", \"doc_as_upsert\": ");
+                    sb.append(indexOp.equalsIgnoreCase("upsert"));
+                    sb.append(" }\n");
+                } else if (indexOp.equalsIgnoreCase("delete")) {
+                    sb.append("{\"delete\": { \"_index\": \"");
+                    sb.append(index);
+                    sb.append("\", \"_type\": \"");
+                    sb.append(docType);
+                    sb.append("\", \"_id\": \"");
+                    sb.append(id);
+                    sb.append("\" } }\n");
+                }
+            }
+        } catch (IdentifierNotFoundException infe) {
+            logger.error(infe.getMessage(), new Object[]{flowFile});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+
+        } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
+            logger.error("Could not parse incoming data", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
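+        // The accumulated action/source lines are sent as one newline-delimited body in a single PUT;
+        // per-record outcomes come back in the response's "items" array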
+        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
+        final Response getResponse;
+        try {
+            getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
+        } catch (final Exception e) {
+            logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        final int statusCode = getResponse.code();
+
+        if (isSuccess(statusCode)) {
+            ResponseBody responseBody = getResponse.body();
+            try {
+                final byte[] bodyBytes = responseBody.bytes();
+
+                JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                boolean errors = responseJson.get("errors").asBoolean(false);
+                // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
+                if (errors) {
+                    ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
+                    if (itemNodeArray.size() > 0) {
+                        // All items are returned whether they succeeded or failed, so iterate through the item array
+                        // and log each failure accordingly
+                        for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
+                            JsonNode itemNode = itemNodeArray.get(i);
+                            int status = itemNode.findPath("status").asInt();
+                            if (!isSuccess(status)) {
+                                String reason = itemNode.findPath("reason").asText();
+                                logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+                                        new Object[]{flowFile, reason});
+                            }
+                        }
+                    }
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    session.transfer(flowFile, REL_SUCCESS);
+                    session.getProvenanceReporter().send(flowFile, url.toString());
+                }
+
+            } catch (IOException ioe) {
+                // Something went wrong when parsing the response, log the error and route to failure
+                logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
+                session.transfer(flowFile, REL_FAILURE);
+                context.yield();
+            }
+        } else if (statusCode / 100 == 5) {
+            // 5xx -> RETRY, but a server error might last a while, so yield
+            logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
+                    new Object[]{statusCode, getResponse.message()});
+            session.transfer(flowFile, REL_RETRY);
+            context.yield();
+        } else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
+            logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+        getResponse.close();
+    }
+
+    private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
+            throws IOException {
+        RecordSchema schema = record.getSchema();
+
+        generator.writeStartObject();
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            final RecordField field = schema.getField(i);
+            final String fieldName = field.getFieldName();
+            final Object value = record.getValue(field);
+            if (value == null) {
+                generator.writeNullField(fieldName);
+                continue;
+            }
+
+            generator.writeFieldName(fieldName);
+            final DataType dataType = schema.getDataType(fieldName).get();
+
+            writeValue(generator, value, fieldName, dataType);
+        }
+        generator.writeEndObject();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
+        if (value == null) {
+            generator.writeNull();
+            return;
+        }
+
+        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
+        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
+        if (coercedValue == null) {
+            generator.writeNull();
+            return;
+        }
+
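+        // Dispatch on the coerced type. For the temporal types, a value whose string form is still a
+        // plain number is written as a numeric (epoch) timestamp; otherwise the formatted string is written.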
+        switch (chosenDataType.getFieldType()) {
+            case DATE: {
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
+                } else {
+                    generator.writeString(stringValue);
+                }
+                break;
+            }
+            case TIME: {
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
+                } else {
+                    generator.writeString(stringValue);
+                }
+                break;
+            }
+            case TIMESTAMP: {
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+                    generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
+                } else {
+                    generator.writeString(stringValue);
+                }
+                break;
+            }
+            case DOUBLE:
+                generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
+                break;
+            case FLOAT:
+                generator.writeNumber(DataTypeUtils.toFloat(coercedValue, fieldName));
+                break;
+            case LONG:
+                generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
+                break;
+            case INT:
+            case BYTE:
+            case SHORT:
+                generator.writeNumber(DataTypeUtils.toInteger(coercedValue, fieldName));
+                break;
+            case CHAR:
+            case STRING:
+                generator.writeString(coercedValue.toString());
+                break;
+            case BIGINT:
+                if (coercedValue instanceof Long) {
+                    generator.writeNumber((Long) coercedValue);
+                } else {
+                    generator.writeNumber((BigInteger) coercedValue);
+                }
+                break;
+            case BOOLEAN:
+                final String stringValue = coercedValue.toString();
+                if ("true".equalsIgnoreCase(stringValue)) {
+                    generator.writeBoolean(true);
+                } else if ("false".equalsIgnoreCase(stringValue)) {
+                    generator.writeBoolean(false);
+                } else {
+                    generator.writeString(stringValue);
+                }
+                break;
+            case RECORD: {
+                final Record record = (Record) coercedValue;
+                final RecordDataType recordDataType = (RecordDataType) chosenDataType;
+                final RecordSchema childSchema = recordDataType.getChildSchema();
+                writeRecord(record, childSchema, generator);
+                break;
+            }
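+            // Maps and nested records become nested JSON objects; the dotted field name is only used to
+            // identify the source field if a nested value fails type conversion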
+            case MAP: {
+                final MapDataType mapDataType = (MapDataType) chosenDataType;
+                final DataType valueDataType = mapDataType.getValueType();
+                final Map<String, ?> map = (Map<String, ?>) coercedValue;
+                generator.writeStartObject();
+                for (final Map.Entry<String, ?> entry : map.entrySet()) {
+                    final String mapKey = entry.getKey();
+                    final Object mapValue = entry.getValue();
+                    generator.writeFieldName(mapKey);
+                    writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType);
+                }
+                generator.writeEndObject();
+                break;
+            }
+            case ARRAY:
+            default:
+                if (coercedValue instanceof Object[]) {
+                    final Object[] values = (Object[]) coercedValue;
+                    final ArrayDataType arrayDataType = (ArrayDataType) chosenDataType;
+                    final DataType elementType = arrayDataType.getElementType();
+                    writeArray(values, fieldName, generator, elementType);
+                } else {
+                    generator.writeString(coercedValue.toString());
+                }
+                break;
+        }
+    }
+
+    private void writeArray(final Object[] values, final String fieldName, final JsonGenerator generator, final DataType elementType) throws IOException {
+        generator.writeStartArray();
+        for (final Object element : values) {
+            writeValue(generator, element, fieldName, elementType);
+        }
+        generator.writeEndArray();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a6cd087d76..11a66e6ff4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,5 +16,6 @@ org.apache.nifi.processors.elasticsearch.FetchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearch
 org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
 org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp
+org.apache.nifi.processors.elasticsearch.PutElasticsearchHttpRecord
 org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp
 org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
new file mode 100644
index 0000000000..e93123647b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPutElasticsearchHttpRecord {
+
+    private TestRunner runner;
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerIndex() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size());
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerUpdate() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerDelete() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerEL() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
+        runner.assertValid();
+
+        runner.setVariable("es.url", "http://127.0.0.1:9200");
+        runner.setVariable("connect.timeout", "5s");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "${no.attr}");
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchInvalidConfig() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index");
+        runner.assertValid();
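+        // "upsert" requires an Identifier Record Path (see customValidate()), so this must be invalid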
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "upsert");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(true);
+        processor.setStatus(100, "Should fail");
+        runner = TestRunners.newTestRunner(processor); // simulate failures
+        generateTestData();
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        runner.clearTransferState();
+
+        processor.setStatus(500, "Should retry");
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(true);
+        processor.setStatus(-1, "Connection Exception");
+        runner = TestRunners.newTestRunner(processor); // simulate failures
+        generateTestData();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoIdPath() throws Exception {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
+        generateTestData();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/none"); // Field does not exist
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoIdField() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
+        generateTestData();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
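+        // The mocked bulk response reports an item-level error, so the whole flow file routes to failure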
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
+        generateTestData();
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "${type}");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+            put("type", "status");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // Now try an empty attribute value, should fail
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("type", "status");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        assertNotNull(out2);
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.assertValid();
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index_fail");
+        runner.assertValid();
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
+
+    /**
+     * A Test class that extends the processor in order to inject/mock behavior
+     */
+    private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
+        boolean responseHasFailures = false;
+        OkHttpClient client;
+        int statusCode = 200;
+        String statusMessage = "OK";
+
+        PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
+            this.responseHasFailures = responseHasFailures;
+        }
+
+        void setStatus(int code, String message) {
+            statusCode = code;
+            statusMessage = message;
+        }
+
+        @Override
+        protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+            client = mock(OkHttpClient.class);
+
+            when(client.newCall(any(Request.class))).thenAnswer(invocationOnMock -> {
+                final Call call = mock(Call.class);
+                if (statusCode != -1) {
+                    Request realRequest = (Request) invocationOnMock.getArguments()[0];
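+                    // Build a body shaped like an Elasticsearch Bulk API reply: a top-level "errors"
+                    // flag plus one entry per document in the "items" array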
+                    StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
+                    sb.append(responseHasFailures);
+                    sb.append("\", \"items\": [");
+                    if (responseHasFailures) {
+                        // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
+                        sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
+                        sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
+                        sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
+                        sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
+                    }
+                    sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+                    sb.append(statusCode);
+                    sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
+
+                    sb.append("]}");
+                    Response mockResponse = new Response.Builder()
+                            .request(realRequest)
+                            .protocol(Protocol.HTTP_1_1)
+                            .code(statusCode)
+                            .message(statusMessage)
+                            .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
+                            .build();
+
+                    when(call.execute()).thenReturn(mockResponse);
+                } else {
+                    when(call.execute()).thenThrow(ConnectException.class);
+                }
+                return call;
+            });
+        }
+
+        @Override
+        protected OkHttpClient getClient() {
+            return client;
+        }
+    }
+
+    /////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Integration test section below
+    //
+    // The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
+    // However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
+    // desired test.
+    /////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Tests basic ES functionality against a local or test ES cluster
+     */
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasic() {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.assertValid();
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size());
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.assertValid();
+
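+        // Note: the doc_id attribute is informational only; the document identifier comes from each
+        // record's /id field via the Identifier Record Path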
+        for (int i = 0; i < 100; i++) {
+            long newId = 28039652140L + i;
+            final String newStrId = Long.toString(newId);
+            runner.enqueue(new byte[0], new HashMap<String, String>() {{
+                put("doc_id", newStrId);
+            }});
+        }
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 100);
+    }
+
+    private void generateTestData() throws IOException {
+
+        final MockRecordParser parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "parser");
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.INT);
+
+        parser.addRecord(1, "rec1", 101);
+        parser.addRecord(2, "rec2", 102);
+        parser.addRecord(3, "rec3", 103);
+        parser.addRecord(4, "rec4", 104);
+    }
+}