diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 813a07d812..cddfcebbeb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; +import java.io.BufferedOutputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -26,8 +26,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; @@ -40,6 +42,8 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnRemoved; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -50,14 +54,13 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.stream.io.BufferedOutputStream; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; -import java.util.concurrent.atomic.AtomicReference; + +import java.util.stream.Collectors; @EventDriven @SideEffectFree @@ -140,6 +143,13 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { private final ConcurrentMap cachedJsonPathMap = new ConcurrentHashMap<>(); + private final Queue>> attributeToJsonPathEntrySetQueue = new ConcurrentLinkedQueue<>(); + private volatile String representationOption; + private volatile boolean destinationIsAttribute; + private volatile String returnType; + private volatile String pathNotFound; + private volatile String nullDefaultValue; + @Override protected void init(final ProcessorInitializationContext context) { final Set relationships = new HashSet<>(); @@ -230,9 +240,25 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { } } + @OnScheduled + public void onScheduled(ProcessContext processContext) { + representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue(); + destinationIsAttribute = DESTINATION_ATTRIBUTE.equals(processContext.getProperty(DESTINATION).getValue()); + returnType = processContext.getProperty(RETURN_TYPE).getValue(); + if (returnType.equals(RETURN_TYPE_AUTO)) { + returnType = destinationIsAttribute ? RETURN_TYPE_SCALAR : RETURN_TYPE_JSON; + } + pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue(); + nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption); + } + + @OnUnscheduled + public void onUnscheduled() { + attributeToJsonPathEntrySetQueue.clear(); + } + @Override public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException { - FlowFile flowFile = processSession.get(); if (flowFile == null) { return; @@ -240,27 +266,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { final ComponentLog logger = getLogger(); - String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue(); - final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption); - - /* Build the JsonPath expressions from attributes */ - final Map attributeToJsonPathMap = new HashMap<>(); - - for (final Map.Entry entry : processContext.getProperties().entrySet()) { - if (!entry.getKey().isDynamic()) { - continue; - } - final JsonPath jsonPath = JsonPath.compile(entry.getValue()); - attributeToJsonPathMap.put(entry.getKey().getName(), jsonPath); - } - - final String destination = processContext.getProperty(DESTINATION).getValue(); - String returnType = processContext.getProperty(RETURN_TYPE).getValue(); - if (returnType.equals(RETURN_TYPE_AUTO)) { - returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR; - } - - DocumentContext documentContext = null; + DocumentContext documentContext; try { documentContext = validateAndEstablishJsonContext(processSession, flowFile); } catch (InvalidJsonException e) { @@ -269,59 +275,67 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { return; } - final Map jsonPathResults = new HashMap<>(); + Set> attributeJsonPathEntries = attributeToJsonPathEntrySetQueue.poll(); + if (attributeJsonPathEntries == null) { + attributeJsonPathEntries = processContext.getProperties().entrySet().stream() + .filter(e -> e.getKey().isDynamic()) + .collect(Collectors.toMap(e -> e.getKey().getName(), e -> JsonPath.compile(e.getValue()))) + .entrySet(); + } - for (final Map.Entry attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) { + try { + // We'll only be using this map if destinationIsAttribute == true + final Map jsonPathResults = destinationIsAttribute ? new HashMap<>(attributeJsonPathEntries.size()) : Collections.EMPTY_MAP; - final String jsonPathAttrKey = attributeJsonPathEntry.getKey(); - final JsonPath jsonPathExp = attributeJsonPathEntry.getValue(); - final String pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue(); + for (final Map.Entry attributeJsonPathEntry : attributeJsonPathEntries) { + final String jsonPathAttrKey = attributeJsonPathEntry.getKey(); + final JsonPath jsonPathExp = attributeJsonPathEntry.getValue(); - final AtomicReference resultHolder = new AtomicReference<>(null); - try { - final Object result = documentContext.read(jsonPathExp); - if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result)) { - logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.", - new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()}); - processSession.transfer(flowFile, REL_FAILURE); - return; - } - resultHolder.set(result); - } catch (PathNotFoundException e) { + Object result; + try { + Object potentialResult = documentContext.read(jsonPathExp); + if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(potentialResult)) { + logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.", + new Object[]{jsonPathExp.getPath(), flowFile.getId(), potentialResult.toString(), REL_FAILURE.getName()}); + processSession.transfer(flowFile, REL_FAILURE); + return; + } + result = potentialResult; + } catch (PathNotFoundException e) { + if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) { + logger.warn("FlowFile {} could not find path {} for attribute key {}.", + new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e); + } - if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) { - logger.warn("FlowFile {} could not find path {} for attribute key {}.", - new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e); + if (destinationIsAttribute) { + jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY); + continue; + } else { + processSession.transfer(flowFile, REL_NO_MATCH); + return; + } } - if (destination.equals(DESTINATION_ATTRIBUTE)) { - jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY); - continue; - } else { - processSession.transfer(flowFile, REL_NO_MATCH); - return; - } - } - - final String resultRepresentation = getResultRepresentation(resultHolder.get(), nullDefaultValue); - switch (destination) { - case DESTINATION_ATTRIBUTE: + final String resultRepresentation = getResultRepresentation(result, nullDefaultValue); + if (destinationIsAttribute) { jsonPathResults.put(jsonPathAttrKey, resultRepresentation); - break; - case DESTINATION_CONTENT: - flowFile = processSession.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try (OutputStream outputStream = new BufferedOutputStream(out)) { - outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8)); - } + } else { + flowFile = processSession.write(flowFile, out -> { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8)); } }); processSession.getProvenanceReporter().modifyContent(flowFile, "Replaced content with result of expression " + jsonPathExp.getPath()); - break; + } } + + // jsonPathResults map will be empty if this is false + if (destinationIsAttribute) { + flowFile = processSession.putAllAttributes(flowFile, jsonPathResults); + } + processSession.transfer(flowFile, REL_MATCH); + } finally { + attributeToJsonPathEntrySetQueue.offer(attributeJsonPathEntries); } - flowFile = processSession.putAllAttributes(flowFile, jsonPathResults); - processSession.transfer(flowFile, REL_MATCH); } }