mirror of https://github.com/apache/nifi.git
NIFI-1077 enabling expression language on ConvertCharacterSet input and output
Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
814e8b212c
commit
a549621267
|
@ -209,6 +209,16 @@ public class StandardValidators {
|
||||||
@Override
|
@Override
|
||||||
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
|
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
|
||||||
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
|
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
|
||||||
|
final ResultType resultType = context.newExpressionLanguageCompiler().getResultType(value);
|
||||||
|
if (!resultType.equals(ResultType.STRING)) {
|
||||||
|
return new ValidationResult.Builder()
|
||||||
|
.subject(subject)
|
||||||
|
.input(value)
|
||||||
|
.valid(false)
|
||||||
|
.explanation("Expected Attribute Query to return type " + ResultType.STRING + " but query returns type " + resultType)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
|
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
@ -87,12 +88,14 @@ public class ConvertCharacterSet extends AbstractProcessor {
|
||||||
public static final PropertyDescriptor INPUT_CHARSET = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor INPUT_CHARSET = new PropertyDescriptor.Builder()
|
||||||
.name("Input Character Set")
|
.name("Input Character Set")
|
||||||
.description("The name of the CharacterSet to expect for Input")
|
.description("The name of the CharacterSet to expect for Input")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor OUTPUT_CHARSET = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor OUTPUT_CHARSET = new PropertyDescriptor.Builder()
|
||||||
.name("Output Character Set")
|
.name("Output Character Set")
|
||||||
.description("The name of the CharacterSet to convert to")
|
.description("The name of the CharacterSet to convert to")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
|
@ -128,10 +131,15 @@ public class ConvertCharacterSet extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).getValue());
|
final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).getValue());
|
final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE);
|
final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE);
|
||||||
|
|
||||||
final CharsetDecoder decoder = inputCharset.newDecoder();
|
final CharsetDecoder decoder = inputCharset.newDecoder();
|
||||||
|
@ -144,11 +152,6 @@ public class ConvertCharacterSet extends AbstractProcessor {
|
||||||
encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
|
encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
|
||||||
encoder.replaceWith("?".getBytes(outputCharset));
|
encoder.replaceWith("?".getBytes(outputCharset));
|
||||||
|
|
||||||
FlowFile flowFile = session.get();
|
|
||||||
if (flowFile == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final StopWatch stopWatch = new StopWatch(true);
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
flowFile = session.write(flowFile, new StreamCallback() {
|
flowFile = session.write(flowFile, new StreamCallback() {
|
||||||
|
@ -169,7 +172,7 @@ public class ConvertCharacterSet extends AbstractProcessor {
|
||||||
|
|
||||||
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||||
logger.info("successfully converted characters from {} to {} for {}",
|
logger.info("successfully converted characters from {} to {} for {}",
|
||||||
new Object[]{context.getProperty(INPUT_CHARSET).getValue(), context.getProperty(OUTPUT_CHARSET).getValue(), flowFile});
|
new Object[]{inputCharset, outputCharset, flowFile});
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new ProcessException(e);
|
throw new ProcessException(e);
|
||||||
|
|
|
@ -19,16 +19,20 @@ package org.apache.nifi.processors.standard;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.fail;
|
||||||
|
|
||||||
public class TestConvertCharacterSet {
|
public class TestConvertCharacterSet {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws IOException {
|
public void testSimple() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
|
final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
|
||||||
runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII");
|
runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII");
|
||||||
runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32");
|
runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32");
|
||||||
|
@ -41,4 +45,62 @@ public class TestConvertCharacterSet {
|
||||||
output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt"));
|
output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionLanguageInput() throws IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
|
||||||
|
runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "${characterSet}");
|
||||||
|
runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("characterSet", "ASCII");
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"),attributes);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ConvertCharacterSet.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile output = runner.getFlowFilesForRelationship(ConvertCharacterSet.REL_SUCCESS).get(0);
|
||||||
|
output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionLanguageOutput() throws IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
|
||||||
|
runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII");
|
||||||
|
runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "${characterSet}");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("characterSet", "UTF-32");
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"),attributes);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ConvertCharacterSet.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile output = runner.getFlowFilesForRelationship(ConvertCharacterSet.REL_SUCCESS).get(0);
|
||||||
|
output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionLanguageConfig() throws IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
|
||||||
|
runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "${now()}");
|
||||||
|
runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32");
|
||||||
|
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"));
|
||||||
|
try {
|
||||||
|
runner.run();
|
||||||
|
fail("Should fail to validate config and fail to run the on trigger");
|
||||||
|
} catch (AssertionError e){
|
||||||
|
// Expect to fail assertion for passing a date to the character set validator
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "UTF-32");
|
||||||
|
runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "${anyAttribute(\"abc\", \"xyz\"):contains(\"bye\")}");
|
||||||
|
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"));
|
||||||
|
try {
|
||||||
|
runner.run();
|
||||||
|
fail("Should fail to validate config and fail to run the on trigger");
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
// Expect to fail assertion for passing a boolean to the character set validator
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue