From 6ca03eae5523c21820af40b215e3f8bb3aecb879 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Mon, 3 Jun 2024 22:53:56 +0000 Subject: [PATCH] NIFI-13288 Improved SplitXml and SplitAvro to call session.putAttributes() This closes #8917 Signed-off-by: David Handermann --- .../org/apache/nifi/processors/avro/SplitAvro.java | 10 ++++++---- .../org/apache/nifi/processors/standard/SplitXml.java | 9 ++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/nifi-extension-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-extension-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java index 384ceaf88d..239c127e81 100644 --- a/nifi-extension-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java +++ b/nifi-extension-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java @@ -222,13 +222,15 @@ public class SplitAvro extends AbstractProcessor { try { final List splits = splitter.split(session, flowFile, splitWriter); + final Map attributes = new HashMap<>(); final String fragmentIdentifier = UUID.randomUUID().toString(); + attributes.put(FRAGMENT_ID.key(), fragmentIdentifier); + attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key())); + attributes.put(FRAGMENT_COUNT.key(), Integer.toString(splits.size())); IntStream.range(0, splits.size()).forEach((i) -> { FlowFile split = splits.get(i); - split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier); - split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i)); - split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key())); - split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size())); + attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i)); + split = session.putAllAttributes(split, attributes); session.transfer(split, REL_SPLIT); }); final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size()); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java index e73d758528..e3b1804a60 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -146,12 +147,14 @@ public class SplitXml extends AbstractProcessor { final List splits = new ArrayList<>(); final String fragmentIdentifier = UUID.randomUUID().toString(); final AtomicInteger numberOfRecords = new AtomicInteger(0); + final Map attributes = new HashMap<>(); + attributes.put(FRAGMENT_ID.key(), fragmentIdentifier); + attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), original.getAttribute(CoreAttributes.FILENAME.key())); final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> { FlowFile split = session.create(original); split = session.write(split, out -> out.write(xmlTree.getBytes(StandardCharsets.UTF_8))); - split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier); - split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement())); - split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key())); + attributes.put(FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement())); + split = session.putAllAttributes(split, attributes); splits.add(split); }, depth);