NIFI-5005: Fix MergeRecord to honor writer's schema

This closes #6141.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
Author: Matthew Burgess, 2022-06-21 11:24:16 -04:00 (committed by Tamas Palfy)
parent d8ebfb25d0
commit 9d379121c4
4 changed files with 72 additions and 15 deletions
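
The core of the change is in RecordBin below: rather than handing the record reader's schema straight to createWriter(), the merge logic now asks the configured RecordSetWriterFactory which schema it intends to write (passing the reader's schema as input) and creates the writer with that result. A minimal sketch of the pattern against the standard RecordSetWriterFactory API; the class and method names here (WriterSchemaSketch, createMergeWriter) are illustrative only and not part of the commit:

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;

class WriterSchemaSketch {

    // Resolve the output schema through the writer factory (honoring any writer-side
    // configuration such as dropped or reordered fields) and only then create the writer.
    // Before this commit, the reader's schema was passed directly to createWriter(),
    // so the writer's own schema was ignored.
    static RecordSetWriter createMergeWriter(final RecordSetWriterFactory writerFactory,
                                             final RecordSchema readSchema,
                                             final FlowFile flowFile,
                                             final OutputStream out,
                                             final ComponentLog logger)
            throws IOException, SchemaNotFoundException {

        final Map<String, String> attributes = flowFile.getAttributes();
        final RecordSchema outputSchema = writerFactory.getSchema(attributes, readSchema);
        return writerFactory.createWriter(logger, outputSchema, out, flowFile);
    }
}

MockRecordWriter gains an optional write schema so tests can stand up a writer whose schema differs from the reader's; the new TestMergeRecord test uses it to drop the Age field, and the TestForkRecord expectations are reordered because MockRecordWriter now emits fields in the order of the schema passed to createWriter() instead of each record's own schema.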

MockRecordWriter.java

@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
@@ -38,12 +39,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private final boolean quoteValues;
private final boolean bufferOutput;
+private final RecordSchema writeSchema;
public MockRecordWriter() {
this(null);
}
public MockRecordWriter(final String header) {
-this(header, true, -1, false);
+this(header, true, -1, false, null);
}
public MockRecordWriter(final String header, final boolean quoteValues) {
@@ -51,23 +54,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
-this(header, quoteValues, failAfterN, false);
+this(header, quoteValues, failAfterN, false, null);
}
public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) {
-this(header, quoteValues, -1, bufferOutput);
+this(header, quoteValues, -1, bufferOutput, null);
}
-public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) {
+public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput, final RecordSchema writeSchema) {
this.header = header;
this.quoteValues = quoteValues;
this.failAfterN = failAfterN;
this.bufferOutput = bufferOutput;
+this.writeSchema = writeSchema;
}
@Override
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-return new SimpleRecordSchema(Collections.emptyList());
+return (writeSchema != null) ? writeSchema : new SimpleRecordSchema(Collections.emptyList());
}
@Override
@@ -78,6 +82,8 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private int recordCount = 0;
private boolean headerWritten = false;
+private RecordSchema writerSchema = schema;
@Override
public void flush() throws IOException {
out.flush();
@@ -98,10 +104,18 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
}
-final int numCols = record.getSchema().getFieldCount();
+final int numCols;
+final List<String> fieldNames;
+if (this.writerSchema != null && this.writerSchema.getFieldCount() != 0) {
+fieldNames = this.writerSchema.getFieldNames();
+numCols = this.writerSchema.getFieldCount();
+} else {
+fieldNames = record.getSchema().getFieldNames();
+numCols = record.getSchema().getFieldCount();
+}
int i = 0;
-for (final String fieldName : record.getSchema().getFieldNames()) {
+for (final String fieldName : fieldNames) {
final String val = record.getAsString(fieldName);
if (val != null) {
if (quoteValues) {
@@ -140,9 +154,17 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
headerWritten = true;
}
-final int numCols = record.getSchema().getFieldCount();
+final int numCols;
+final List<String> fieldNames;
+if (this.writerSchema != null && this.writerSchema.getFieldCount() != 0) {
+fieldNames = this.writerSchema.getFieldNames();
+numCols = this.writerSchema.getFieldCount();
+} else {
+fieldNames = record.getSchema().getFieldNames();
+numCols = record.getSchema().getFieldCount();
+}
int i = 0;
-for (final String fieldName : record.getSchema().getFieldNames()) {
+for (final String fieldName : fieldNames) {
final String val = record.getAsString(fieldName);
if (val != null) {
if (quoteValues) {

RecordBin.java

@@ -28,6 +28,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import java.io.IOException;
@@ -130,7 +131,8 @@ public class RecordBin {
this.out = new ByteCountingOutputStream(rawOut);
-recordWriter = writerFactory.createWriter(logger, recordReader.getSchema(), out, flowFile);
+RecordSchema outputSchema = writerFactory.getSchema(flowFile.getAttributes(), recordReader.getSchema());
+recordWriter = writerFactory.createWriter(logger, outputSchema, out, flowFile);
recordWriter.beginRecordSet();
}

TestForkRecord.java

@@ -172,7 +172,7 @@ public class TestForkRecord {
final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
mff.assertAttributeEquals("record.count", "2");
mff.assertContentEquals("header\n42,4750.89,John Doe,123 My Street,My City,MS,11111,USA\n43,48212.38,John Doe,123 My Street,My City,MS,11111,USA\n");
mff.assertContentEquals("header\n42,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n43,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
}
@Test
@@ -288,8 +288,8 @@ public class TestForkRecord {
final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
mff.assertAttributeEquals("record.count", "4");
mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n"
+ "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n"
+ "7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
}
@Test
@@ -369,8 +369,8 @@ public class TestForkRecord {
final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
mff.assertAttributeEquals("record.count", "4");
mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n"
+ "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n"
+ "7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
}
@Test

TestMergeRecord.java

@@ -19,8 +19,12 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -28,6 +32,7 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -93,6 +98,34 @@ public class TestMergeRecord {
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
}
+@Test
+public void testMergeSimpleDifferentWriteSchema() throws InitializationException {
+// Exclude Age field
+List<RecordField> writeFields = Collections.singletonList(
+new RecordField("Name", RecordFieldType.STRING.getDataType())
+);
+RecordSchema writeSchema = new SimpleRecordSchema(writeFields);
+writerService = new MockRecordWriter("header", false, -1, true, writeSchema);
+runner.addControllerService("differentWriter", writerService);
+runner.enableControllerService(writerService);
+runner.setProperty(MergeRecord.RECORD_READER, "reader");
+runner.setProperty(MergeRecord.RECORD_WRITER, "differentWriter");
+runner.enqueue("Name, Age\nJohn, 35");
+runner.enqueue("Name, Age\nJane, 34");
+runner.run(2);
+runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+mff.assertAttributeEquals("record.count", "2");
+mff.assertContentEquals("header\nJohn\nJane\n");
+runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
+ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
+}
// Verify that FlowFiles are grouped with like schemas.
@Test