From 44c9ea0a660b5d3ee9551307a6b83249f4e8491c Mon Sep 17 00:00:00 2001 From: Bryan Rosander Date: Tue, 20 Dec 2016 16:39:22 -0500 Subject: [PATCH] NIFI-3236 - SplitJson performance improvements This closes #1347. Signed-off-by: Koji Kawamura --- .../nifi/processors/standard/SplitJson.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index caa5a0bb27..003834ea00 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -20,11 +20,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; @@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -101,6 +103,7 @@ public class SplitJson extends AbstractJsonPathProcessor { private Set relationships; private final AtomicReference JSON_PATH_REF = new AtomicReference<>(); + private volatile String nullDefaultValue; @Override protected void init(final ProcessorInitializationContext context) { @@ -156,6 +159,11 @@ public class SplitJson extends AbstractJsonPathProcessor { return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext)); } + @OnScheduled + public void onScheduled(ProcessContext processContext) { + nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue()); + } + @Override public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) { final FlowFile original = processSession.get(); @@ -165,7 +173,7 @@ public class SplitJson extends AbstractJsonPathProcessor { final ComponentLog logger = getLogger(); - DocumentContext documentContext = null; + DocumentContext documentContext; try { documentContext = validateAndEstablishJsonContext(processSession, original); } catch (InvalidJsonException e) { @@ -175,10 +183,6 @@ public class SplitJson extends AbstractJsonPathProcessor { } final JsonPath jsonPath = JSON_PATH_REF.get(); - String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue(); - final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption); - - final List segments = new ArrayList<>(); Object jsonPathResult; try { @@ -196,9 +200,11 @@ public class SplitJson extends AbstractJsonPathProcessor { } List resultList = (List) jsonPathResult; - AtomicInteger jsonLineCount = new AtomicInteger(0); - final String fragmentIdentifier = UUID.randomUUID().toString(); + Map attributes = new HashMap<>(); + attributes.put("fragment.identifier", UUID.randomUUID().toString()); + attributes.put("fragment.count", Integer.toString(resultList.size())); + for (int i = 0; i < resultList.size(); i++) { Object resultSegment = resultList.get(i); FlowFile split = processSession.create(original); @@ -207,19 +213,12 @@ public class SplitJson extends AbstractJsonPathProcessor { out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); } ); - split = processSession.putAttribute(split, "fragment.identifier", fragmentIdentifier); - split = processSession.putAttribute(split, "fragment.index", Integer.toString(i)); - split = processSession.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key())); - segments.add(split); - jsonLineCount.incrementAndGet(); + attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key())); + attributes.put("fragment.index", Integer.toString(i)); + processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT); } - segments.forEach((segment) -> { - segment = processSession.putAttribute(segment, "fragment.count", Integer.toString(jsonLineCount.get())); - processSession.transfer(segment, REL_SPLIT); - }); - processSession.transfer(original, REL_ORIGINAL); - logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()}); + logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()}); } -} +} \ No newline at end of file