From dda42824b616e5ef448c04672447663b0a621f04 Mon Sep 17 00:00:00 2001 From: Jim Steinebrey Date: Fri, 7 Jun 2024 20:44:55 -0400 Subject: [PATCH] NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that will be auto-terminated Signed-off-by: Matt Burgess This closes #8942 --- .../processors/standard/ValidateRecord.java | 100 ++++++++++-------- 1 file changed, 55 insertions(+), 45 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index c9edc2e280..672399dd0f 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -324,64 +324,64 @@ public class ValidateRecord extends AbstractProcessor { final SchemaValidationResult result = validator.validate(record); recordCount++; - RecordSetWriter writer; if (result.isValid()) { validCount++; if (validFlowFile == null) { validFlowFile = session.create(flowFile); } - validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema); + validWriter = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema); + writeRecord(validWriter, record); } else { invalidCount++; logValidationErrors(flowFile, recordCount, result); - if (invalidFlowFile == null) { - invalidFlowFile = session.create(flowFile); - } - - invalidWriter = writer = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema()); - - // Add all of the validation errors to our Set but only keep up to MAX_VALIDATION_ERRORS because if - // we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event - // that it is too noisy to be useful. - for (final ValidationError validationError : result.getValidationErrors()) { - final Optional fieldName = validationError.getFieldName(); - - switch (validationError.getType()) { - case EXTRA_FIELD: - if (fieldName.isPresent()) { - extraFields.add(fieldName.get()); - } else { - otherProblems.add(validationError.getExplanation()); - } - break; - case MISSING_FIELD: - if (fieldName.isPresent()) { - missingFields.add(fieldName.get()); - } else { - otherProblems.add(validationError.getExplanation()); - } - break; - case INVALID_FIELD: - if (fieldName.isPresent()) { - invalidFields.add(fieldName.get()); - } else { - otherProblems.add(validationError.getExplanation()); - } - break; - case OTHER: - otherProblems.add(validationError.getExplanation()); - break; + if (!context.isAutoTerminated(REL_INVALID)) { + // If REL_INVALID is not autoTerminated, then create a flow file and calculate the invalid details. + // If it is autoTerminated, then skip doing work which will just be discarded. + if (invalidFlowFile == null) { + invalidFlowFile = session.create(flowFile); } - } - } - if (writer instanceof RawRecordWriter) { - ((RawRecordWriter) writer).writeRawRecord(record); - } else { - writer.write(record); + invalidWriter = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema()); + + // Add all of the validation errors to our Set but only keep up to MAX_VALIDATION_ERRORS because if + // we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event + // that it is too noisy to be useful. + for (final ValidationError validationError : result.getValidationErrors()) { + final Optional fieldName = validationError.getFieldName(); + + switch (validationError.getType()) { + case EXTRA_FIELD: + if (fieldName.isPresent()) { + extraFields.add(fieldName.get()); + } else { + otherProblems.add(validationError.getExplanation()); + } + break; + case MISSING_FIELD: + if (fieldName.isPresent()) { + missingFields.add(fieldName.get()); + } else { + otherProblems.add(validationError.getExplanation()); + } + break; + case INVALID_FIELD: + if (fieldName.isPresent()) { + invalidFields.add(fieldName.get()); + } else { + otherProblems.add(validationError.getExplanation()); + } + break; + case OTHER: + otherProblems.add(validationError.getExplanation()); + break; + } + } + + writeRecord(invalidWriter, record); + } } } @@ -450,6 +450,16 @@ public class ValidateRecord extends AbstractProcessor { session.remove(flowFile); } + private void writeRecord(final RecordSetWriter writer, final Record record) throws IOException { + if (writer != null) { + if (writer instanceof RawRecordWriter) { + ((RawRecordWriter) writer).writeRawRecord(record); + } else { + writer.write(record); + } + } + } + private void closeQuietly(final RecordSetWriter writer) { if (writer != null) { try {