NIFI-13288 Improved SplitXml and SplitAvro to call session.putAttributes()

This closes #8917

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-06-03 22:53:56 +00:00 committed by exceptionfactory
parent 89f153aad8
commit 6ca03eae55
No known key found for this signature in database
2 changed files with 12 additions and 7 deletions

View File

@ -222,13 +222,15 @@ public class SplitAvro extends AbstractProcessor {
try { try {
final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter); final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter);
final Map<String, String> attributes = new HashMap<>();
final String fragmentIdentifier = UUID.randomUUID().toString(); final String fragmentIdentifier = UUID.randomUUID().toString();
attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
attributes.put(FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
IntStream.range(0, splits.size()).forEach((i) -> { IntStream.range(0, splits.size()).forEach((i) -> {
FlowFile split = splits.get(i); FlowFile split = splits.get(i);
split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier); attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i));
split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i)); split = session.putAllAttributes(split, attributes);
split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
session.transfer(split, REL_SPLIT); session.transfer(split, REL_SPLIT);
}); });
final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size()); final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());

View File

@ -26,6 +26,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -146,12 +147,14 @@ public class SplitXml extends AbstractProcessor {
final List<FlowFile> splits = new ArrayList<>(); final List<FlowFile> splits = new ArrayList<>();
final String fragmentIdentifier = UUID.randomUUID().toString(); final String fragmentIdentifier = UUID.randomUUID().toString();
final AtomicInteger numberOfRecords = new AtomicInteger(0); final AtomicInteger numberOfRecords = new AtomicInteger(0);
final Map<String, String> attributes = new HashMap<>();
attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), original.getAttribute(CoreAttributes.FILENAME.key()));
final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> { final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> {
FlowFile split = session.create(original); FlowFile split = session.create(original);
split = session.write(split, out -> out.write(xmlTree.getBytes(StandardCharsets.UTF_8))); split = session.write(split, out -> out.write(xmlTree.getBytes(StandardCharsets.UTF_8)));
split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier); attributes.put(FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement()));
split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement())); split = session.putAllAttributes(split, attributes);
split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
splits.add(split); splits.add(split);
}, depth); }, depth);