diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
new file mode 100644
index 0000000000..05aa98cec2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@SideEffectFree
+@Tags({"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
"generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."), + @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship.") +}) +@CapabilityDescription("Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles") +public class SplitRecord extends AbstractProcessor { + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + static final PropertyDescriptor RECORDS_PER_SPLIT = new PropertyDescriptor.Builder() + .name("Records Per Split") + .description("Specifies how many records should be written to each 'split' or 'segment' FlowFile") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + + static final Relationship REL_SPLITS = new Relationship.Builder() + .name("splits") + .description("The individual 'segments' of the original FlowFile will be routed to this relationship.") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship.") + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + properties.add(RECORDS_PER_SPLIT); + return properties; + } + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SPLITS); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile original = session.get(); + if (original == null) { + return; + } + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriter writer; + try (final InputStream rawIn = session.read(original); + final InputStream in = new BufferedInputStream(rawIn)) { + writer = writerFactory.createWriter(getLogger(), original, in); + } catch (final Exception e) { + getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f82c6370c0..b4085c894f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -88,6 +88,7 @@ org.apache.nifi.processors.standard.ScanContent
 org.apache.nifi.processors.standard.SegmentContent
 org.apache.nifi.processors.standard.SplitContent
 org.apache.nifi.processors.standard.SplitJson
+org.apache.nifi.processors.standard.SplitRecord
 org.apache.nifi.processors.standard.SplitText
 org.apache.nifi.processors.standard.SplitXml
 org.apache.nifi.processors.standard.TailFile
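
Reviewer note: this one-line entry registers the processor through Java's standard ServiceLoader mechanism, which is how the standard-processors bundle advertises its processors to the framework. A hypothetical sanity check follows (not part of the patch; it assumes the processor classes and this services file are visible on the current classpath):

```java
// Hypothetical check: if the class name in the services file is misspelled,
// this loop will not find SplitRecord among the discovered processors.
import java.util.ServiceLoader;

import org.apache.nifi.processor.Processor;

public class ServiceLoaderCheck {
    public static void main(final String[] args) {
        boolean found = false;
        for (final Processor processor : ServiceLoader.load(Processor.class)) {
            if ("org.apache.nifi.processors.standard.SplitRecord".equals(processor.getClass().getName())) {
                found = true;
            }
        }
        System.out.println("SplitRecord registered: " + found);
    }
}
```
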
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
new file mode 100644
index 0000000000..4c3bff4afb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestSplitRecord {
+
+    @Test
+    public void testIndividualRecordPerSplit() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser();
+        final MockRecordWriter writerService = new MockRecordWriter("header", false);
+
+        final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(SplitRecord.RECORD_READER, "reader");
+        runner.setProperty(SplitRecord.RECORD_WRITER, "writer");
+        runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "1");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SplitRecord.REL_SPLITS, 3);
+        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
+        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);
+
+        for (final MockFlowFile mff : out) {
+            mff.assertAttributeEquals("record.count", "1");
+            mff.assertAttributeEquals("mime.type", "text/plain");
+        }
+
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\n")).count());
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJane Doe,47\n")).count());
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());
+    }
+
+    @Test
+    public void testMultipleRecordsPerSplit() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser();
+        final MockRecordWriter writerService = new MockRecordWriter("header", false);
MockRecordWriter("header", false); + + final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(SplitRecord.RECORD_READER, "reader"); + runner.setProperty(SplitRecord.RECORD_WRITER, "writer"); + runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "2"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(SplitRecord.REL_SPLITS, 2); + runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitRecord.REL_FAILURE, 0); + final List out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS); + + assertEquals(1, out.stream().filter(mff -> mff.getAttribute("record.count").equals("1")).count()); + assertTrue(out.stream().allMatch(mff -> mff.getAttribute("mime.type").equals("text/plain"))); + + assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\nJane Doe,47\n")).count()); + assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count()); + } + + @Test + public void testAllSplitsOneDesintation() throws InitializationException { + final MockRecordParser readerService = new MockRecordParser(); + final MockRecordWriter writerService = new MockRecordWriter("header", false); + + final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(SplitRecord.RECORD_READER, "reader"); + runner.setProperty(SplitRecord.RECORD_WRITER, "writer"); + runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "3"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(SplitRecord.REL_SPLITS, 1); + runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(SplitRecord.REL_FAILURE, 0); + final MockFlowFile out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS).get(0); + + out.assertAttributeEquals("record.count", "3"); + out.assertAttributeEquals("mime.type", "text/plain"); + + out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n"); + } + + + @Test + public void testReadFailure() throws InitializationException { + final MockRecordParser readerService = new MockRecordParser(2); + final MockRecordWriter writerService = new MockRecordWriter("header", false); + + final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(SplitRecord.RECORD_READER, "reader"); + runner.setProperty(SplitRecord.RECORD_WRITER, 
"writer"); + runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, "1"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + readerService.addRecord("John Doe", 48); + readerService.addRecord("Jane Doe", 47); + readerService.addRecord("Jimmy Doe", 14); + + final MockFlowFile original = runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred(SplitRecord.REL_FAILURE, 1); + final MockFlowFile failed = runner.getFlowFilesForRelationship(SplitRecord.REL_FAILURE).get(0); + assertTrue(original == failed); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java index 1dbfd04224..ca16bcdf4e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java @@ -96,7 +96,28 @@ public class MockRecordWriter extends AbstractControllerService implements Recor @Override public WriteResult write(Record record, OutputStream out) throws IOException { - return null; + out.write(header.getBytes()); + out.write("\n".getBytes()); + + final int numCols = record.getSchema().getFieldCount(); + int i = 0; + for (final String fieldName : record.getSchema().getFieldNames()) { + final String val = record.getAsString(fieldName); + if (quoteValues) { + out.write("\"".getBytes()); + out.write(val.getBytes()); + out.write("\"".getBytes()); + } else { + out.write(val.getBytes()); + } + + if (i++ < numCols - 1) { + out.write(",".getBytes()); + } + } + out.write("\n".getBytes()); + + return WriteResult.of(1, Collections.emptyMap()); } }; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java new file mode 100644 index 0000000000..a1866117b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
index 25bbcdcb64..9e67346f15 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
@@ -31,6 +31,44 @@
      */
     Record next() throws IOException;
 
+    /**
+     * Returns a new Record Set that will return no more than {@code maxRecords} records from this
+     * RecordSet. Any Records that are pulled from this newly created RecordSet will also advance
+     * the cursor in this Record Set and vice versa.
+     *
+     * @param maxRecords the maximum number of records to return from the new RecordSet
+     * @return a view of this RecordSet that limits the number of records returned
+     */
+    default RecordSet limit(final int maxRecords) {
+        if (maxRecords < 0) {
+            throw new IllegalArgumentException("Cannot limit number of records to " + maxRecords + ". Limit must be a non-negative integer");
+        }
+
+        final RecordSet original = this;
+        return new RecordSet() {
+            private int count = 0;
+
+            @Override
+            public RecordSchema getSchema() throws IOException {
+                return original.getSchema();
+            }
+
+            @Override
+            public Record next() throws IOException {
+                if (count >= maxRecords) {
+                    return null;
+                }
+
+                final Record record = original.next();
+                if (record != null) {
+                    count++;
+                }
+
+                return record;
+            }
+        };
+    }
+
     public static RecordSet of(final RecordSchema schema, final Record... records) {
         return new RecordSet() {
             private int index = 0;
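
Reviewer note: the key property of the new limit() default method is that the limited view and the backing RecordSet share one cursor, which is what lets SplitRecord repeatedly hand the writer a bounded view while still advancing through the stream. A small demonstration (the LimitSketch class and sample data are hypothetical):

```java
// Illustrative only: a limit() view advances the underlying cursor.
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

public class LimitSketch {
    public static void main(final String[] args) throws IOException {
        final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("id", RecordFieldType.INT.getDataType())));

        final Record[] records = new Record[3];
        for (int i = 0; i < records.length; i++) {
            final Map<String, Object> values = new LinkedHashMap<>();
            values.put("id", i);
            records[i] = new MapRecord(schema, values);
        }

        final RecordSet source = RecordSet.of(schema, records);
        final RecordSet firstTwo = source.limit(2);

        // The limited view returns at most two records...
        System.out.println(firstTwo.next().getAsString("id")); // 0
        System.out.println(firstTwo.next().getAsString("id")); // 1
        System.out.println(firstTwo.next());                   // null: view exhausted

        // ...but the underlying cursor has advanced past those records,
        // so the source picks up exactly where the view left off.
        System.out.println(source.next().getAsString("id"));   // 2
        System.out.println(source.next());                     // null
    }
}
```
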