mirror of https://github.com/apache/nifi.git
NIFI-3236 - SplitJson performance improvements
This closes #1347. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
4285157675
commit
44c9ea0a66
|
@ -20,11 +20,12 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -101,6 +103,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
private Set<Relationship> relationships;
|
||||
|
||||
private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>();
|
||||
private volatile String nullDefaultValue;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
|
@ -156,6 +159,11 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext));
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext processContext) {
|
||||
nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
|
||||
final FlowFile original = processSession.get();
|
||||
|
@ -165,7 +173,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
|
||||
final ComponentLog logger = getLogger();
|
||||
|
||||
DocumentContext documentContext = null;
|
||||
DocumentContext documentContext;
|
||||
try {
|
||||
documentContext = validateAndEstablishJsonContext(processSession, original);
|
||||
} catch (InvalidJsonException e) {
|
||||
|
@ -175,10 +183,6 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
}
|
||||
|
||||
final JsonPath jsonPath = JSON_PATH_REF.get();
|
||||
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
|
||||
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
|
||||
|
||||
final List<FlowFile> segments = new ArrayList<>();
|
||||
|
||||
Object jsonPathResult;
|
||||
try {
|
||||
|
@ -196,9 +200,11 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
}
|
||||
|
||||
List resultList = (List) jsonPathResult;
|
||||
AtomicInteger jsonLineCount = new AtomicInteger(0);
|
||||
|
||||
final String fragmentIdentifier = UUID.randomUUID().toString();
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("fragment.identifier", UUID.randomUUID().toString());
|
||||
attributes.put("fragment.count", Integer.toString(resultList.size()));
|
||||
|
||||
for (int i = 0; i < resultList.size(); i++) {
|
||||
Object resultSegment = resultList.get(i);
|
||||
FlowFile split = processSession.create(original);
|
||||
|
@ -207,19 +213,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
);
|
||||
split = processSession.putAttribute(split, "fragment.identifier", fragmentIdentifier);
|
||||
split = processSession.putAttribute(split, "fragment.index", Integer.toString(i));
|
||||
split = processSession.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
|
||||
segments.add(split);
|
||||
jsonLineCount.incrementAndGet();
|
||||
attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
|
||||
attributes.put("fragment.index", Integer.toString(i));
|
||||
processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
|
||||
}
|
||||
|
||||
segments.forEach((segment) -> {
|
||||
segment = processSession.putAttribute(segment, "fragment.count", Integer.toString(jsonLineCount.get()));
|
||||
processSession.transfer(segment, REL_SPLIT);
|
||||
});
|
||||
|
||||
processSession.transfer(original, REL_ORIGINAL);
|
||||
logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
|
||||
logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue