Adjusting handling of map to cache data items on an instance basis.

This commit is contained in:
Aldrin Piri 2015-03-01 16:31:32 -05:00
parent 484687a67b
commit 973b493386
2 changed files with 34 additions and 35 deletions

View File

@ -40,7 +40,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.Tuple;
import java.io.IOException;
import java.io.OutputStream;
@ -96,7 +95,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private ConcurrentMap<String, Tuple<String, JsonPath>> cachedJsonPathMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, JsonPath> cachedJsonPathMap = new ConcurrentHashMap<>();
@Override
protected void init(final ProcessorInitializationContext context) {
@ -153,13 +152,12 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
.addValidator(new JsonPathValidator() {
@Override
public void cacheComputedValue(String subject, String input, JsonPath computedJsonPath) {
cachedJsonPathMap.put(subject, new Tuple<>(input, computedJsonPath));
cachedJsonPathMap.put(input, computedJsonPath);
}
@Override
public boolean isStale(String subject, String input) {
return cachedJsonPathMap.get(subject) == null;
return cachedJsonPathMap.get(input) == null;
}
})
.required(false)
@ -171,7 +169,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
if (descriptor.isDynamic()) {
if (!StringUtils.equals(oldValue, newValue)) {
cachedJsonPathMap.remove(descriptor.getName());
if (oldValue != null) {
cachedJsonPathMap.remove(oldValue);
}
}
}
}
@ -185,7 +185,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
public void onRemoved(ProcessContext processContext) {
for (PropertyDescriptor propertyDescriptor : getPropertyDescriptors()) {
if (propertyDescriptor.isDynamic()) {
cachedJsonPathMap.remove(propertyDescriptor.getName());
cachedJsonPathMap.remove(processContext.getProperty(propertyDescriptor).getValue());
}
}
}

View File

@ -26,8 +26,9 @@ import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
@ -35,13 +36,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@EventDriven
@SideEffectFree
@ -56,18 +57,8 @@ public class SplitJson extends AbstractJsonPathProcessor {
public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
.name("JsonPath Expression")
.description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.addValidator(new JsonPathValidator() {
@Override
public void cacheComputedValue(String subject, String input, JsonPath computedJson) {
JSON_PATH_MAP.put(input, computedJson);
}
@Override
public boolean isStale(String subject, String input) {
return JSON_PATH_MAP.get(input) == null;
}
})
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
@ -77,7 +68,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final ConcurrentMap<String, JsonPath> JSON_PATH_MAP = new ConcurrentHashMap();
private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>();
@Override
protected void init(final ProcessorInitializationContext context) {
@ -106,21 +97,30 @@ public class SplitJson extends AbstractJsonPathProcessor {
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
if (descriptor.equals(ARRAY_JSON_PATH_EXPRESSION)) {
if (!StringUtils.equals(oldValue, newValue)) {
if (oldValue != null) {
// clear the cached item
JSON_PATH_MAP.remove(oldValue);
JSON_PATH_REF.set(null);
}
}
}
}
/**
* Provides cleanup of the map for any JsonPath values that may have been created. This will remove common values
* shared between multiple instances, but will be regenerated when the next validation cycle occurs as a result of
* isStale()
*/
@OnRemoved
public void onRemoved(ProcessContext processContext) {
String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
JSON_PATH_MAP.remove(jsonPathExpression);
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
JsonPathValidator validator = new JsonPathValidator() {
@Override
public void cacheComputedValue(String subject, String input, JsonPath computedJson) {
JSON_PATH_REF.set(computedJson);
}
@Override
public boolean isStale(String subject, String input) {
return JSON_PATH_REF.get() == null;
}
};
String value = validationContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext));
}
@Override
@ -141,8 +141,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
return;
}
String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
final JsonPath jsonPath = JSON_PATH_MAP.get(jsonPathExpression);
final JsonPath jsonPath = JSON_PATH_REF.get();
final List<FlowFile> segments = new ArrayList<>();