NIFI-3613: Fixed threading bug in JoltTransformJSON and adding caching of compiled Transforms

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #1630
This commit is contained in:
Mark Payne 2017-03-28 15:52:32 -04:00 committed by Yolanda M. Davis
parent 168fc72bba
commit 83fa24b68f
1 changed files with 105 additions and 77 deletions

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.ByteArrayInputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -25,7 +24,9 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -49,15 +50,13 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.nifi.processors.standard.util.jolt.TransformFactory; import org.apache.nifi.processors.standard.util.jolt.TransformFactory;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import com.bazaarvoice.jolt.JoltTransform; import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.JsonUtils; import com.bazaarvoice.jolt.JsonUtils;
@ -120,6 +119,16 @@ public class JoltTransformJSON extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Transform Cache Size")
.description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+ "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("The FlowFile with transformed content will be routed to this relationship") .description("The FlowFile with transformed content will be routed to this relationship")
@ -132,24 +141,34 @@ public class JoltTransformJSON extends AbstractProcessor {
private final static List<PropertyDescriptor> properties; private final static List<PropertyDescriptor> properties;
private final static Set<Relationship> relationships; private final static Set<Relationship> relationships;
private volatile ClassLoader customClassLoader; private volatile ClassLoader customClassLoader;
private volatile JoltTransform transform;
private volatile String specJsonString;
private final static String DEFAULT_CHARSET = "UTF-8"; private final static String DEFAULT_CHARSET = "UTF-8";
static{ // Cache is guarded by synchronizing on 'this'.
private volatile int maxTransformsToCache = 10;
private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() {
@Override
protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) {
final boolean evict = size() > maxTransformsToCache;
if (evict) {
getLogger().debug("Removing Jolt Transform from cache because cache is full");
}
return evict;
}
};
static {
final List<PropertyDescriptor> _properties = new ArrayList<>(); final List<PropertyDescriptor> _properties = new ArrayList<>();
_properties.add(JOLT_TRANSFORM); _properties.add(JOLT_TRANSFORM);
_properties.add(CUSTOM_CLASS); _properties.add(CUSTOM_CLASS);
_properties.add(MODULES); _properties.add(MODULES);
_properties.add(JOLT_SPEC); _properties.add(JOLT_SPEC);
_properties.add(TRANSFORM_CACHE_SIZE);
properties = Collections.unmodifiableList(_properties); properties = Collections.unmodifiableList(_properties);
final Set<Relationship> _relationships = new HashSet<>(); final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS); _relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE); _relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships); relationships = Collections.unmodifiableSet(_relationships);
} }
@Override @Override
@ -163,6 +182,7 @@ public class JoltTransformJSON extends AbstractProcessor {
} }
@Override @Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
@ -177,34 +197,29 @@ public class JoltTransformJSON extends AbstractProcessor {
.explanation(message) .explanation(message)
.build()); .build());
} }
} else { } else {
final ClassLoader customClassLoader; final ClassLoader customClassLoader;
try {
if(modulePath != null) { try {
if (modulePath != null) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter()); customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
}else{ } else {
customClassLoader = this.getClass().getClassLoader(); customClassLoader = this.getClass().getClassLoader();
} }
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true); final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
if(validationContext.isExpressionLanguagePresent(specValue) && invalidExpressionMsg != null){ if (validationContext.isExpressionLanguagePresent(specValue) && invalidExpressionMsg != null) {
final String customMessage = "The expression language used withing this specification is invalid"; final String customMessage = "The expression language used withing this specification is invalid";
results.add(new ValidationResult.Builder().valid(false) results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage) .explanation(customMessage)
.build()); .build());
} else {
}else {
//for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{}","\\\\\\\\\\$\\{"), DEFAULT_CHARSET); Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{}","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
if (CUSTOMR.getValue().equals(transform)) { if (CUSTOMR.getValue().equals(transform)) {
if (StringUtils.isEmpty(customTransform)) { if (StringUtils.isEmpty(customTransform)) {
final String customMessage = "A custom transformation class should be provided. "; final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false) results.add(new ValidationResult.Builder().valid(false)
@ -213,12 +228,10 @@ public class JoltTransformJSON extends AbstractProcessor {
} else { } else {
TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson); TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
} }
} else { } else {
TransformFactory.getTransform(customClassLoader, transform, specJson); TransformFactory.getTransform(customClassLoader, transform, specJson);
} }
} }
} catch (final Exception e) { } catch (final Exception e) {
getLogger().info("Processor is not valid - " + e.toString()); getLogger().info("Processor is not valid - " + e.toString());
String message = "Specification not valid for the selected transformation." ; String message = "Specification not valid for the selected transformation." ;
@ -227,12 +240,12 @@ public class JoltTransformJSON extends AbstractProcessor {
.build()); .build());
} }
} }
return results; return results;
} }
@Override @Override
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile original = session.get(); final FlowFile original = session.get();
if (original == null) { if (original == null) {
return; return;
@ -241,62 +254,31 @@ public class JoltTransformJSON extends AbstractProcessor {
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final byte[] originalContent = new byte[(int) original.getSize()]; final Object inputJson;
session.read(original, new InputStreamCallback() { try (final InputStream in = session.read(original)) {
@Override inputJson = JsonUtils.jsonToObject(in);
public void process(final InputStream in) throws IOException { } catch (final Exception e) {
StreamUtils.fillBuffer(in, originalContent, true); logger.error("Failed to transform {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
return;
} }
});
final String jsonString; final String jsonString;
final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try { try {
final JoltTransform transform = getTransform(context, original);
final String specString; if (customClassLoader != null) {
if(context.getProperty(JOLT_SPEC).isSet() && !StringUtils.isEmpty(context.getProperty(JOLT_SPEC).getValue())){
specString = context.isExpressionLanguagePresent(JOLT_SPEC) ? context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(original).getValue() :
context.getProperty(JOLT_SPEC).getValue();
}else{
specString = null;
}
if(transform == null || (specString != null && !specJsonString.equals(specString)) || (specString == null && SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))){
specJsonString = specString;
final Object specJson;
if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
specJson = JsonUtils.jsonToObject(specJsonString, DEFAULT_CHARSET);
}else{
specJson = null;
}
if(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
transform = TransformFactory.getCustomTransform(customClassLoader,context.getProperty(CUSTOM_CLASS).getValue(), specJson);
}else {
transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
}
if(customClassLoader != null) {
Thread.currentThread().setContextClassLoader(customClassLoader); Thread.currentThread().setContextClassLoader(customClassLoader);
} }
final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent);
final Object inputJson = JsonUtils.jsonToObject(bais);
final Object transformedJson = TransformUtils.transform(transform,inputJson); final Object transformedJson = TransformUtils.transform(transform,inputJson);
jsonString = JsonUtils.toJsonString(transformedJson); jsonString = JsonUtils.toJsonString(transformedJson);
} catch (final Exception ex) {
} catch (Exception ex) { logger.error("Unable to transform {} due to {}", new Object[] {original, ex.toString(), ex});
logger.error("Unable to transform {} due to {}", new Object[]{original, ex});
session.transfer(original, REL_FAILURE); session.transfer(original, REL_FAILURE);
return; return;
} finally {
}finally { if (customClassLoader != null && originalContextClassLoader != null) {
if(customClassLoader != null && originalContextClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalContextClassLoader); Thread.currentThread().setContextClassLoader(originalContextClassLoader);
} }
} }
@ -309,27 +291,73 @@ public class JoltTransformJSON extends AbstractProcessor {
}); });
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(),"application/json"); transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS); session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original}); logger.info("Transformed {}", new Object[]{original});
}
private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
final String specString;
if (context.getProperty(JOLT_SPEC).isSet() && !StringUtils.isEmpty(context.getProperty(JOLT_SPEC).getValue())) {
specString = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue();
} else {
specString = null;
}
// Get the transform from our cache, if it exists.
JoltTransform transform = null;
synchronized (this) {
transform = transformCache.get(specString);
}
if (transform != null) {
return transform;
}
// If no transform for our spec, create the transform.
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
} else {
specJson = null;
}
if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
transform = TransformFactory.getCustomTransform(customClassLoader, context.getProperty(CUSTOM_CLASS).getValue(), specJson);
} else {
transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
// Check again for the transform in our cache, since it's possible that another thread has
// already populated it. If absent from the cache, populate the cache. Otherwise, use the
// value from the cache.
synchronized (this) {
final JoltTransform existingTransform = transformCache.get(specString);
if (existingTransform == null) {
transformCache.put(specString, transform);
} else {
transform = existingTransform;
}
}
return transform;
} }
@OnScheduled @OnScheduled
public void setup(final ProcessContext context) { public synchronized void setup(final ProcessContext context) {
transformCache.clear();
maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
try{ try {
if(context.getProperty(MODULES).isSet()){ if (context.getProperty(MODULES).isSet()) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(),this.getClass().getClassLoader(),getJarFilenameFilter()); customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter());
}else{ } else {
customClassLoader = this.getClass().getClassLoader(); customClassLoader = this.getClass().getClassLoader();
} }
} catch (final Exception ex) {
} catch (Exception ex){ getLogger().error("Unable to setup processor", ex);
getLogger().error("Unable to setup processor",ex);
} }
} }
protected FilenameFilter getJarFilenameFilter(){ protected FilenameFilter getJarFilenameFilter(){