diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 81c9bbec27..64f6e0d428 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -40,7 +40,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.ObjectHolder; -import org.apache.nifi.util.Tuple; import java.io.IOException; import java.io.OutputStream; @@ -96,7 +95,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { private Set relationships; private List properties; - private ConcurrentMap> cachedJsonPathMap = new ConcurrentHashMap<>(); + private final ConcurrentMap cachedJsonPathMap = new ConcurrentHashMap<>(); @Override protected void init(final ProcessorInitializationContext context) { @@ -153,13 +152,12 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { .addValidator(new JsonPathValidator() { @Override public void cacheComputedValue(String subject, String input, JsonPath computedJsonPath) { - cachedJsonPathMap.put(subject, new Tuple<>(input, computedJsonPath)); - + cachedJsonPathMap.put(input, computedJsonPath); } @Override public boolean isStale(String subject, String input) { - return cachedJsonPathMap.get(subject) == null; + return cachedJsonPathMap.get(input) == null; } }) .required(false) @@ -171,7 +169,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { if (descriptor.isDynamic()) { if (!StringUtils.equals(oldValue, newValue)) { - cachedJsonPathMap.remove(descriptor.getName()); + if (oldValue != null) { + cachedJsonPathMap.remove(oldValue); + } } } } @@ -185,7 +185,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { public void onRemoved(ProcessContext processContext) { for (PropertyDescriptor propertyDescriptor : getPropertyDescriptors()) { if (propertyDescriptor.isDynamic()) { - cachedJsonPathMap.remove(propertyDescriptor.getName()); + cachedJsonPathMap.remove(processContext.getProperty(propertyDescriptor).getValue()); } } } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index 7bb8c4e12c..5a193a15b8 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -26,8 +26,9 @@ import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; @@ -35,13 +36,13 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; @EventDriven @SideEffectFree @@ -56,18 +57,8 @@ public class SplitJson extends AbstractJsonPathProcessor { public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder() .name("JsonPath Expression") .description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) - .addValidator(new JsonPathValidator() { - @Override - public void cacheComputedValue(String subject, String input, JsonPath computedJson) { - JSON_PATH_MAP.put(input, computedJson); - } - - @Override - public boolean isStale(String subject, String input) { - return JSON_PATH_MAP.get(input) == null; - } - }) .build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build(); @@ -77,7 +68,7 @@ public class SplitJson extends AbstractJsonPathProcessor { private List properties; private Set relationships; - private static final ConcurrentMap JSON_PATH_MAP = new ConcurrentHashMap(); + private final AtomicReference JSON_PATH_REF = new AtomicReference<>(); @Override protected void init(final ProcessorInitializationContext context) { @@ -106,21 +97,30 @@ public class SplitJson extends AbstractJsonPathProcessor { public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { if (descriptor.equals(ARRAY_JSON_PATH_EXPRESSION)) { if (!StringUtils.equals(oldValue, newValue)) { - // clear the cached item - JSON_PATH_MAP.remove(oldValue); + if (oldValue != null) { + // clear the cached item + JSON_PATH_REF.set(null); + } } } } - /** - * Provides cleanup of the map for any JsonPath values that may have been created. This will remove common values - * shared between multiple instances, but will be regenerated when the next validation cycle occurs as a result of - * isStale() - */ - @OnRemoved - public void onRemoved(ProcessContext processContext) { - String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue(); - JSON_PATH_MAP.remove(jsonPathExpression); + @Override + protected Collection customValidate(ValidationContext validationContext) { + JsonPathValidator validator = new JsonPathValidator() { + @Override + public void cacheComputedValue(String subject, String input, JsonPath computedJson) { + JSON_PATH_REF.set(computedJson); + } + + @Override + public boolean isStale(String subject, String input) { + return JSON_PATH_REF.get() == null; + } + }; + + String value = validationContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue(); + return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext)); } @Override @@ -141,8 +141,7 @@ public class SplitJson extends AbstractJsonPathProcessor { return; } - String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue(); - final JsonPath jsonPath = JSON_PATH_MAP.get(jsonPathExpression); + final JsonPath jsonPath = JSON_PATH_REF.get(); final List segments = new ArrayList<>();