diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index 22244eedcc..970291659a 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -77,7 +77,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .subject(subject) .input(input) .explanation("Only non-null single characters are supported") - .valid(input.length() == 1 && input.charAt(0) != 0) + .valid((input.length() == 1 && input.charAt(0) != 0) || context.isExpressionLanguagePresent(input)) .build(); } }; @@ -111,6 +111,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV charset") .description("Character set for CSV files") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue(DEFAULTS.charset) .build(); @@ -119,6 +120,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV delimiter") .description("Delimiter character for CSV records") .addValidator(CHAR_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue(DEFAULTS.delimiter) .build(); @@ -127,6 +129,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV quote character") .description("Quote character for CSV values") .addValidator(CHAR_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue(DEFAULTS.quote) .build(); @@ -135,6 +138,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV escape character") .description("Escape character for CSV values") .addValidator(CHAR_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue(DEFAULTS.escape) .build(); @@ -143,6 +147,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("Use CSV header line") .description("Whether to use the first line as a header") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue(String.valueOf(DEFAULTS.useHeader)) .build(); @@ -151,6 +156,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("Lines to skip") .description("Number of lines to skip before reading header or data") .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true)) + .expressionLanguageSupported(true) .defaultValue(String.valueOf(DEFAULTS.linesToSkip)) .build(); @@ -172,10 +178,6 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .add(INCOMPATIBLE) .build(); - // Immutable configuration - @VisibleForTesting - volatile CSVProperties props; - @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; @@ -189,15 +191,6 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { @OnScheduled public void createCSVProperties(ProcessContext context) throws IOException { super.setDefaultConfiguration(context); - - this.props = new CSVProperties.Builder() - .charset(context.getProperty(CHARSET).getValue()) - .delimiter(context.getProperty(DELIMITER).getValue()) - .quote(context.getProperty(QUOTE).getValue()) - .escape(context.getProperty(ESCAPE).getValue()) - .hasHeader(context.getProperty(HAS_HEADER).asBoolean()) - .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger()) - .build(); } @Override @@ -208,6 +201,15 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { return; } + CSVProperties props = new CSVProperties.Builder() + .charset(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingCSV).getValue()) + .delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(incomingCSV).getValue()) + .quote(context.getProperty(QUOTE).evaluateAttributeExpressions(incomingCSV).getValue()) + .escape(context.getProperty(ESCAPE).evaluateAttributeExpressions(incomingCSV).getValue()) + .hasHeader(context.getProperty(HAS_HEADER).evaluateAttributeExpressions(incomingCSV).asBoolean()) + .linesToSkip(context.getProperty(LINES_TO_SKIP).evaluateAttributeExpressions(incomingCSV).asInteger()) + .build(); + String schemaProperty = context.getProperty(SCHEMA) .evaluateAttributeExpressions(incomingCSV) .getValue(); diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java index 9252e81648..8bad01c355 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java @@ -18,12 +18,19 @@ */ package org.apache.nifi.processors.kite; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.HashMap; + import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; -import org.apache.nifi.processor.ProcessContext; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -32,6 +39,9 @@ import org.junit.Assert; import org.junit.Test; import static org.apache.nifi.processors.kite.TestUtil.streamFor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestCSVToAvroProcessor { @@ -219,50 +229,6 @@ public class TestCSVToAvroProcessor { "No incoming records", incompatible.getAttribute("errors")); } - @Test - public void testCSVProperties() throws IOException { - TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); - ConvertCSVToAvro processor = new ConvertCSVToAvro(); - ProcessContext context = runner.getProcessContext(); - - // check defaults - processor.createCSVProperties(context); - Assert.assertEquals("Charset should match", - "utf8", processor.props.charset); - Assert.assertEquals("Delimiter should match", - ",", processor.props.delimiter); - Assert.assertEquals("Quote should match", - "\"", processor.props.quote); - Assert.assertEquals("Escape should match", - "\\", processor.props.escape); - Assert.assertEquals("Header flag should match", - false, processor.props.useHeader); - Assert.assertEquals("Lines to skip should match", - 0, processor.props.linesToSkip); - - runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16"); - runner.setProperty(ConvertCSVToAvro.DELIMITER, "|"); - runner.setProperty(ConvertCSVToAvro.QUOTE, "'"); - runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603"); - runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true"); - runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2"); - - // check updates - processor.createCSVProperties(context); - Assert.assertEquals("Charset should match", - "utf16", processor.props.charset); - Assert.assertEquals("Delimiter should match", - "|", processor.props.delimiter); - Assert.assertEquals("Quote should match", - "'", processor.props.quote); - Assert.assertEquals("Escape should match", - "\u2603", processor.props.escape); - Assert.assertEquals("Header flag should match", - true, processor.props.useHeader); - Assert.assertEquals("Lines to skip should match", - 2, processor.props.linesToSkip); - } - @Test public void testBasicConversionNoErrors() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); @@ -282,4 +248,53 @@ public class TestCSVToAvroProcessor { runner.assertTransferCount("failure", 0); runner.assertTransferCount("incompatible", 0); } + + @Test + public void testExpressionLanguageBasedCSVProperties() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); + + runner.setProperty(ConvertCSVToAvro.DELIMITER, "${csv.delimiter}"); + runner.setProperty(ConvertCSVToAvro.QUOTE, "${csv.quote}"); + + HashMap flowFileAttributes = new HashMap(); + flowFileAttributes.put("csv.delimiter", "|"); + flowFileAttributes.put("csv.quote", "~"); + + runner.enqueue(streamFor("1|green\n2|~blue|field~|\n3|grey|12.95"), flowFileAttributes); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 3 rows", 3, converted); + Assert.assertEquals("Should reject 0 row", 0, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 0); + + final InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship("success").get(0).toByteArray()); + final DatumReader datumReader = new GenericDatumReader<>(); + try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { + assertTrue(dataFileReader.hasNext()); + GenericRecord record = dataFileReader.next(); + assertEquals(1L, record.get("id")); + assertEquals("green", record.get("color").toString()); + assertNull(record.get("price")); + + assertTrue(dataFileReader.hasNext()); + record = dataFileReader.next(); + assertEquals(2L, record.get("id")); + assertEquals("blue|field", record.get("color").toString()); + assertNull(record.get("price")); + + assertTrue(dataFileReader.hasNext()); + record = dataFileReader.next(); + assertEquals(3L, record.get("id")); + assertEquals("grey", record.get("color").toString()); + assertEquals(12.95, record.get("price")); + } + } }