diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java new file mode 100644 index 0000000000..9a505a2284 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.WriteResult; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@SideEffectFree +@Tags({"convert", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", 
description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), + @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") +}) +@CapabilityDescription("Converts records from one data format to another using configured Record Reader and Record Write Controller Services. " + + "The Reader and Writer must be configured with \"matching\" schemas. By this, we mean the schemas must have the same field names. The types of the fields " + + "do not have to be the same if a field value can be coerced from one type to another. For instance, if the input schema has a field named \"balance\" of type double, " + + "the output schema can have a field named \"balance\" with a type of string, double, or float. If any field is present in the input that is not present in the output, " + + "the field will be left out of the output. If any field is specified in the output schema but is not present in the input data/schema, then the field will not be " + + "present in the output or will have a null value, depending on the writer.") +public class ConvertRecord extends AbstractProcessor { + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RowRecordReaderFactory.class) + .required(true) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are successfully transformed will be routed to this relationship") + .build(); + static 
final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final RowRecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RowRecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriter writer = writerFactory.createWriter(getLogger()); + + final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>(); + + final FlowFile original = flowFile; + try { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + + final WriteResult writeResult = writer.write(reader.createRecordSet(), out); + writeResultRef.set(writeResult); + + } catch (final MalformedRecordException e) { + throw new ProcessException("Could not parse incoming data", e); + } + } + }); + } catch (final ProcessException e) { + getLogger().error("Failed to convert {}", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + 
} + + final WriteResult writeResult = writeResultRef.get(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("Records Converted", writeResult.getRecordCount(), false); + getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile}); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 2f2b0cbc0e..3891ee620c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet org.apache.nifi.processors.standard.ConvertJSONToSQL +org.apache.nifi.processors.standard.ConvertRecord org.apache.nifi.processors.standard.DebugFlow org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java new file mode 100644 index 0000000000..0dcaeec977 --- 
/dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertTrue; + +import org.apache.nifi.processors.standard.util.record.MockRecordParser; +import org.apache.nifi.processors.standard.util.record.MockRecordWriter; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestConvertRecord { + + @Test + public void testSuccessfulConversion() throws InitializationException { + final MockRecordParser readerService = new MockRecordParser(); + final MockRecordWriter writerService = new MockRecordWriter("header", false); + + final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", 
writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ConvertRecord.RECORD_READER, "reader"); + runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + + out.assertAttributeEquals("record.count", "3"); + out.assertAttributeEquals("mime.type", "text/plain"); + out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n"); + } + + + @Test + public void testReadFailure() throws InitializationException { + final MockRecordParser readerService = new MockRecordParser(2); + final MockRecordWriter writerService = new MockRecordWriter("header", false); + + final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ConvertRecord.RECORD_READER, "reader"); + runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + final MockFlowFile original = runner.enqueue("hello"); + runner.run(); + + // Original FlowFile should be routed to 'failure' relationship without modification + runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); + final MockFlowFile out 
= runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0); + assertTrue(original == out); + } + + + @Test + public void testWriteFailure() throws InitializationException { + final MockRecordParser readerService = new MockRecordParser(); + final MockRecordWriter writerService = new MockRecordWriter("header", false, 2); + + final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ConvertRecord.RECORD_READER, "reader"); + runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + final MockFlowFile original = runner.enqueue("hello"); + runner.run(); + + // Original FlowFile should be routed to 'failure' relationship without modification + runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0); + assertTrue(original == out); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java index 1cf2a28e7f..0a57b29f14 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java @@ -31,9 +31,21 @@ import org.apache.nifi.serialization.record.RecordSet; public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { private final String header; + private final int failAfterN; + private final boolean quoteValues; public MockRecordWriter(final String header) { + this(header, true, -1); + } + + public MockRecordWriter(final String header, final boolean quoteValues) { + this(header, quoteValues, -1); + } + + public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) { this.header = header; + this.quoteValues = quoteValues; + this.failAfterN = failAfterN; } @Override @@ -48,13 +60,20 @@ public class MockRecordWriter extends AbstractControllerService implements Recor final int numCols = rs.getSchema().getFieldCount(); Record record = null; while ((record = rs.next()) != null) { - recordCount++; + if (++recordCount > failAfterN && failAfterN > -1) { + throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written"); + } + int i = 0; for (final String fieldName : record.getSchema().getFieldNames()) { final String val = record.getAsString(fieldName); - out.write("\"".getBytes()); - out.write(val.getBytes()); - out.write("\"".getBytes()); + if (quoteValues) { + out.write("\"".getBytes()); + out.write(val.getBytes()); + out.write("\"".getBytes()); + } else { + out.write(val.getBytes()); + } if (i++ < numCols - 1) { out.write(",".getBytes()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java index b728498f22..add248e870 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; /** *

@@ -51,4 +52,29 @@ public interface RecordReader extends Closeable { * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse the underlying data */ RecordSchema getSchema() throws MalformedRecordException; + + /** + * @return a RecordSet that returns the records in this Record Reader in a streaming fashion + */ + default RecordSet createRecordSet() { + return new RecordSet() { + @Override + public RecordSchema getSchema() throws IOException { + try { + return RecordReader.this.getSchema(); + } catch (final MalformedRecordException mre) { + throw new IOException(mre); + } + } + + @Override + public Record next() throws IOException { + try { + return RecordReader.this.nextRecord(); + } catch (final MalformedRecordException mre) { + throw new IOException(mre); + } + } + }; + } }