diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index 7777438828..8255781d3a 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -209,6 +209,16 @@ public class StandardValidators { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + final ResultType resultType = context.newExpressionLanguageCompiler().getResultType(value); + if (!resultType.equals(ResultType.STRING)) { + return new ValidationResult.Builder() + .subject(subject) + .input(value) + .valid(false) + .explanation("Expected Attribute Query to return type " + ResultType.STRING + " but query returns type " + resultType) + .build(); + } + return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java index 7a99a59d41..53c19543ea 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; + import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -87,12 +88,14 @@ public class ConvertCharacterSet extends AbstractProcessor { public static final PropertyDescriptor INPUT_CHARSET = new PropertyDescriptor.Builder() .name("Input Character Set") .description("The name of the CharacterSet to expect for Input") + .expressionLanguageSupported(true) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .required(true) .build(); public static final PropertyDescriptor OUTPUT_CHARSET = new PropertyDescriptor.Builder() .name("Output Character Set") .description("The name of the CharacterSet to convert to") + .expressionLanguageSupported(true) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .required(true) .build(); @@ -128,10 +131,15 @@ public class ConvertCharacterSet extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + final ProcessorLog logger = getLogger(); - final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).getValue()); - final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).getValue()); + final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).evaluateAttributeExpressions(flowFile).getValue()); + final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).evaluateAttributeExpressions(flowFile).getValue()); final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE); final CharsetDecoder decoder = inputCharset.newDecoder(); @@ -144,11 +152,6 @@ public class ConvertCharacterSet extends AbstractProcessor { encoder.onUnmappableCharacter(CodingErrorAction.REPLACE); encoder.replaceWith("?".getBytes(outputCharset)); - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - try { final StopWatch stopWatch = new StopWatch(true); flowFile = session.write(flowFile, new StreamCallback() { @@ -169,7 +172,7 @@ public class ConvertCharacterSet extends AbstractProcessor { session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); logger.info("successfully converted characters from {} to {} for {}", - new Object[]{context.getProperty(INPUT_CHARSET).getValue(), context.getProperty(OUTPUT_CHARSET).getValue(), flowFile}); + new Object[]{inputCharset, outputCharset, flowFile}); session.transfer(flowFile, REL_SUCCESS); } catch (final Exception e) { throw new ProcessException(e); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java index 1b057d901f..fea0a4e4ec 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java @@ -19,16 +19,20 @@ package org.apache.nifi.processors.standard; import java.io.File; import java.io.IOException; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; +import static junit.framework.TestCase.fail; + public class TestConvertCharacterSet { @Test - public void test() throws IOException { + public void testSimple() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet()); runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII"); runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32"); @@ -41,4 +45,62 @@ public class TestConvertCharacterSet { output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt")); } + @Test + public void testExpressionLanguageInput() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet()); + runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "${characterSet}"); + runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32"); + + final Map attributes = new HashMap<>(); + attributes.put("characterSet", "ASCII"); + runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"),attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertCharacterSet.REL_SUCCESS, 1); + final MockFlowFile output = runner.getFlowFilesForRelationship(ConvertCharacterSet.REL_SUCCESS).get(0); + output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt")); + } + + @Test + public void testExpressionLanguageOutput() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet()); + runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII"); + runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "${characterSet}"); + + final Map attributes = new HashMap<>(); + attributes.put("characterSet", "UTF-32"); + runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"),attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertCharacterSet.REL_SUCCESS, 1); + final MockFlowFile output = runner.getFlowFilesForRelationship(ConvertCharacterSet.REL_SUCCESS).get(0); + output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt")); + } + + @Test + public void testExpressionLanguageConfig() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet()); + runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "${now()}"); + runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32"); + + runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt")); + try { + runner.run(); + fail("Should fail to validate config and fail to run the on trigger"); + } catch (AssertionError e){ + // Expect to fail assertion for passing a date to the character set validator + } + + + runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "UTF-32"); + runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "${anyAttribute(\"abc\", \"xyz\"):contains(\"bye\")}"); + + runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt")); + try { + runner.run(); + fail("Should fail to validate config and fail to run the on trigger"); + } catch (AssertionError e) { + // Expect to fail assertion for passing a boolean to the character set validator + } + } }