Added EL support to CSV properties

Clean up checkstyle errors

Added unit tests, fixed typo

This closes #709

Signed-off-by: Matt Burgess <mattyb149@apache.org>
Author: Simon Elliston Ball (2016-07-22 20:53:22 +01:00)
Committer: Matt Burgess
commit 0aa4b72678
parent 7963df89eb
2 changed files with 76 additions and 59 deletions
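In practice, this change lets the CSV parsing properties be driven by FlowFile attributes instead of fixed literals. A rough sketch of the intended usage, assuming the same test scaffolding (TestRunners, the SCHEMA constant) used in TestCSVToAvroProcessor below; the attribute names are illustrative:

TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
runner.setProperty(ConvertCSVToAvro.DELIMITER, "${csv.delimiter}");  // resolved per FlowFile
runner.setProperty(ConvertCSVToAvro.QUOTE, "${csv.quote}");

Map<String, String> attributes = new HashMap<>();
attributes.put("csv.delimiter", "|");
attributes.put("csv.quote", "~");
runner.enqueue(new ByteArrayInputStream(
        "1|green\n2|~blue|field~|\n3|grey|12.95".getBytes(StandardCharsets.UTF_8)), attributes);
runner.run();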

ConvertCSVToAvro.java

@@ -77,7 +77,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.subject(subject)
.input(input)
.explanation("Only non-null single characters are supported")
.valid(input.length() == 1 && input.charAt(0) != 0)
.valid((input.length() == 1 && input.charAt(0) != 0) || context.isExpressionLanguagePresent(input))
.build();
}
};
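For context, processor validators run when the component is configured, before any FlowFile attributes exist, so the single-character check has to tolerate Expression Language and defer the real check to runtime. A minimal standalone sketch of that pattern, assuming the standard org.apache.nifi.components Validator API:

final Validator charOrExpressionValidator = new Validator() {
    @Override
    public ValidationResult validate(String subject, String input, ValidationContext context) {
        // A literal value must be exactly one non-null character; anything containing
        // Expression Language passes here and is only resolved once a FlowFile arrives.
        final boolean literalOk = input != null && input.length() == 1 && input.charAt(0) != 0;
        return new ValidationResult.Builder()
                .subject(subject)
                .input(input)
                .explanation("Only non-null single characters are supported")
                .valid(literalOk || context.isExpressionLanguagePresent(input))
                .build();
    }
};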
@@ -111,6 +111,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.name("CSV charset")
.description("Character set for CSV files")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue(DEFAULTS.charset)
.build();
@@ -119,6 +120,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.name("CSV delimiter")
.description("Delimiter character for CSV records")
.addValidator(CHAR_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue(DEFAULTS.delimiter)
.build();
@@ -127,6 +129,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.name("CSV quote character")
.description("Quote character for CSV values")
.addValidator(CHAR_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue(DEFAULTS.quote)
.build();
@@ -135,6 +138,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.name("CSV escape character")
.description("Escape character for CSV values")
.addValidator(CHAR_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue(DEFAULTS.escape)
.build();
@@ -143,6 +147,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.name("Use CSV header line")
.description("Whether to use the first line as a header")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue(String.valueOf(DEFAULTS.useHeader))
.build();
@@ -151,6 +156,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.name("Lines to skip")
.description("Number of lines to skip before reading header or data")
.addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
.expressionLanguageSupported(true)
.defaultValue(String.valueOf(DEFAULTS.linesToSkip))
.build();
@@ -172,10 +178,6 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
.add(INCOMPATIBLE)
.build();
// Immutable configuration
@VisibleForTesting
volatile CSVProperties props;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
@@ -189,15 +191,6 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
@OnScheduled
public void createCSVProperties(ProcessContext context) throws IOException {
super.setDefaultConfiguration(context);
this.props = new CSVProperties.Builder()
.charset(context.getProperty(CHARSET).getValue())
.delimiter(context.getProperty(DELIMITER).getValue())
.quote(context.getProperty(QUOTE).getValue())
.escape(context.getProperty(ESCAPE).getValue())
.hasHeader(context.getProperty(HAS_HEADER).asBoolean())
.linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
.build();
}
@Override
@@ -208,6 +201,15 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
return;
}
CSVProperties props = new CSVProperties.Builder()
.charset(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingCSV).getValue())
.delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(incomingCSV).getValue())
.quote(context.getProperty(QUOTE).evaluateAttributeExpressions(incomingCSV).getValue())
.escape(context.getProperty(ESCAPE).evaluateAttributeExpressions(incomingCSV).getValue())
.hasHeader(context.getProperty(HAS_HEADER).evaluateAttributeExpressions(incomingCSV).asBoolean())
.linesToSkip(context.getProperty(LINES_TO_SKIP).evaluateAttributeExpressions(incomingCSV).asInteger())
.build();
String schemaProperty = context.getProperty(SCHEMA)
.evaluateAttributeExpressions(incomingCSV)
.getValue();
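Because the CSVProperties are now assembled in onTrigger for each incoming FlowFile rather than once in @OnScheduled, every CSV property is resolved through evaluateAttributeExpressions against that FlowFile's attributes; a plain literal value passes through unchanged. Roughly, and assuming the flow guarantees the attribute is actually set (an unset attribute would evaluate to an empty string):

// ${csv.delimiter} resolves against incomingCSV's attributes; a literal "," is returned as-is.
final String delimiter = context.getProperty(DELIMITER)
        .evaluateAttributeExpressions(incomingCSV)
        .getValue();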

TestCSVToAvroProcessor.java

@@ -18,12 +18,19 @@
*/
package org.apache.nifi.processors.kite;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processor.ProcessContext;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -32,6 +39,9 @@ import org.junit.Assert;
import org.junit.Test;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestCSVToAvroProcessor {
@@ -219,50 +229,6 @@ public class TestCSVToAvroProcessor {
"No incoming records", incompatible.getAttribute("errors"));
}
@Test
public void testCSVProperties() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
ConvertCSVToAvro processor = new ConvertCSVToAvro();
ProcessContext context = runner.getProcessContext();
// check defaults
processor.createCSVProperties(context);
Assert.assertEquals("Charset should match",
"utf8", processor.props.charset);
Assert.assertEquals("Delimiter should match",
",", processor.props.delimiter);
Assert.assertEquals("Quote should match",
"\"", processor.props.quote);
Assert.assertEquals("Escape should match",
"\\", processor.props.escape);
Assert.assertEquals("Header flag should match",
false, processor.props.useHeader);
Assert.assertEquals("Lines to skip should match",
0, processor.props.linesToSkip);
runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
// check updates
processor.createCSVProperties(context);
Assert.assertEquals("Charset should match",
"utf16", processor.props.charset);
Assert.assertEquals("Delimiter should match",
"|", processor.props.delimiter);
Assert.assertEquals("Quote should match",
"'", processor.props.quote);
Assert.assertEquals("Escape should match",
"\u2603", processor.props.escape);
Assert.assertEquals("Header flag should match",
true, processor.props.useHeader);
Assert.assertEquals("Lines to skip should match",
2, processor.props.linesToSkip);
}
@Test
public void testBasicConversionNoErrors() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
@@ -282,4 +248,53 @@ public class TestCSVToAvroProcessor {
runner.assertTransferCount("failure", 0);
runner.assertTransferCount("incompatible", 0);
}
@Test
public void testExpressionLanguageBasedCSVProperties() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.assertNotValid();
runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
runner.assertValid();
runner.setProperty(ConvertCSVToAvro.DELIMITER, "${csv.delimiter}");
runner.setProperty(ConvertCSVToAvro.QUOTE, "${csv.quote}");
HashMap<String, String> flowFileAttributes = new HashMap<String,String>();
flowFileAttributes.put("csv.delimiter", "|");
flowFileAttributes.put("csv.quote", "~");
runner.enqueue(streamFor("1|green\n2|~blue|field~|\n3|grey|12.95"), flowFileAttributes);
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 3 rows", 3, converted);
Assert.assertEquals("Should reject 0 row", 0, errors);
runner.assertTransferCount("success", 1);
runner.assertTransferCount("failure", 0);
runner.assertTransferCount("incompatible", 0);
final InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship("success").get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
assertTrue(dataFileReader.hasNext());
GenericRecord record = dataFileReader.next();
assertEquals(1L, record.get("id"));
assertEquals("green", record.get("color").toString());
assertNull(record.get("price"));
assertTrue(dataFileReader.hasNext());
record = dataFileReader.next();
assertEquals(2L, record.get("id"));
assertEquals("blue|field", record.get("color").toString());
assertNull(record.get("price"));
assertTrue(dataFileReader.hasNext());
record = dataFileReader.next();
assertEquals(3L, record.get("id"));
assertEquals("grey", record.get("color").toString());
assertEquals(12.95, record.get("price"));
}
}
}