NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that will be auto-terminated

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8942
This commit is contained in:
Jim Steinebrey 2024-06-07 20:44:55 -04:00 committed by Matt Burgess
parent bc95799a39
commit dda42824b6
1 changed files with 55 additions and 45 deletions

View File

@ -324,64 +324,64 @@ public class ValidateRecord extends AbstractProcessor {
final SchemaValidationResult result = validator.validate(record); final SchemaValidationResult result = validator.validate(record);
recordCount++; recordCount++;
RecordSetWriter writer;
if (result.isValid()) { if (result.isValid()) {
validCount++; validCount++;
if (validFlowFile == null) { if (validFlowFile == null) {
validFlowFile = session.create(flowFile); validFlowFile = session.create(flowFile);
} }
validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema); validWriter = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema);
writeRecord(validWriter, record);
} else { } else {
invalidCount++; invalidCount++;
logValidationErrors(flowFile, recordCount, result); logValidationErrors(flowFile, recordCount, result);
if (invalidFlowFile == null) { if (!context.isAutoTerminated(REL_INVALID)) {
invalidFlowFile = session.create(flowFile); // If REL_INVALID is not autoTerminated, then create a flow file and calculate the invalid details.
} // If it is autoTerminated, then skip doing work which will just be discarded.
if (invalidFlowFile == null) {
invalidWriter = writer = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema()); invalidFlowFile = session.create(flowFile);
// Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
// we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
// that it is too noisy to be useful.
for (final ValidationError validationError : result.getValidationErrors()) {
final Optional<String> fieldName = validationError.getFieldName();
switch (validationError.getType()) {
case EXTRA_FIELD:
if (fieldName.isPresent()) {
extraFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case MISSING_FIELD:
if (fieldName.isPresent()) {
missingFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case INVALID_FIELD:
if (fieldName.isPresent()) {
invalidFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case OTHER:
otherProblems.add(validationError.getExplanation());
break;
} }
}
}
if (writer instanceof RawRecordWriter) { invalidWriter = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema());
((RawRecordWriter) writer).writeRawRecord(record);
} else { // Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
writer.write(record); // we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
// that it is too noisy to be useful.
for (final ValidationError validationError : result.getValidationErrors()) {
final Optional<String> fieldName = validationError.getFieldName();
switch (validationError.getType()) {
case EXTRA_FIELD:
if (fieldName.isPresent()) {
extraFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case MISSING_FIELD:
if (fieldName.isPresent()) {
missingFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case INVALID_FIELD:
if (fieldName.isPresent()) {
invalidFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case OTHER:
otherProblems.add(validationError.getExplanation());
break;
}
}
writeRecord(invalidWriter, record);
}
} }
} }
@ -450,6 +450,16 @@ public class ValidateRecord extends AbstractProcessor {
session.remove(flowFile); session.remove(flowFile);
} }
/**
 * Writes a single record through the supplied writer.
 * <p>
 * A {@code null} writer is treated as "nothing to write to" and the call is a no-op.
 * When the writer is a {@link RawRecordWriter}, the record is emitted via
 * {@code writeRawRecord}; every other writer receives the record via {@code write}.
 *
 * @param writer the writer to emit the record with; may be {@code null}
 * @param record the record to write
 * @throws IOException if the underlying writer fails while writing the record
 */
private void writeRecord(final RecordSetWriter writer, final Record record) throws IOException {
    // No writer available: there is nowhere to send the record.
    if (writer == null) {
        return;
    }

    // Raw-capable writers get the record through their raw-write entry point.
    if (writer instanceof RawRecordWriter) {
        final RawRecordWriter rawWriter = (RawRecordWriter) writer;
        rawWriter.writeRawRecord(record);
        return;
    }

    writer.write(record);
}
private void closeQuietly(final RecordSetWriter writer) { private void closeQuietly(final RecordSetWriter writer) {
if (writer != null) { if (writer != null) {
try { try {