NIFI-5356: Added record.count and failure.count attributes to PutElasticsearchHttpRecord

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2835.
This commit is contained in:
Matthew Burgess 2018-07-02 15:18:26 -04:00 committed by Pierre Villard
parent 22ec069acb
commit ee86af0c15
2 changed files with 36 additions and 19 deletions

View File

@ -29,6 +29,8 @@ import okhttp3.ResponseBody;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -88,6 +90,10 @@ import java.util.Set;
+ "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to " + "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to "
+ "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document " + "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document "
+ "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.") + "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.")
@WritesAttributes({
@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile. This is only populated on the 'success' relationship."),
@WritesAttribute(attribute="failure.count", description="The number of records found by Elasticsearch to have errors. This is only populated on the 'failure' relationship.")
})
@DynamicProperty( @DynamicProperty(
name = "A URL query parameter", name = "A URL query parameter",
value = "The value to set it to", value = "The value to set it to",
@ -308,6 +314,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path); final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
final StringBuilder sb = new StringBuilder(); final StringBuilder sb = new StringBuilder();
int recordCount = 0;
try (final InputStream in = session.read(flowFile); try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
@ -341,6 +348,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
json.append(out.toString()); json.append(out.toString());
buildBulkCommand(sb, index, docType, indexOp, id, json.toString()); buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
recordCount++;
} }
} catch (IdentifierNotFoundException infe) { } catch (IdentifierNotFoundException infe) {
logger.error(infe.getMessage(), new Object[]{flowFile}); logger.error(infe.getMessage(), new Object[]{flowFile});
@ -374,34 +382,40 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes)); JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
boolean errors = responseJson.get("errors").asBoolean(false); boolean errors = responseJson.get("errors").asBoolean(false);
int failureCount = 0;
// ES has no rollback, so if errors occur, log them and route the whole flow file to failure // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
if (errors) { if (errors) {
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items"); ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if (itemNodeArray.size() > 0) { if(itemNodeArray != null) {
// All items are returned whether they succeeded or failed, so iterate through the item array if (itemNodeArray.size() > 0) {
// at the same time as the flow file list, moving each to success or failure accordingly, // All items are returned whether they succeeded or failed, so iterate through the item array
// but only keep the first error for logging // at the same time as the flow file list, moving each to success or failure accordingly,
String errorReason = null; // but only keep the first error for logging
for (int i = itemNodeArray.size() - 1; i >= 0; i--) { String errorReason = null;
JsonNode itemNode = itemNodeArray.get(i); for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
int status = itemNode.findPath("status").asInt(); JsonNode itemNode = itemNodeArray.get(i);
if (!isSuccess(status)) { int status = itemNode.findPath("status").asInt();
if (errorReason == null) { if (!isSuccess(status)) {
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason if (errorReason == null) {
String reason = itemNode.findPath("result").asText(); // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
if (StringUtils.isEmpty(reason)) { String reason = itemNode.findPath("result").asText();
// If there was no result, we expect an error with a string description in the "reason" field if (StringUtils.isEmpty(reason)) {
reason = itemNode.findPath("reason").asText(); // If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("reason").asText();
}
errorReason = reason;
logger.error("Failed to process {} due to {}, transferring to failure",
new Object[]{flowFile, errorReason});
} }
errorReason = reason; failureCount++;
logger.error("Failed to process {} due to {}, transferring to failure",
new Object[]{flowFile, errorReason});
} }
} }
} }
} }
flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failureCount));
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} else { } else {
flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, url.toString()); session.getProvenanceReporter().send(flowFile, url.toString());
} }

View File

@ -78,6 +78,7 @@ public class TestPutElasticsearchHttpRecord {
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out); assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140"); out.assertAttributeEquals("doc_id", "28039652140");
out.assertAttributeEquals("record.count", "4");
List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents(); List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
assertNotNull(provEvents); assertNotNull(provEvents);
assertEquals(1, provEvents.size()); assertEquals(1, provEvents.size());
@ -263,8 +264,10 @@ public class TestPutElasticsearchHttpRecord {
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
runner.run(1, true, true); runner.run(1, true, true);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0); runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
flowFile.assertAttributeEquals("failure.count", "1");
} }
@Test @Test