NIFI-4002: Add PutElasticsearchHttpRecord processor

This closes #1878

Signed-off-by: Bryan Rosander <brosander@apache.org>
This commit is contained in:
Matt Burgess 2017-06-01 12:28:28 -04:00 committed by Bryan Rosander
parent de6a98618a
commit 0bddcfe730
No known key found for this signature in database
GPG Key ID: 8BF5FDF157DCD9F3
5 changed files with 1105 additions and 0 deletions

View File

@ -41,6 +41,18 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId> <artifactId>nifi-utils</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
@ -55,6 +67,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.elasticsearch</groupId> <groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId> <artifactId>elasticsearch</artifactId>

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
/**
* A domain-specific exception for when a valid Elasticsearch document identifier is expected but not found
*/
public class IdentifierNotFoundException extends Exception {
public IdentifierNotFoundException() {
}
public IdentifierNotFoundException(String message) {
super(message);
}
public IdentifierNotFoundException(String message, Throwable cause) {
super(message, cause);
}
public IdentifierNotFoundException(Throwable cause) {
super(cause);
}
public IdentifierNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,565 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.ArrayNode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http", "record"})
@CapabilityDescription("Writes the records from a FlowFile into to Elasticsearch, using the specified parameters such as "
+ "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to "
+ "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document "
+ "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.")
public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-es-record-record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("Identifier Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", "
+ "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+ "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("put-es-record-index")
.displayName("Index")
.description("The name of the index to insert into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.build();
static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-record-type")
.displayName("Type")
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
.name("put-es-record-index-op")
.displayName("Index Operation")
.description("The type of the operation used to index (index, update, upsert, delete)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
private volatile RecordPathCache recordPathCache;
private final JsonFactory factory = new JsonFactory();
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(RECORD_READER);
descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(INDEX_OP);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
// Since Expression Language is allowed for index operation, we can't guarantee that we can catch
// all invalid configurations, but we should catch them as soon as we can. For example, if the
// Identifier Record Path property is empty, the Index Operation must evaluate to "index".
String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
String indexOp = validationContext.getProperty(INDEX_OP).getValue();
if (StringUtils.isEmpty(idPath)) {
switch (indexOp.toLowerCase()) {
case "update":
case "upsert":
case "delete":
case "":
problems.add(new ValidationResult.Builder()
.valid(false)
.subject(INDEX_OP.getDisplayName())
.explanation("If Identifier Record Path is not set, Index Operation must evaluate to \"index\"")
.build());
break;
default:
break;
}
}
return problems;
}
@OnScheduled
public void setup(ProcessContext context) {
super.setup(context);
recordPathCache = new RecordPathCache(10);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
OkHttpClient okHttpClient = getClient();
final ComponentLog logger = getLogger();
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final URL url;
try {
url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
} catch (MalformedURLException mue) {
// Since we have a URL validator, something has gone very wrong, throw a ProcessException
context.yield();
throw new ProcessException(mue);
}
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(index)) {
logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(indexOp)) {
logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
switch (indexOp.toLowerCase()) {
case "index":
case "update":
case "upsert":
case "delete":
break;
default:
logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
final StringBuilder sb = new StringBuilder();
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
final String id;
if (recordPath != null) {
Optional<FieldValue> idPathValue = recordPath.evaluate(record).getSelectedFields().findFirst();
if (!idPathValue.isPresent() || idPathValue.get().getValue() == null) {
throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
}
id = idPathValue.get().getValue().toString();
} else {
id = null;
}
// The ID must be valid for all operations except "index". For that case,
// a missing ID indicates one is to be auto-generated by Elasticsearch
if (id == null && !indexOp.equalsIgnoreCase("index")) {
throw new IdentifierNotFoundException("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.");
}
final StringBuilder json = new StringBuilder();
ByteArrayOutputStream out = new ByteArrayOutputStream();
JsonGenerator generator = factory.createJsonGenerator(out);
writeRecord(record, record.getSchema(), generator);
generator.flush();
generator.close();
json.append(out.toString());
if (indexOp.equalsIgnoreCase("index")) {
sb.append("{\"index\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\"");
if (!StringUtils.isEmpty(id)) {
sb.append(", \"_id\": \"");
sb.append(id);
sb.append("\"");
}
sb.append("}}\n");
sb.append(json);
sb.append("\n");
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\", \"_id\": \"");
sb.append(id);
sb.append("\" }\n");
sb.append("{\"doc\": ");
sb.append(json);
sb.append(", \"doc_as_upsert\": ");
sb.append(indexOp.equalsIgnoreCase("upsert"));
sb.append(" }\n");
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\", \"_id\": \"");
sb.append(id);
sb.append("\" }\n");
}
}
} catch (IdentifierNotFoundException infe) {
logger.error(infe.getMessage(), new Object[]{flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
logger.error("Could not parse incoming data", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
final Response getResponse;
try {
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
} catch (final Exception e) {
logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final int statusCode = getResponse.code();
if (isSuccess(statusCode)) {
ResponseBody responseBody = getResponse.body();
try {
final byte[] bodyBytes = responseBody.bytes();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
boolean errors = responseJson.get("errors").asBoolean(false);
// ES has no rollback, so if errors occur, log them and route the whole flow file to failure
if (errors) {
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, logging failures accordingly
for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i);
int status = itemNode.findPath("status").asInt();
if (!isSuccess(status)) {
String reason = itemNode.findPath("//error/reason").asText();
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
new Object[]{flowFile, reason});
}
}
}
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, url.toString());
}
} catch (IOException ioe) {
// Something went wrong when parsing the response, log the error and route to failure
logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
} else if (statusCode / 100 == 5) {
// 5xx -> RETRY, but a server error might last a while, so yield
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
new Object[]{statusCode, getResponse.message()});
session.transfer(flowFile, REL_RETRY);
context.yield();
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
session.transfer(flowFile, REL_FAILURE);
}
getResponse.close();
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
throws IOException {
RecordSchema schema = record.getSchema();
generator.writeStartObject();
for (int i = 0; i < schema.getFieldCount(); i++) {
final RecordField field = schema.getField(i);
final String fieldName = field.getFieldName();
final Object value = record.getValue(field);
if (value == null) {
generator.writeNullField(fieldName);
continue;
}
generator.writeFieldName(fieldName);
final DataType dataType = schema.getDataType(fieldName).get();
writeValue(generator, value, fieldName, dataType);
}
generator.writeEndObject();
}
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
if (value == null) {
generator.writeNull();
return;
}
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
if (coercedValue == null) {
generator.writeNull();
return;
}
switch (chosenDataType.getFieldType()) {
case DATE: {
final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case TIME: {
final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()));
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case TIMESTAMP: {
final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case DOUBLE:
generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
break;
case FLOAT:
generator.writeNumber(DataTypeUtils.toFloat(coercedValue, fieldName));
break;
case LONG:
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
break;
case INT:
case BYTE:
case SHORT:
generator.writeNumber(DataTypeUtils.toInteger(coercedValue, fieldName));
break;
case CHAR:
case STRING:
generator.writeString(coercedValue.toString());
break;
case BIGINT:
if (coercedValue instanceof Long) {
generator.writeNumber((Long) coercedValue);
} else {
generator.writeNumber((BigInteger) coercedValue);
}
break;
case BOOLEAN:
final String stringValue = coercedValue.toString();
if ("true".equalsIgnoreCase(stringValue)) {
generator.writeBoolean(true);
} else if ("false".equalsIgnoreCase(stringValue)) {
generator.writeBoolean(false);
} else {
generator.writeString(stringValue);
}
break;
case RECORD: {
final Record record = (Record) coercedValue;
final RecordDataType recordDataType = (RecordDataType) chosenDataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
writeRecord(record, childSchema, generator);
break;
}
case MAP: {
final MapDataType mapDataType = (MapDataType) chosenDataType;
final DataType valueDataType = mapDataType.getValueType();
final Map<String, ?> map = (Map<String, ?>) coercedValue;
generator.writeStartObject();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
generator.writeFieldName(mapKey);
writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType);
}
generator.writeEndObject();
break;
}
case ARRAY:
default:
if (coercedValue instanceof Object[]) {
final Object[] values = (Object[]) coercedValue;
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementType = arrayDataType.getElementType();
writeArray(values, fieldName, generator, elementType);
} else {
generator.writeString(coercedValue.toString());
}
break;
}
}
private void writeArray(final Object[] values, final String fieldName, final JsonGenerator generator, final DataType elementType) throws IOException {
generator.writeStartArray();
for (final Object element : values) {
writeValue(generator, element, fieldName, elementType);
}
generator.writeEndArray();
}
}

View File

@ -16,5 +16,6 @@ org.apache.nifi.processors.elasticsearch.FetchElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearch org.apache.nifi.processors.elasticsearch.PutElasticsearch
org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp
org.apache.nifi.processors.elasticsearch.PutElasticsearchHttpRecord
org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp
org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp

View File

@ -0,0 +1,480 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPutElasticsearchHttpRecord {
private TestRunner runner;
@After
public void teardown() {
runner = null;
}
@Test
public void testPutElasticSearchOnTriggerIndex() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
assertNotNull(provEvents);
assertEquals(1, provEvents.size());
assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
}
@Test
public void testPutElasticSearchOnTriggerUpdate() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerDelete() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerEL() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
runner.setVariable("es.url", "http://127.0.0.1:9200");
runner.setVariable("connect.timeout", "5s");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "${no.attr}");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchInvalidConfig() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.assertValid();
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
runner.assertNotValid();
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index");
runner.assertValid();
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "upsert");
runner.assertNotValid();
}
@Test
public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(true);
processor.setStatus(100, "Should fail");
runner = TestRunners.newTestRunner(processor); // simulate failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
runner.clearTransferState();
processor.setStatus(500, "Should retry");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_RETRY, 1);
}
@Test
public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(true);
processor.setStatus(-1, "Connection Exception");
runner = TestRunners.newTestRunner(processor); // simulate failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
}
@Test
public void testPutElasticsearchOnTriggerWithNoIdPath() throws Exception {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/none"); // Field does not exist
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
}
@Test
public void testPutElasticsearchOnTriggerWithNoIdField() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
}
@Test
public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
generateTestData();
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "${type}");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652144");
put("i", "doc");
put("type", "status");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
runner.clearTransferState();
// Now try an empty attribute value, should fail
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652144");
put("type", "status");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
assertNotNull(out2);
}
@Test
public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.assertValid();
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.assertValid();
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index_fail");
runner.assertValid();
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
assertNotNull(out);
}
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
boolean responseHasFailures = false;
OkHttpClient client;
int statusCode = 200;
String statusMessage = "OK";
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.responseHasFailures = responseHasFailures;
}
void setStatus(int code, String message) {
statusCode = code;
statusMessage = message;
}
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class);
when(client.newCall(any(Request.class))).thenAnswer(invocationOnMock -> {
final Call call = mock(Call.class);
if (statusCode != -1) {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
sb.append(responseHasFailures);
sb.append("\", \"items\": [");
if (responseHasFailures) {
// This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
}
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
sb.append(statusCode);
sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
sb.append("]}");
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)
.code(statusCode)
.message(statusMessage)
.body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
.build();
when(call.execute()).thenReturn(mockResponse);
} else {
when(call.execute()).thenThrow(ConnectException.class);
}
return call;
});
}
protected OkHttpClient getClient() {
return client;
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Integration test section below
//
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
// desired test.
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Tests basic ES functionality against a local or test ES cluster
*/
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBasic() {
System.out.println("Starting test " + new Object() {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.assertValid();
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
assertNotNull(provEvents);
assertEquals(1, provEvents.size());
assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
}
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBatch() throws IOException {
System.out.println("Starting test " + new Object() {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.assertValid();
for (int i = 0; i < 100; i++) {
long newId = 28039652140L + i;
final String newStrId = Long.toString(newId);
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", newStrId);
}});
}
runner.run();
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 100);
}
private void generateTestData() throws IOException {
final MockRecordParser parser = new MockRecordParser();
try {
runner.addControllerService("parser", parser);
} catch (InitializationException e) {
throw new IOException(e);
}
runner.enableControllerService(parser);
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "parser");
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addRecord(1, "rec1", 101);
parser.addRecord(2, "rec2", 102);
parser.addRecord(3, "rec3", 103);
parser.addRecord(4, "rec4", 104);
}
}