diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java index e8e5452d3e..63e2a33afe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.ByteArrayInputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; @@ -25,7 +24,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -49,15 +50,13 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.jolt.TransformUtils; -import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.apache.nifi.processors.standard.util.jolt.TransformFactory; -import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.processors.standard.util.jolt.TransformUtils; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import com.bazaarvoice.jolt.JoltTransform; import com.bazaarvoice.jolt.JsonUtils; @@ -120,6 +119,16 @@ public class JoltTransformJSON extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("Transform Cache Size") + .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need " + + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .required(true) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("The FlowFile with transformed content will be routed to this relationship") @@ -132,24 +141,34 @@ public class JoltTransformJSON extends AbstractProcessor { private final static List properties; private final static Set relationships; private volatile ClassLoader customClassLoader; - private volatile JoltTransform transform; - private volatile String specJsonString; private final static String DEFAULT_CHARSET = "UTF-8"; - static{ + // Cache is guarded by synchronizing on 'this'. + private volatile int maxTransformsToCache = 10; + private final Map transformCache = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + final boolean evict = size() > maxTransformsToCache; + if (evict) { + getLogger().debug("Removing Jolt Transform from cache because cache is full"); + } + return evict; + } + }; + static { final List _properties = new ArrayList<>(); _properties.add(JOLT_TRANSFORM); _properties.add(CUSTOM_CLASS); _properties.add(MODULES); _properties.add(JOLT_SPEC); + _properties.add(TRANSFORM_CACHE_SIZE); properties = Collections.unmodifiableList(_properties); final Set _relationships = new HashSet<>(); _relationships.add(REL_SUCCESS); _relationships.add(REL_FAILURE); relationships = Collections.unmodifiableSet(_relationships); - } @Override @@ -163,6 +182,7 @@ public class JoltTransformJSON extends AbstractProcessor { } + @Override protected Collection customValidate(ValidationContext validationContext) { final List results = new ArrayList<>(super.customValidate(validationContext)); @@ -177,34 +197,29 @@ public class JoltTransformJSON extends AbstractProcessor { .explanation(message) .build()); } - } else { - final ClassLoader customClassLoader; - try { - if(modulePath != null) { + try { + if (modulePath != null) { customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter()); - }else{ + } else { customClassLoader = this.getClass().getClassLoader(); } final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true); - if(validationContext.isExpressionLanguagePresent(specValue) && invalidExpressionMsg != null){ + if (validationContext.isExpressionLanguagePresent(specValue) && invalidExpressionMsg != null) { final String customMessage = "The expression language used withing this specification is invalid"; results.add(new ValidationResult.Builder().valid(false) .explanation(customMessage) .build()); - - }else { - + } else { //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{}","\\\\\\\\\\$\\{"), DEFAULT_CHARSET); if (CUSTOMR.getValue().equals(transform)) { - if (StringUtils.isEmpty(customTransform)) { final String customMessage = "A custom transformation class should be provided. "; results.add(new ValidationResult.Builder().valid(false) @@ -213,12 +228,10 @@ public class JoltTransformJSON extends AbstractProcessor { } else { TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson); } - } else { TransformFactory.getTransform(customClassLoader, transform, specJson); } } - } catch (final Exception e) { getLogger().info("Processor is not valid - " + e.toString()); String message = "Specification not valid for the selected transformation." ; @@ -227,12 +240,12 @@ public class JoltTransformJSON extends AbstractProcessor { .build()); } } + return results; } @Override public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { - final FlowFile original = session.get(); if (original == null) { return; @@ -241,62 +254,31 @@ public class JoltTransformJSON extends AbstractProcessor { final ComponentLog logger = getLogger(); final StopWatch stopWatch = new StopWatch(true); - final byte[] originalContent = new byte[(int) original.getSize()]; - session.read(original, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, originalContent, true); - } - }); + final Object inputJson; + try (final InputStream in = session.read(original)) { + inputJson = JsonUtils.jsonToObject(in); + } catch (final Exception e) { + logger.error("Failed to transform {}; routing to failure", new Object[] {original, e}); + session.transfer(original, REL_FAILURE); + return; + } final String jsonString; final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); - try { - - final String specString; - - if(context.getProperty(JOLT_SPEC).isSet() && !StringUtils.isEmpty(context.getProperty(JOLT_SPEC).getValue())){ - specString = context.isExpressionLanguagePresent(JOLT_SPEC) ? context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(original).getValue() : - context.getProperty(JOLT_SPEC).getValue(); - }else{ - specString = null; - } - - if(transform == null || (specString != null && !specJsonString.equals(specString)) || (specString == null && SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))){ - - specJsonString = specString; - - final Object specJson; - if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){ - specJson = JsonUtils.jsonToObject(specJsonString, DEFAULT_CHARSET); - }else{ - specJson = null; - } - - if(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){ - transform = TransformFactory.getCustomTransform(customClassLoader,context.getProperty(CUSTOM_CLASS).getValue(), specJson); - }else { - transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson); - } - } - - if(customClassLoader != null) { + final JoltTransform transform = getTransform(context, original); + if (customClassLoader != null) { Thread.currentThread().setContextClassLoader(customClassLoader); } - final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent); - final Object inputJson = JsonUtils.jsonToObject(bais); final Object transformedJson = TransformUtils.transform(transform,inputJson); jsonString = JsonUtils.toJsonString(transformedJson); - - } catch (Exception ex) { - logger.error("Unable to transform {} due to {}", new Object[]{original, ex}); + } catch (final Exception ex) { + logger.error("Unable to transform {} due to {}", new Object[] {original, ex.toString(), ex}); session.transfer(original, REL_FAILURE); return; - - }finally { - if(customClassLoader != null && originalContextClassLoader != null) { + } finally { + if (customClassLoader != null && originalContextClassLoader != null) { Thread.currentThread().setContextClassLoader(originalContextClassLoader); } } @@ -309,27 +291,73 @@ public class JoltTransformJSON extends AbstractProcessor { }); final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); - transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(),"application/json"); + transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json"); session.transfer(transformed, REL_SUCCESS); session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS)); logger.info("Transformed {}", new Object[]{original}); + } + private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { + final String specString; + if (context.getProperty(JOLT_SPEC).isSet() && !StringUtils.isEmpty(context.getProperty(JOLT_SPEC).getValue())) { + specString = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue(); + } else { + specString = null; + } + + // Get the transform from our cache, if it exists. + JoltTransform transform = null; + synchronized (this) { + transform = transformCache.get(specString); + } + + if (transform != null) { + return transform; + } + + // If no transform for our spec, create the transform. + final Object specJson; + if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { + specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); + } else { + specJson = null; + } + + if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { + transform = TransformFactory.getCustomTransform(customClassLoader, context.getProperty(CUSTOM_CLASS).getValue(), specJson); + } else { + transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson); + } + + // Check again for the transform in our cache, since it's possible that another thread has + // already populated it. If absent from the cache, populate the cache. Otherwise, use the + // value from the cache. + synchronized (this) { + final JoltTransform existingTransform = transformCache.get(specString); + if (existingTransform == null) { + transformCache.put(specString, transform); + } else { + transform = existingTransform; + } + } + + return transform; } @OnScheduled - public void setup(final ProcessContext context) { + public synchronized void setup(final ProcessContext context) { + transformCache.clear(); + maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); - try{ - if(context.getProperty(MODULES).isSet()){ - customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(),this.getClass().getClassLoader(),getJarFilenameFilter()); - }else{ + try { + if (context.getProperty(MODULES).isSet()) { + customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter()); + } else { customClassLoader = this.getClass().getClassLoader(); } - - } catch (Exception ex){ - getLogger().error("Unable to setup processor",ex); + } catch (final Exception ex) { + getLogger().error("Unable to setup processor", ex); } - } protected FilenameFilter getJarFilenameFilter(){