NIFI-3988: Add fragment attributes to SplitRecord

This closes #3217. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>

commit 7548d7c859, parent cc47a8c0e1 (mirror of https://github.com/apache/nifi.git)
SplitRecord.java:

@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -40,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -66,11 +68,20 @@ import org.apache.nifi.serialization.record.RecordSet;
 @Tags({"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
 @WritesAttributes({
     @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."),
-    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship.")
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship."),
+    @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
+    @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
+    @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"),
+    @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile")
 })
 @CapabilityDescription("Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles")
 public class SplitRecord extends AbstractProcessor {
 
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
     static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
         .name("Record Reader")
         .description("Specifies the Controller Service to use for reading incoming data")
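The new constants delegate to the shared FragmentAttributes enum in nifi-api rather than hard-coding strings, which keeps SplitRecord consistent with the other split-style processors. A minimal sketch of what the keys resolve to, assuming only nifi-api on the classpath (PrintFragmentKeys is a throwaway name):

import org.apache.nifi.flowfile.attributes.FragmentAttributes;

// Prints the literal attribute names behind the constants introduced above.
// Expected output, matching the @WritesAttribute annotations:
//   fragment.identifier
//   fragment.index
//   fragment.count
//   segment.original.filename
public class PrintFragmentKeys {
    public static void main(final String[] args) {
        System.out.println(FragmentAttributes.FRAGMENT_ID.key());
        System.out.println(FragmentAttributes.FRAGMENT_INDEX.key());
        System.out.println(FragmentAttributes.FRAGMENT_COUNT.key());
        System.out.println(FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key());
    }
}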
@@ -125,7 +136,7 @@ public class SplitRecord extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile original = session.get();
+        final FlowFile original = session.get();
         if (original == null) {
             return;
         }
@@ -137,6 +148,7 @@ public class SplitRecord extends AbstractProcessor {
 
         final List<FlowFile> splits = new ArrayList<>();
         final Map<String, String> originalAttributes = original.getAttributes();
+        final String fragmentId = UUID.randomUUID().toString();
         try {
             session.read(original, new InputStreamCallback() {
                 @Override
@@ -148,6 +160,7 @@ public class SplitRecord extends AbstractProcessor {
                         final RecordSet recordSet = reader.createRecordSet();
                         final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
 
+                        int fragmentIndex = 0;
                         while (pushbackSet.isAnotherRecord()) {
                             FlowFile split = session.create(original);
 
@@ -167,6 +180,9 @@ public class SplitRecord extends AbstractProcessor {
 
                                 attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                 attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                attributes.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                                attributes.put(FRAGMENT_ID, fragmentId);
+                                attributes.put(SEGMENT_ORIGINAL_FILENAME, original.getAttribute(CoreAttributes.FILENAME.key()));
                                 attributes.putAll(writeResult.getAttributes());
 
                                 session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
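With these three puts, every split carries its position, the shared parent UUID, and the parent's filename. As an illustration only (hypothetical filename and MIME type, not from the commit), the second of three splits would end up with attributes along these lines; fragment.count is deliberately absent at this point, since the total is only known once the loop completes, and it is added just before transfer in the last hunk of this file:

// Illustration, not from the commit: attribute values on the second of three
// splits of a parent whose filename attribute is "input.csv" (hypothetical).
final Map<String, String> expected = new HashMap<>();
expected.put("fragment.index", "1");                      // zero-based, one per split
expected.put("fragment.identifier", fragmentId);          // same UUID on every split of this parent
expected.put("segment.original.filename", "input.csv");   // parent's filename attribute
expected.put("record.count", "1");                        // records written to this split
expected.put("mime.type", "text/csv");                    // as reported by the configured Record Writer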
@@ -176,6 +192,7 @@ public class SplitRecord extends AbstractProcessor {
                             } finally {
                                 splits.add(split);
                             }
+                            fragmentIndex++;
                         }
                     } catch (final SchemaNotFoundException | MalformedRecordException e) {
                         throw new ProcessException("Failed to parse incoming data", e);
@@ -190,6 +207,10 @@ public class SplitRecord extends AbstractProcessor {
         }
 
         session.transfer(original, REL_ORIGINAL);
+        // Add the fragment count to each split
+        for (FlowFile split : splits) {
+            session.putAttribute(split, FRAGMENT_COUNT, String.valueOf(splits.size()));
+        }
         session.transfer(splits, REL_SPLITS);
         getLogger().info("Successfully split {} into {} FlowFiles, each containing up to {} records", new Object[] {original, splits.size(), maxRecords});
     }
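A caveat on the loop above: ProcessSession.putAttribute does not mutate its argument; it returns an updated FlowFile, so the references left in splits are stale by the time session.transfer(splits, REL_SPLITS) runs. Whether a session implementation accepts stale references is an internal detail, so a defensive variant keeps the returned references. A sketch, not part of the commit:

// Keep the FlowFile returned by putAttribute rather than the stale reference.
final String fragmentCount = String.valueOf(splits.size());
final List<FlowFile> updatedSplits = new ArrayList<>(splits.size());
for (final FlowFile split : splits) {
    updatedSplits.add(session.putAttribute(split, FRAGMENT_COUNT, fragmentCount));
}
session.transfer(updatedSplits, REL_SPLITS);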
TestSplitRecord.java:
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -55,7 +56,7 @@ public class TestSplitRecord {
         readerService.addRecord("Jane Doe", 47);
         readerService.addRecord("Jimmy Doe", 14);
 
-        runner.enqueue("");
+        final MockFlowFile inputFlowFile = runner.enqueue("");
         runner.run();
 
         runner.assertTransferCount(SplitRecord.REL_SPLITS, 3);
@@ -63,11 +64,23 @@ public class TestSplitRecord {
         runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
         final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);
 
+        int fragmentIndex = 0;
+        String fragmentUUID = null;
         for (final MockFlowFile mff : out) {
+            if (fragmentUUID == null) {
+                fragmentUUID = mff.getAttribute(SplitRecord.FRAGMENT_ID);
+            } else {
+                mff.assertAttributeEquals(SplitRecord.FRAGMENT_ID, fragmentUUID);
+            }
+            mff.assertAttributeEquals(SplitRecord.FRAGMENT_COUNT, "3");
+            mff.assertAttributeEquals(SplitRecord.FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+            mff.assertAttributeEquals(SplitRecord.SEGMENT_ORIGINAL_FILENAME, inputFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
             mff.assertAttributeEquals("record.count", "1");
             mff.assertAttributeEquals("mime.type", "text/plain");
+            fragmentIndex++;
         }
 
+
         assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\n")).count());
         assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJane Doe,47\n")).count());
         assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());
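One note on the assertion loop: it checks fragment.index against transfer order, which holds here because the processor creates and transfers the splits in record order. If that guarantee were ever in doubt, an order-independent variant is possible; a sketch (requires java.util.Arrays, java.util.HashSet, and java.util.Set imports):

// Verify the set of fragment.index values without assuming REL_SPLITS ordering.
final Set<String> indices = new HashSet<>();
for (final MockFlowFile mff : out) {
    indices.add(mff.getAttribute(SplitRecord.FRAGMENT_INDEX));
}
assertEquals(new HashSet<>(Arrays.asList("0", "1", "2")), indices);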