NIFI-3235 - EvaluateJsonPath performance improvements

This closes #1346.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Bryan Rosander 2016-12-20 12:29:15 -05:00 committed by Bryan Bende
parent 721c9ee7f0
commit 4285157675
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 82 additions and 68 deletions

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -26,8 +26,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
@ -40,6 +42,8 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -50,14 +54,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedOutputStream;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@EventDriven
@SideEffectFree
@ -140,6 +143,13 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
private final ConcurrentMap<String, JsonPath> cachedJsonPathMap = new ConcurrentHashMap<>();
private final Queue<Set<Map.Entry<String, JsonPath>>> attributeToJsonPathEntrySetQueue = new ConcurrentLinkedQueue<>();
private volatile String representationOption;
private volatile boolean destinationIsAttribute;
private volatile String returnType;
private volatile String pathNotFound;
private volatile String nullDefaultValue;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
@ -230,9 +240,25 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
}
}
@OnScheduled
public void onScheduled(ProcessContext processContext) {
representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
destinationIsAttribute = DESTINATION_ATTRIBUTE.equals(processContext.getProperty(DESTINATION).getValue());
returnType = processContext.getProperty(RETURN_TYPE).getValue();
if (returnType.equals(RETURN_TYPE_AUTO)) {
returnType = destinationIsAttribute ? RETURN_TYPE_SCALAR : RETURN_TYPE_JSON;
}
pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
}
@OnUnscheduled
public void onUnscheduled() {
attributeToJsonPathEntrySetQueue.clear();
}
@Override
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile == null) {
return;
@ -240,27 +266,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
final ComponentLog logger = getLogger();
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
/* Build the JsonPath expressions from attributes */
final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : processContext.getProperties().entrySet()) {
if (!entry.getKey().isDynamic()) {
continue;
}
final JsonPath jsonPath = JsonPath.compile(entry.getValue());
attributeToJsonPathMap.put(entry.getKey().getName(), jsonPath);
}
final String destination = processContext.getProperty(DESTINATION).getValue();
String returnType = processContext.getProperty(RETURN_TYPE).getValue();
if (returnType.equals(RETURN_TYPE_AUTO)) {
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
}
DocumentContext documentContext = null;
DocumentContext documentContext;
try {
documentContext = validateAndEstablishJsonContext(processSession, flowFile);
} catch (InvalidJsonException e) {
@ -269,59 +275,67 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
return;
}
final Map<String, String> jsonPathResults = new HashMap<>();
Set<Map.Entry<String, JsonPath>> attributeJsonPathEntries = attributeToJsonPathEntrySetQueue.poll();
if (attributeJsonPathEntries == null) {
attributeJsonPathEntries = processContext.getProperties().entrySet().stream()
.filter(e -> e.getKey().isDynamic())
.collect(Collectors.toMap(e -> e.getKey().getName(), e -> JsonPath.compile(e.getValue())))
.entrySet();
}
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
try {
// We'll only be using this map if destinationIsAttribute == true
final Map<String, String> jsonPathResults = destinationIsAttribute ? new HashMap<>(attributeJsonPathEntries.size()) : Collections.EMPTY_MAP;
final String jsonPathAttrKey = attributeJsonPathEntry.getKey();
final JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final String pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeJsonPathEntries) {
final String jsonPathAttrKey = attributeJsonPathEntry.getKey();
final JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final AtomicReference<Object> resultHolder = new AtomicReference<>(null);
try {
final Object result = documentContext.read(jsonPathExp);
if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result)) {
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
processSession.transfer(flowFile, REL_FAILURE);
return;
}
resultHolder.set(result);
} catch (PathNotFoundException e) {
Object result;
try {
Object potentialResult = documentContext.read(jsonPathExp);
if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(potentialResult)) {
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
new Object[]{jsonPathExp.getPath(), flowFile.getId(), potentialResult.toString(), REL_FAILURE.getName()});
processSession.transfer(flowFile, REL_FAILURE);
return;
}
result = potentialResult;
} catch (PathNotFoundException e) {
if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) {
logger.warn("FlowFile {} could not find path {} for attribute key {}.",
new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
}
if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) {
logger.warn("FlowFile {} could not find path {} for attribute key {}.",
new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
if (destinationIsAttribute) {
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
continue;
} else {
processSession.transfer(flowFile, REL_NO_MATCH);
return;
}
}
if (destination.equals(DESTINATION_ATTRIBUTE)) {
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
continue;
} else {
processSession.transfer(flowFile, REL_NO_MATCH);
return;
}
}
final String resultRepresentation = getResultRepresentation(resultHolder.get(), nullDefaultValue);
switch (destination) {
case DESTINATION_ATTRIBUTE:
final String resultRepresentation = getResultRepresentation(result, nullDefaultValue);
if (destinationIsAttribute) {
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
break;
case DESTINATION_CONTENT:
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
}
} else {
flowFile = processSession.write(flowFile, out -> {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
}
});
processSession.getProvenanceReporter().modifyContent(flowFile, "Replaced content with result of expression " + jsonPathExp.getPath());
break;
}
}
// jsonPathResults map will be empty if this is false
if (destinationIsAttribute) {
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
}
processSession.transfer(flowFile, REL_MATCH);
} finally {
attributeToJsonPathEntrySetQueue.offer(attributeJsonPathEntries);
}
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
processSession.transfer(flowFile, REL_MATCH);
}
}