NIFI-6318: Support EL in CSV formatting properties

The CSVReader and CSVRecordSetWriter controller services, as well as ConvertExcelToCSVProcessor,
now support Expression Language (EL) for the Value Separator, Quote Character and Escape Character properties.

NIFI-6318: Fixed null checks and compound OR expression.

NIFI-6318: RecordSetWriterFactory.createWriter() changes.

NIFI-6318: Initialize CSVFormat in onEnabled() if there are no dynamic formatting properties.

NIFI-6318: Comment Marker supports EL.

NIFI-6318: Various review changes.

This closes #3504.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Peter Turcsanyi 2019-05-28 00:22:25 +02:00 committed by Koji Kawamura
parent 6d5acf1cb2
commit c6e6a418aa
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
65 changed files with 1876 additions and 1330 deletions

View File

@ -294,7 +294,7 @@ public class MockPropertyValue implements PropertyValue {
@Override
public boolean isExpressionLanguagePresent() {
if (!Boolean.TRUE.equals(expectExpressions)) {
if (rawValue == null) {
return false;
}

View File

@ -465,7 +465,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
// Initialize the writer when the first record is read.
final RecordSchema readerSchema = record.getSchema();
final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
writer = writerFactory.createWriter(logger, writeSchema, out);
writer = writerFactory.createWriter(logger, writeSchema, out, flowFile);
writer.beginRecordSet();
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.eventhub;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -180,7 +181,7 @@ public class TestConsumeAzureEventHub {
processor.setWriterFactory(writerFactory);
final RecordSetWriter writer = mock(RecordSetWriter.class);
final AtomicReference<OutputStream> outRef = new AtomicReference<>();
when(writerFactory.createWriter(any(), any(), any())).thenAnswer(invocation -> {
when(writerFactory.createWriter(any(), any(), any(), any(FlowFile.class))).thenAnswer(invocation -> {
outRef.set(invocation.getArgument(2));
return writer;
});

View File

@ -187,11 +187,11 @@ public class PutDruidRecord extends AbstractSessionFactoryProcessor {
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger());
final RecordSchema outSchema = writerFactory.getSchema(attributes, reader.getSchema());
droppedRecordWriter = writerFactory.createWriter(log, outSchema, droppedOutputStream);
droppedRecordWriter = writerFactory.createWriter(log, outSchema, droppedOutputStream, flowFile);
droppedRecordWriter.beginRecordSet();
failedRecordWriter = writerFactory.createWriter(log, outSchema, failedOutputStream);
failedRecordWriter = writerFactory.createWriter(log, outSchema, failedOutputStream, flowFile);
failedRecordWriter.beginRecordSet();
successfulRecordWriter = writerFactory.createWriter(log, outSchema, successfulOutputStream);
successfulRecordWriter = writerFactory.createWriter(log, outSchema, successfulOutputStream, flowFile);
successfulRecordWriter.beginRecordSet();
Record r;

View File

@ -558,8 +558,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final RecordSchema schema = writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut);
final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut)) {
try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut, successFlowFile);
final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut, failedFlowFile)) {
successWriter.beginRecordSet();
failedWriter.beginRecordSet();

View File

@ -199,7 +199,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile.getAttributes(),
record == null ? null : record.getSchema());
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, out)) {
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, out, originalFlowFile)) {
recordSetWriter.beginRecordSet();
if (record != null) {
recordSetWriter.write(record);

View File

@ -30,8 +30,8 @@ import java.util.List;
import java.util.Map;
/**
* An implementation that is suitable for testing that does not serialize the data to an Output Stream but insted just buffers the data into an
* ArrayList and then provides that List of written records to the user
* An implementation that is suitable for testing that does not serialize the data to an Output Stream but instead just buffers the data into an
* ArrayList and then provides that List of written records to the user.
*/
public class ArrayListRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
private final List<Record> records = new ArrayList<>();
@ -48,7 +48,7 @@ public class ArrayListRecordWriter extends AbstractControllerService implements
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new ArrayListRecordSetWriter(records);
}

View File

@ -70,7 +70,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream rawOut) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream rawOut, Map<String, String> variables) {
final OutputStream out = bufferOutput ? new BufferedOutputStream(rawOut) : rawOut;
return new RecordSetWriter() {

View File

@ -26,9 +26,15 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class CSVUtils {
private static Logger LOG = LoggerFactory.getLogger(CSVUtils.class);
public static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format",
"The format of the CSV is configured by using the properties of this Controller Service, such as Value Separator");
public static final AllowableValue RFC_4180 = new AllowableValue("rfc-4180", "RFC 4180", "CSV data follows the RFC 4180 Specification defined at https://tools.ietf.org/html/rfc4180");
@ -49,17 +55,19 @@ public class CSVUtils {
.build();
public static final PropertyDescriptor VALUE_SEPARATOR = new PropertyDescriptor.Builder()
.name("Value Separator")
.description("The character that is used to separate values/fields in a CSV Record")
.description("The character that is used to separate values/fields in a CSV Record. If the property has been specified via Expression Language " +
"but the expression gets evaluated to an invalid Value Separator at runtime, then it will be skipped and the default Value Separator will be used.")
.addValidator(CSVValidators.UNESCAPED_SINGLE_CHAR_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue(",")
.required(true)
.build();
public static final PropertyDescriptor QUOTE_CHAR = new PropertyDescriptor.Builder()
.name("Quote Character")
.description("The character that is used to quote values so that escape characters do not have to be used")
.description("The character that is used to quote values so that escape characters do not have to be used. If the property has been specified via Expression Language " +
"but the expression gets evaluated to an invalid Quote Character at runtime, then it will be skipped and the default Quote Character will be used.")
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("\"")
.required(true)
.build();
@ -92,14 +100,15 @@ public class CSVUtils {
.name("Comment Marker")
.description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.")
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor ESCAPE_CHAR = new PropertyDescriptor.Builder()
.name("Escape Character")
.description("The character that is used to escape characters that would otherwise have a specific meaning to the CSV Parser.")
.description("The character that is used to escape characters that would otherwise have a specific meaning to the CSV Parser. If the property has been specified via Expression Language " +
"but the expression gets evaluated to an invalid Escape Character at runtime, then it will be skipped and the default Escape Character will be used.")
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("\\")
.required(true)
.build();
@ -168,10 +177,19 @@ public class CSVUtils {
.required(true)
.build();
public static CSVFormat createCSVFormat(final PropertyContext context) {
/**
 * Tells whether the CSV format has to be built per invocation rather than once up front.
 *
 * @param context the property context the CSV formatting properties are read from
 * @return {@code true} if the configured CSV Format is the Custom format and at least one of
 *         Value Separator, Quote Character, Escape Character or Comment Marker reports
 *         Expression Language as present; {@code false} otherwise
 */
public static boolean isDynamicCSVFormat(final PropertyContext context) {
    final String configuredFormat = context.getProperty(CSV_FORMAT).getValue();
    if (!configuredFormat.equalsIgnoreCase(CUSTOM.getValue())) {
        // Only the custom format reads the individual character properties.
        return false;
    }

    // Checked in the same order as listed; stops at the first property that contains EL.
    final PropertyDescriptor[] characterProperties = {VALUE_SEPARATOR, QUOTE_CHAR, ESCAPE_CHAR, COMMENT_MARKER};
    for (final PropertyDescriptor characterProperty : characterProperties) {
        if (context.getProperty(characterProperty).isExpressionLanguagePresent()) {
            return true;
        }
    }
    return false;
}
public static CSVFormat createCSVFormat(final PropertyContext context, final Map<String, String> variables) {
final String formatName = context.getProperty(CSV_FORMAT).getValue();
if (formatName.equalsIgnoreCase(CUSTOM.getValue())) {
return buildCustomFormat(context);
return buildCustomFormat(context, variables);
}
if (formatName.equalsIgnoreCase(RFC_4180.getValue())) {
return CSVFormat.RFC4180;
@ -190,50 +208,87 @@ public class CSVUtils {
}
}
private static char getUnescapedChar(final PropertyContext context, final PropertyDescriptor property) {
return StringEscapeUtils.unescapeJava(context.getProperty(property).getValue()).charAt(0);
/**
 * Evaluates the given property against the supplied variables, unescapes the result with
 * {@link #unescapeJava(String)} and returns it if it is exactly one character long.
 * Otherwise a warning is logged and the first character of the property's default value
 * is returned instead.
 *
 * @param context   the property context to read the property from
 * @param property  the descriptor of the single-character property
 * @param variables the variables used to evaluate Expression Language in the property value
 * @return the resolved character, or {@code null} if the value is invalid and the property has no default value
 */
private static Character getCharUnescapedJava(final PropertyContext context, final PropertyDescriptor property, final Map<String, String> variables) {
    final String evaluated = context.getProperty(property).evaluateAttributeExpressions(variables).getValue();

    // A missing value is treated like an empty one: it can never be a single character.
    final String candidate = (evaluated == null) ? "" : unescapeJava(evaluated);
    if (candidate.length() == 1) {
        return candidate.charAt(0);
    }

    LOG.warn("'{}' property evaluated to an invalid value: \"{}\". It must be a single character. The property value will be ignored.", property.getName(), evaluated);

    final String fallback = property.getDefaultValue();
    if (fallback == null) {
        return null;
    }
    return fallback.charAt(0);
}
private static char getChar(final PropertyContext context, final PropertyDescriptor property) {
return CSVUtils.unescape(context.getProperty(property).getValue()).charAt(0);
/**
 * Evaluates the given property against the supplied variables, unescapes the result with
 * {@link #unescape(String)} and returns it if it is exactly one character long.
 * Otherwise a warning is logged and the first character of the property's default value
 * is returned instead.
 *
 * @param context   the property context to read the property from
 * @param property  the descriptor of the single-character property
 * @param variables the variables used to evaluate Expression Language in the property value
 * @return the resolved character, or {@code null} if the value is invalid and the property has no default value
 */
private static Character getCharUnescaped(final PropertyContext context, final PropertyDescriptor property, final Map<String, String> variables) {
    final String evaluated = context.getProperty(property).evaluateAttributeExpressions(variables).getValue();

    // A missing value is treated like an empty one: it can never be a single character.
    final String candidate = (evaluated == null) ? "" : unescape(evaluated);
    if (candidate.length() == 1) {
        return candidate.charAt(0);
    }

    LOG.warn("'{}' property evaluated to an invalid value: \"{}\". It must be a single character. The property value will be ignored.", property.getName(), evaluated);

    final String fallback = property.getDefaultValue();
    if (fallback == null) {
        return null;
    }
    return fallback.charAt(0);
}
private static CSVFormat buildCustomFormat(final PropertyContext context) {
final char valueSeparator = getUnescapedChar(context, VALUE_SEPARATOR);
private static CSVFormat buildCustomFormat(final PropertyContext context, final Map<String, String> variables) {
final Character valueSeparator = getCharUnescapedJava(context, VALUE_SEPARATOR, variables);
CSVFormat format = CSVFormat.newFormat(valueSeparator)
.withAllowMissingColumnNames()
.withIgnoreEmptyLines();
final PropertyValue skipHeaderPropertyValue = context.getProperty(FIRST_LINE_IS_HEADER);
if (skipHeaderPropertyValue.getValue() != null && skipHeaderPropertyValue.asBoolean()) {
final PropertyValue firstLineIsHeaderPropertyValue = context.getProperty(FIRST_LINE_IS_HEADER);
if (firstLineIsHeaderPropertyValue.getValue() != null && firstLineIsHeaderPropertyValue.asBoolean()) {
format = format.withFirstRecordAsHeader();
}
format = format.withQuote(getChar(context, QUOTE_CHAR));
format = format.withEscape(getChar(context, ESCAPE_CHAR));
final Character quoteChar = getCharUnescaped(context, QUOTE_CHAR, variables);
format = format.withQuote(quoteChar);
final Character escapeChar = getCharUnescaped(context, ESCAPE_CHAR, variables);
format = format.withEscape(escapeChar);
format = format.withTrim(context.getProperty(TRIM_FIELDS).asBoolean());
if (context.getProperty(COMMENT_MARKER).isSet()) {
format = format.withCommentMarker(getChar(context, COMMENT_MARKER));
final Character commentMarker = getCharUnescaped(context, COMMENT_MARKER, variables);
if (commentMarker != null) {
format = format.withCommentMarker(commentMarker);
}
}
if (context.getProperty(NULL_STRING).isSet()) {
format = format.withNullString(CSVUtils.unescape(context.getProperty(NULL_STRING).getValue()));
format = format.withNullString(unescape(context.getProperty(NULL_STRING).getValue()));
}
final PropertyValue quoteValue = context.getProperty(QUOTE_MODE);
if (quoteValue != null) {
if (quoteValue != null && quoteValue.isSet()) {
final QuoteMode quoteMode = QuoteMode.valueOf(quoteValue.getValue());
format = format.withQuoteMode(quoteMode);
}
final PropertyValue trailingDelimiterValue = context.getProperty(TRAILING_DELIMITER);
if (trailingDelimiterValue != null) {
if (trailingDelimiterValue != null && trailingDelimiterValue.isSet()) {
final boolean trailingDelimiter = trailingDelimiterValue.asBoolean();
format = format.withTrailingDelimiter(trailingDelimiter);
}
final PropertyValue recordSeparator = context.getProperty(RECORD_SEPARATOR);
if (recordSeparator != null) {
if (recordSeparator != null && recordSeparator.isSet()) {
final String separator = unescape(recordSeparator.getValue());
format = format.withRecordSeparator(separator);
}
@ -241,6 +296,13 @@ public class CSVUtils {
return format;
}
/**
 * Unescapes Java-style escape sequences in the given string.
 * Inputs that are {@code null} or at most one character long are returned unchanged,
 * since they are passed through without being handed to the unescaper.
 *
 * @param input the possibly escaped text, may be {@code null}
 * @return the unescaped text, or {@code input} itself when it is {@code null} or shorter than two characters
 */
public static String unescapeJava(String input) {
    if (input == null || input.length() <= 1) {
        return input;
    }
    return StringEscapeUtils.unescapeJava(input);
}
public static String unescape(final String input) {
if (input == null) {

View File

@ -17,7 +17,6 @@
package org.apache.nifi.csv;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
@ -47,23 +46,25 @@ public class CSVValidators {
.build();
}
final String unescaped = CSVUtils.unescape(input);
if (unescaped.length() != 1) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("Value must be exactly 1 character but was " + input.length() + " in length")
.build();
}
if (!context.isExpressionLanguageSupported(subject) || !context.isExpressionLanguagePresent(input)) {
final String unescaped = CSVUtils.unescape(input);
if (unescaped.length() != 1) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("Value must be exactly 1 character but was " + input.length() + " in length")
.build();
}
if (illegalChars.contains(unescaped)) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation(input + " is not a valid character for this property")
.build();
if (illegalChars.contains(unescaped)) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation(input + " is not a valid character for this property")
.build();
}
}
return new ValidationResult.Builder()
@ -88,22 +89,16 @@ public class CSVValidators {
.build();
}
String unescapeString = unescapeString(input);
String unescaped = CSVUtils.unescapeJava(input);
return new ValidationResult.Builder()
.subject(subject)
.input(unescapeString)
.input(unescaped)
.explanation("Only non-null single characters are supported")
.valid((unescapeString.length() == 1 && unescapeString.charAt(0) != 0) || context.isExpressionLanguagePresent(input))
.valid((unescaped.length() == 1 && unescaped.charAt(0) != 0)
|| (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)))
.build();
}
private String unescapeString(String input) {
if (input != null && input.length() > 1) {
input = StringEscapeUtils.unescapeJava(input);
}
return input;
}
};
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.csv;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.util.MockConfigurationContext;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
 * Tests for {@link CSVUtils}: detection of Expression Language in the CSV formatting
 * properties and creation of a {@link CSVFormat} from static and Expression Language based values.
 */
public class CSVUtilsTest {

    @Test
    public void testIsDynamicCSVFormatWithStaticProperties() {
        final PropertyContext staticContext = createContext("|", "'", "^", "~");

        assertFalse(CSVUtils.isDynamicCSVFormat(staticContext));
    }

    @Test
    public void testIsDynamicCSVFormatWithDynamicValueSeparator() {
        final PropertyContext dynamicContext = createContext("${csv.delimiter}", "'", "^", "~");

        assertTrue(CSVUtils.isDynamicCSVFormat(dynamicContext));
    }

    @Test
    public void testIsDynamicCSVFormatWithDynamicQuoteCharacter() {
        final PropertyContext dynamicContext = createContext("|", "${csv.quote}", "^", "~");

        assertTrue(CSVUtils.isDynamicCSVFormat(dynamicContext));
    }

    @Test
    public void testIsDynamicCSVFormatWithDynamicEscapeCharacter() {
        final PropertyContext dynamicContext = createContext("|", "'", "${csv.escape}", "~");

        assertTrue(CSVUtils.isDynamicCSVFormat(dynamicContext));
    }

    @Test
    public void testIsDynamicCSVFormatWithDynamicCommentMarker() {
        final PropertyContext dynamicContext = createContext("|", "'", "^", "${csv.comment}");

        assertTrue(CSVUtils.isDynamicCSVFormat(dynamicContext));
    }

    @Test
    public void testCustomFormat() {
        final PropertyContext staticContext = createContext("|", "'", "^", "~");

        final CSVFormat format = CSVUtils.createCSVFormat(staticContext, Collections.emptyMap());

        assertCustomCharacters(format);
    }

    @Test
    public void testCustomFormatWithEL() {
        final CSVFormat format = CSVUtils.createCSVFormat(createExpressionContext(), createAttributes("|", "'", "^", "~"));

        assertCustomCharacters(format);
    }

    @Test
    public void testCustomFormatWithELEmptyValues() {
        // No attributes are supplied, so none of the expressions can resolve to a character.
        final CSVFormat format = CSVUtils.createCSVFormat(createExpressionContext(), Collections.emptyMap());

        assertDefaultCharacters(format);
    }

    @Test
    public void testCustomFormatWithELInvalidValues() {
        // Every attribute resolves to a multi-character string.
        final CSVFormat format = CSVUtils.createCSVFormat(createExpressionContext(), createAttributes("invalid", "invalid", "invalid", "invalid"));

        assertDefaultCharacters(format);
    }

    /**
     * Asserts that the format uses '|' as delimiter, a single quote as quote character,
     * '^' as escape character and '~' as comment marker.
     */
    private void assertCustomCharacters(final CSVFormat format) {
        assertEquals('|', format.getDelimiter());
        assertEquals('\'', (char) format.getQuoteCharacter());
        assertEquals('^', (char) format.getEscapeCharacter());
        assertEquals('~', (char) format.getCommentMarker());
    }

    /**
     * Asserts that the format uses ',' as delimiter, a double quote as quote character,
     * a backslash as escape character and has no comment marker.
     */
    private void assertDefaultCharacters(final CSVFormat format) {
        assertEquals(',', format.getDelimiter());
        assertEquals('"', (char) format.getQuoteCharacter());
        assertEquals('\\', (char) format.getEscapeCharacter());
        assertNull(format.getCommentMarker());
    }

    /**
     * Creates a context in which all four character properties are Expression Language references
     * to the csv.delimiter, csv.quote, csv.escape and csv.comment attributes.
     */
    private PropertyContext createExpressionContext() {
        return createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
    }

    /**
     * Builds the attribute map that the Expression Language based contexts refer to.
     */
    private Map<String, String> createAttributes(String delimiter, String quote, String escape, String comment) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("csv.delimiter", delimiter);
        attributes.put("csv.quote", quote);
        attributes.put("csv.escape", escape);
        attributes.put("csv.comment", comment);
        return attributes;
    }

    /**
     * Creates a mock configuration context holding the given raw values for the
     * Value Separator, Quote Character, Escape Character and Comment Marker properties.
     */
    private PropertyContext createContext(String valueSeparator, String quoteChar, String escapeChar, String commentMarker) {
        final Map<PropertyDescriptor, String> configured = new HashMap<>();
        configured.put(CSVUtils.VALUE_SEPARATOR, valueSeparator);
        configured.put(CSVUtils.QUOTE_CHAR, quoteChar);
        configured.put(CSVUtils.ESCAPE_CHAR, escapeChar);
        configured.put(CSVUtils.COMMENT_MARKER, commentMarker);

        return new MockConfigurationContext(configured, null);
    }
}

View File

@ -312,7 +312,7 @@ public class JoltTransformRecord extends AbstractProcessor {
final Record firstRecord = reader.nextRecord();
if (firstRecord == null) {
try (final OutputStream out = session.write(transformed);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) {
writer.beginRecordSet();
writeResult = writer.finishRecordSet();
@ -339,7 +339,7 @@ public class JoltTransformRecord extends AbstractProcessor {
// and instead use a Map<RecordSchema, RecordSetWriter>. This way, even if many different output schemas are possible,
// the output FlowFiles will each only contain records that have the same schema.
try (final OutputStream out = session.write(transformed);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) {
writer.beginRecordSet();

View File

@ -508,7 +508,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);

View File

@ -122,7 +122,7 @@ public class PublisherLease implements Closeable {
recordCount++;
baos.reset();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
writer.write(record);
writer.flush();
}

View File

@ -282,11 +282,11 @@ public class TestPublisherLease {
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
}

View File

@ -57,7 +57,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new RecordSetWriter() {
@Override

View File

@ -558,7 +558,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);

View File

@ -167,7 +167,7 @@ public class PublisherLease implements Closeable {
baos.reset();
Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
final WriteResult writeResult = writer.write(record);
additionalAttributes = writeResult.getAttributes();
writer.flush();

View File

@ -277,11 +277,11 @@ public class TestPublisherLease {
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
}

View File

@ -57,7 +57,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new RecordSetWriter() {
@Override

View File

@ -558,7 +558,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);

View File

@ -166,7 +166,7 @@ public class PublisherLease implements Closeable {
baos.reset();
Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
final WriteResult writeResult = writer.write(record);
additionalAttributes = writeResult.getAttributes();
writer.flush();

View File

@ -277,11 +277,11 @@ public class TestPublisherLease {
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
}

View File

@ -57,7 +57,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new RecordSetWriter() {
@Override

View File

@ -558,7 +558,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);

View File

@ -166,7 +166,7 @@ public class PublisherLease implements Closeable {
baos.reset();
Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
final WriteResult writeResult = writer.write(record);
additionalAttributes = writeResult.getAttributes();
writer.flush();

View File

@ -278,11 +278,11 @@ public class TestPublisherLease {
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
}

View File

@ -57,7 +57,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new RecordSetWriter() {
@Override

View File

@ -165,7 +165,7 @@ public class GetMongoRecord extends AbstractMongoQueryProcessor {
put("schema.name", schemaName);
}};
RecordSchema schema = writerFactory.getSchema(attrs, null);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attrs);
long count = 0L;
writer.beginRecordSet();
while (cursor.hasNext()) {

View File

@ -230,7 +230,8 @@ public class FetchParquetTest {
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), AdditionalMatchers.or(any(RecordSchema.class), isNull()), any(OutputStream.class))).thenReturn(recordSetWriter);
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), AdditionalMatchers.or(any(RecordSchema.class), isNull()), any(OutputStream.class), any(FlowFile.class)))
.thenReturn(recordSetWriter);
testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
testRunner.enableControllerService(recordSetWriterFactory);

View File

@ -202,7 +202,7 @@ public class ConvertExcelToCSVProcessor
final String desiredSheetsDelimited = context.getProperty(DESIRED_SHEETS).evaluateAttributeExpressions(flowFile).getValue();
final boolean formatValues = context.getProperty(FORMAT_VALUES).asBoolean();
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context);
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context, flowFile.getAttributes());
//Switch to 0 based index
final int firstRow = context.getProperty(ROWS_TO_SKIP).evaluateAttributeExpressions(flowFile).asInteger() - 1;

View File

@ -167,7 +167,7 @@ public class ConvertExcelToCSVProcessorTest {
public void testSkipRowsWithEL() throws Exception {
Map<String, String> attributes = new HashMap<String, String>();
attributes.put("rowsToSkip", "2");
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(),attributes);
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);
testRunner.setProperty(ConvertExcelToCSVProcessor.ROWS_TO_SKIP, "${rowsToSkip}");
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
@ -224,7 +224,7 @@ public class ConvertExcelToCSVProcessorTest {
public void testSkipColumnsWithEL() throws Exception {
Map<String, String> attributes = new HashMap<String, String>();
attributes.put("columnsToSkip", "2");
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(),attributes);
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);
testRunner.setProperty(ConvertExcelToCSVProcessor.COLUMNS_TO_SKIP, "${columnsToSkip}");
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
@ -280,6 +280,100 @@ public class ConvertExcelToCSVProcessorTest {
"9.8765E+08||\r\n");
}
/**
 * Verifies that the Value Separator property can be supplied through Expression Language:
 * the property is set to <code>${csv.delimiter}</code> and the enqueued FlowFile carries
 * <code>csv.delimiter=|</code>, so the produced CSV is expected to be pipe-delimited.
 */
@Test
public void testCustomValueSeparatorWithEL() throws Exception {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("csv.delimiter", "|");

    testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

    testRunner.setProperty(CSVUtils.VALUE_SEPARATOR, "${csv.delimiter}");
    testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");

    testRunner.run();

    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

    final MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);

    // Parse straight to a primitive: avoids the deprecated Long(String) constructor and a boxed '==' comparison.
    final long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
    assertTrue(rowsSheet == 9);

    // The date/time cells are rendered from this fixed instant using the patterns below.
    final LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);

    ff.assertContentEquals("Numbers|Timestamps|Money\n" +
            "1234.456|" + DateTimeFormatter.ofPattern("d/M/yy").format(localDt) + "|$   123.45\n" +
            "1234.46|" + DateTimeFormatter.ofPattern("hh:mm:ss a").format(localDt) + "|£   123.45\n" +
            "1234.5|" + DateTimeFormatter.ofPattern("EEEE, MMMM dd, yyyy").format(localDt) + "|¥   123.45\n" +
            "1,234.46|" + DateTimeFormatter.ofPattern("d/M/yy HH:mm").format(localDt) + "|$   1,023.45\n" +
            "1,234.4560|" + DateTimeFormatter.ofPattern("hh:mm a").format(localDt) + "|£   1,023.45\n" +
            "9.88E+08|" + DateTimeFormatter.ofPattern("yyyy/MM/dd/ HH:mm").format(localDt) + "|¥   1,023.45\n" +
            "9.877E+08||\n" +
            "9.8765E+08||\n");
}
/**
 * Verifies that the Quote Character property can be supplied through Expression Language:
 * the property is set to <code>${csv.quote}</code>, the FlowFile carries <code>csv.quote='</code>
 * and the quote mode is QUOTE_ALL. Per the expected content, every non-empty cell is wrapped
 * in single quotes while empty cells stay bare.
 */
@Test
public void testCustomQuoteCharWithEL() throws Exception {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("csv.quote", "'");

    testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

    testRunner.setProperty(CSVUtils.QUOTE_CHAR, "${csv.quote}");
    testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
    testRunner.setProperty(CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_ALL);

    testRunner.run();

    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

    final MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);

    // Parse straight to a primitive: avoids the deprecated Long(String) constructor and a boxed '==' comparison.
    final long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
    assertTrue(rowsSheet == 9);

    // The date/time cells are rendered from this fixed instant using the patterns below.
    final LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);

    ff.assertContentEquals("'Numbers','Timestamps','Money'\n" +
            "'1234.456','" + DateTimeFormatter.ofPattern("d/M/yy").format(localDt) + "','$   123.45'\n" +
            "'1234.46','" + DateTimeFormatter.ofPattern("hh:mm:ss a").format(localDt) + "','£   123.45'\n" +
            "'1234.5','" + DateTimeFormatter.ofPattern("EEEE, MMMM dd, yyyy").format(localDt) + "','¥   123.45'\n" +
            "'1,234.46','" + DateTimeFormatter.ofPattern("d/M/yy HH:mm").format(localDt) + "','$   1,023.45'\n" +
            "'1,234.4560','" + DateTimeFormatter.ofPattern("hh:mm a").format(localDt) + "','£   1,023.45'\n" +
            "'9.88E+08','" + DateTimeFormatter.ofPattern("yyyy/MM/dd/ HH:mm").format(localDt) + "','¥   1,023.45'\n" +
            "'9.877E+08',,\n" +
            "'9.8765E+08',,\n");
}
/**
 * Verifies that the Escape Character property can be supplied through Expression Language:
 * the property is set to <code>${csv.escape}</code> and the FlowFile carries
 * <code>csv.escape=^</code>. Per the expected content, commas that occur inside a cell value
 * are prefixed with '^' instead of the cell being quoted.
 */
@Test
public void testCustomEscapeCharWithEL() throws Exception {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("csv.escape", "^");

    testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

    testRunner.setProperty(CSVUtils.ESCAPE_CHAR, "${csv.escape}");
    testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");

    testRunner.run();

    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
    testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

    final MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);

    // Parse straight to a primitive: avoids the deprecated Long(String) constructor and a boxed '==' comparison.
    final long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
    assertTrue(rowsSheet == 9);

    // The date/time cells are rendered from this fixed instant; '^' inside a pattern is emitted literally,
    // which yields the escaped commas expected in the long date format.
    final LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);

    ff.assertContentEquals("Numbers,Timestamps,Money\n" +
            "1234.456," + DateTimeFormatter.ofPattern("d/M/yy").format(localDt) + ",$   123.45\n" +
            "1234.46," + DateTimeFormatter.ofPattern("hh:mm:ss a").format(localDt) + ",£   123.45\n" +
            "1234.5," + DateTimeFormatter.ofPattern("EEEE^, MMMM dd^, yyyy").format(localDt) + ",¥   123.45\n" +
            "1^,234.46," + DateTimeFormatter.ofPattern("d/M/yy HH:mm").format(localDt) + ",$   1^,023.45\n" +
            "1^,234.4560," + DateTimeFormatter.ofPattern("hh:mm a").format(localDt) + ",£   1^,023.45\n" +
            "9.88E+08," + DateTimeFormatter.ofPattern("yyyy/MM/dd/ HH:mm").format(localDt) + ",¥   1^,023.45\n" +
            "9.877E+08,,\n" +
            "9.8765E+08,,\n");
}
/**
* Validates that all sheets in the Excel document are exported.
*

View File

@ -62,10 +62,10 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createWriter(logger, schema, out);
return recordFactory.get().createWriter(logger, schema, out, variables);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}

View File

@ -102,7 +102,7 @@ class ScriptedRecordSetWriterTest {
def schema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null)
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, outputStream)
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, outputStream, Collections.emptyMap())
assertNotNull(recordSetWriter)
def recordSchema = new SimpleRecordSchema(

View File

@ -104,7 +104,7 @@ class GroovyRecordSetWriterFactory extends AbstractControllerService implements
}
@Override
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
return new GroovyRecordSetWriter(out)
}

View File

@ -285,7 +285,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, attributes)) {
writer.beginRecordSet();
Record record;

View File

@ -376,11 +376,12 @@ public class GetSolr extends SolrProcessor {
final RecordSchema schema = writerFactory.getSchema(null, null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
final FlowFile flowFileRef = flowFile;
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileRef);
writer.write(recordSet);
writer.flush();
mimeType.append(writer.getMimeType());

View File

@ -432,8 +432,9 @@ public class QuerySolr extends SolrProcessor {
final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
final FlowFile flowFileResponseRef = flowFileResponse;
flowFileResponse = session.write(flowFileResponse, out -> {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileResponseRef)) {
writer.write(recordSet);
writer.flush();
mimeType.append(writer.getMimeType());

View File

@ -131,7 +131,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
Record firstRecord = reader.nextRecord();
if (firstRecord == null) {
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes)) {
writer.beginRecordSet();
final WriteResult writeResult = writer.finishRecordSet();
@ -147,7 +147,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
firstRecord = AbstractRecordProcessor.this.process(firstRecord, original, context);
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes)) {
writer.beginRecordSet();
writer.write(firstRecord);

View File

@ -219,11 +219,11 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
if (tuple == null) {
FlowFile outFlowFile = session.create(original);
final FlowFile outFlowFile = session.create(original);
final OutputStream out = session.write(outFlowFile);
final RecordSchema recordWriteSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
recordSetWriter = writerFactory.createWriter(getLogger(), recordWriteSchema, out);
recordSetWriter = writerFactory.createWriter(getLogger(), recordWriteSchema, out, outFlowFile);
recordSetWriter.beginRecordSet();
tuple = new Tuple<>(outFlowFile, recordSetWriter);

View File

@ -243,7 +243,7 @@ public class ForkRecord extends AbstractProcessor {
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final OutputStream out = session.write(outFlowFile);
try (final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out)) {
try (final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
recordSetWriter.beginRecordSet();

View File

@ -379,7 +379,7 @@ public class ListenTCPRecord extends AbstractProcessor {
final RecordSchema recordSchema = recordSetWriterFactory.getSchema(Collections.EMPTY_MAP, record.getSchema());
try (final OutputStream out = session.write(flowFile);
final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) {
final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out, flowFile)) {
// start the record set and write the first record from above
recordWriter.beginRecordSet();

View File

@ -274,7 +274,7 @@ public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent>
final RecordSchema recordSchema = firstRecord.getSchema();
final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut);
writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut, flowFile);
writer.beginRecordSet();
flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer);

View File

@ -230,7 +230,7 @@ public class PartitionRecord extends AbstractProcessor {
final OutputStream out = session.write(childFlowFile);
writer = writerFactory.createWriter(getLogger(), writeSchema, out);
writer = writerFactory.createWriter(getLogger(), writeSchema, out, childFlowFile);
writer.beginRecordSet();
writerMap.put(recordValueMap, writer);
}

View File

@ -336,7 +336,7 @@ public class QueryRecord extends AbstractProcessor {
throw new ProcessException(e);
}
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out)) {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, original)) {
writeResultRef.set(resultSetWriter.write(recordSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
} catch (final Exception e) {

View File

@ -169,7 +169,7 @@ public class SplitRecord extends AbstractProcessor {
final WriteResult writeResult;
try (final OutputStream out = session.write(split);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, split)) {
if (maxRecords == 1) {
final Record record = pushbackSet.next();
writeResult = writer.write(record);

View File

@ -445,7 +445,7 @@ public class ValidateRecord extends AbstractProcessor {
}
final OutputStream out = session.write(flowFile);
final RecordSetWriter created = factory.createWriter(getLogger(), outputSchema, out);
final RecordSetWriter created = factory.createWriter(getLogger(), outputSchema, out, flowFile);
created.beginRecordSet();
return created;
}

View File

@ -132,7 +132,7 @@ public class RecordBin {
this.out = new ByteCountingOutputStream(rawOut);
recordWriter = writerFactory.createWriter(logger, record.getSchema(), out);
recordWriter = writerFactory.createWriter(logger, record.getSchema(), out, flowFile);
recordWriter.beginRecordSet();
}

View File

@ -36,6 +36,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -77,7 +78,7 @@ public class RecordSqlWriter implements SqlWriter {
} catch (final SQLException | SchemaNotFoundException | IOException e) {
throw new ProcessException(e);
}
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, Collections.emptyMap())) {
writeResultRef.set(resultSetWriter.write(recordSet));
if (mimeType == null) {
mimeType = resultSetWriter.getMimeType();
@ -115,7 +116,7 @@ public class RecordSqlWriter implements SqlWriter {
@Override
public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, Collections.emptyMap())) {
mimeType = resultSetWriter.getMimeType();
resultSetWriter.beginRecordSet();
resultSetWriter.finishRecordSet();

View File

@ -27,7 +27,12 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
@ -203,9 +208,9 @@ public class TestConvertRecord {
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray())); final OutputStream out = baos) {
@ -218,4 +223,50 @@ public class TestConvertRecord {
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json"))), baos.toString(StandardCharsets.UTF_8.name()));
}
/**
 * Runs ConvertRecord with a CSVReader and a CSVRecordSetWriter whose formatting properties
 * (separator, quote, escape, comment marker on the reader; separator and quote on the writer)
 * are all given as Expression Language and resolved from the attributes of the enqueued FlowFile.
 */
@Test
public void testCSVFormattingWithEL() throws InitializationException {
    final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);

    // Reader: every formatting character comes from a "csv.in.*" FlowFile attribute.
    final CSVReader csvReader = new CSVReader();
    runner.addControllerService("csv-reader", csvReader);
    runner.setProperty(csvReader, CSVUtils.VALUE_SEPARATOR, "${csv.in.delimiter}");
    runner.setProperty(csvReader, CSVUtils.QUOTE_CHAR, "${csv.in.quote}");
    runner.setProperty(csvReader, CSVUtils.ESCAPE_CHAR, "${csv.in.escape}");
    runner.setProperty(csvReader, CSVUtils.COMMENT_MARKER, "${csv.in.comment}");
    runner.enableControllerService(csvReader);

    // Writer: separator and quote come from "csv.out.*" attributes; all values are quoted.
    final CSVRecordSetWriter csvWriter = new CSVRecordSetWriter();
    runner.addControllerService("csv-writer", csvWriter);
    runner.setProperty(csvWriter, CSVUtils.VALUE_SEPARATOR, "${csv.out.delimiter}");
    runner.setProperty(csvWriter, CSVUtils.QUOTE_CHAR, "${csv.out.quote}");
    runner.setProperty(csvWriter, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_ALL);
    runner.enableControllerService(csvWriter);

    runner.setProperty(ConvertRecord.RECORD_READER, "csv-reader");
    runner.setProperty(ConvertRecord.RECORD_WRITER, "csv-writer");

    // Input exercises the comment marker (~), delimiter (|), quote (') and escape (^) characters.
    final String ffContent = "~ comment\n" +
            "id|username|password\n" +
            "123|'John'|^|^'^^\n";

    final Map<String, String> ffAttributes = new HashMap<>();
    ffAttributes.put("csv.in.delimiter", "|");
    ffAttributes.put("csv.in.quote", "'");
    ffAttributes.put("csv.in.escape", "^");
    ffAttributes.put("csv.in.comment", "~");
    ffAttributes.put("csv.out.delimiter", "\t");
    ffAttributes.put("csv.out.quote", "`");

    runner.enqueue(ffContent, ffAttributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);

    final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);

    final String expected = "`id`\t`username`\t`password`\n" +
            "`123`\t`John`\t`|'^`\n";

    // Decode with an explicit charset so the comparison does not depend on the platform default.
    assertEquals(expected, new String(flowFile.toByteArray(), StandardCharsets.UTF_8));
}
}

View File

@ -824,7 +824,7 @@ public class TestQueryRecord {
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new RecordSetWriter() {
@Override

View File

@ -19,9 +19,11 @@ package org.apache.nifi.serialization;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
@ -75,6 +77,47 @@ public interface RecordSetWriterFactory extends ControllerService {
*
* @return a RecordSetWriter that can write record sets to an OutputStream
* @throws IOException if unable to read from the given InputStream
*
* @deprecated Use {@link #createWriter(ComponentLog, RecordSchema, OutputStream, FlowFile)} or {@link #createWriter(ComponentLog, RecordSchema, OutputStream, Map)} instead.
*/
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException;
@Deprecated
default RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
    // Legacy entry point: no variables are available here, so delegate to the
    // Map-based overload with an empty variables map.
    final Map<String, String> noVariables = Collections.emptyMap();
    return createWriter(logger, schema, out, noVariables);
}
/**
 * <p>
 * Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream.
 * The method accepts a FlowFile whose attributes can be used to resolve properties specified via Expression Language.
 * This default implementation delegates to the variables-map overload, passing the FlowFile's attributes.
 * </p>
 *
 * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
 *            because it allows messages to be logged for the component that is calling this Controller Service.
 * @param schema the schema that will be used for writing records
 * @param out the OutputStream to write to
 * @param flowFile the FlowFile whose attributes are used to resolve properties specified via Expression Language;
 *            may be {@code null}, in which case an empty variables map is used
 *
 * @return a RecordSetWriter that can write record sets to an OutputStream
 * @throws SchemaNotFoundException if the implementation cannot find a schema it needs to create the writer
 * @throws IOException if an I/O failure occurs while creating the writer for the given OutputStream
 */
default RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, FlowFile flowFile) throws SchemaNotFoundException, IOException {
    // A missing FlowFile just means there are no attributes to evaluate against; do not fail with an NPE.
    final Map<String, String> variables = flowFile == null ? Collections.<String, String>emptyMap() : flowFile.getAttributes();
    return createWriter(logger, schema, out, variables);
}
/**
* <p>
* Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream.
* The method accepts a variables map that can be used to resolve properties specified via Expression Language.
* </p>
*
* @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
* because it allows messages to be logged for the component that is calling this Controller Service.
* @param schema the schema that will be used for writing records
* @param out the OutputStream to write to
* @param variables the variables which are used to resolve properties specified via Expression Language
*
* @return a RecordSetWriter that can write record sets to an OutputStream
* @throws SchemaNotFoundException if the implementation cannot find a schema it needs to create the writer
* (NOTE(review): exact conditions are implementation-specific - confirm against implementations)
* @throws IOException if an I/O failure occurs while creating the writer for the given OutputStream
*/
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException;
}

View File

@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@ -123,7 +124,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out, final Map<String, String> variables) throws IOException {
final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue();

View File

@ -54,7 +54,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
}
try {
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader();
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context, variables).withFirstRecordAsHeader();
try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream));
final CSVParser csvParser = new CSVParser(reader, csvFormat)) {

View File

@ -46,6 +46,7 @@ import org.apache.nifi.stream.io.NonCloseableInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -55,7 +56,7 @@ import java.util.Map;
+ "the values. See Controller Service's Usage for further documentation.")
public class CSVReader extends SchemaRegistryService implements RecordReaderFactory {
private final AllowableValue headerDerivedAllowableValue = new AllowableValue("csv-header-derived", "Use String Fields From Header",
private static final AllowableValue HEADER_DERIVED = new AllowableValue("csv-header-derived", "Use String Fields From Header",
"The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and assuming that all columns are of type String.");
@ -78,8 +79,9 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
.required(true)
.build();
private volatile ConfigurationContext context;
private volatile String csvParser;
private volatile CSVFormat csvFormat;
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
@ -87,6 +89,9 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
private volatile boolean ignoreHeader;
private volatile String charSet;
// it will be initialized only if there are no dynamic csv formatting properties
private volatile CSVFormat csvFormat;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
@ -108,9 +113,10 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
}
@OnEnabled
public void storeCsvFormat(final ConfigurationContext context) {
public void storeStaticProperties(final ConfigurationContext context) {
this.context = context;
this.csvParser = context.getProperty(CSV_PARSER).getValue();
this.csvFormat = CSVUtils.createCSVFormat(context);
this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
@ -121,10 +127,15 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
// Ensure that if we are deriving schema from header that we always treat the first line as a header,
// regardless of the 'First Line is Header' property
final String accessStrategy = context.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
if (headerDerivedAllowableValue.getValue().equals(accessStrategy) || SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(accessStrategy)) {
this.csvFormat = this.csvFormat.withFirstRecordAsHeader();
if (HEADER_DERIVED.getValue().equals(accessStrategy) || SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(accessStrategy)) {
this.firstLineIsHeader = true;
}
if (!CSVUtils.isDynamicCSVFormat(context)) {
this.csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());
} else {
this.csvFormat = null;
}
}
@Override
@ -134,6 +145,13 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null);
in.reset();
CSVFormat csvFormat;
if (this.csvFormat != null) {
csvFormat = this.csvFormat;
} else {
csvFormat = CSVUtils.createCSVFormat(context, variables);
}
if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
return new CSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
} else if(JACKSON_CSV.getValue().equals(csvParser)) {
@ -145,10 +163,10 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
if (allowableValue.equalsIgnoreCase(HEADER_DERIVED.getValue())) {
return new CSVHeaderSchemaStrategy(context);
} else if (allowableValue.equalsIgnoreCase(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
final RecordSourceFactory<CSVRecordAndFieldNames> sourceFactory = (var, in) -> new CSVRecordSource(in, context);
final RecordSourceFactory<CSVRecordAndFieldNames> sourceFactory = (variables, in) -> new CSVRecordSource(in, context, variables);
final SchemaInferenceEngine<CSVRecordAndFieldNames> inference = new CSVSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
}
@ -159,7 +177,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
allowableValues.add(headerDerivedAllowableValue);
allowableValues.add(HEADER_DERIVED);
allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
return allowableValues;
}

View File

@ -20,7 +20,9 @@ package org.apache.nifi.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -41,10 +43,14 @@ import org.apache.nifi.serialization.record.RecordSchema;
+ "corresponding to the record fields.")
public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
private volatile CSVFormat csvFormat;
private volatile ConfigurationContext context;
private volatile boolean includeHeader;
private volatile String charSet;
// it will be initialized only if there are no dynamic csv formatting properties
private volatile CSVFormat csvFormat;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
@ -64,14 +70,28 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
}
// Annotated with @OnEnabled. Captures the configuration context and the properties that are read
// once (include-header flag, character set), and pre-builds the CSVFormat only when
// CSVUtils.isDynamicCSVFormat(context) reports false.
@OnEnabled
// NOTE(review): the next two lines appear to be the pre-change signature and body line from the
// diff (storeCsvFormat); the lines after them are the replacement method — confirm against the real file.
public void storeCsvFormat(final ConfigurationContext context) {
this.csvFormat = CSVUtils.createCSVFormat(context);
public void storeStaticProperties(final ConfigurationContext context) {
// Kept so createWriter() can build a CSVFormat later when none is cached here.
this.context = context;
this.includeHeader = context.getProperty(CSVUtils.INCLUDE_HEADER_LINE).asBoolean();
this.charSet = context.getProperty(CSVUtils.CHARSET).getValue();
if (!CSVUtils.isDynamicCSVFormat(context)) {
// Non-dynamic format: build it once, passing an empty variable map.
this.csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());
} else {
// Null signals createWriter() to build the format from the variables it receives.
this.csvFormat = null;
}
}
/**
 * Creates a WriteCSVResult for the given schema and output stream.
 * Uses the CSVFormat cached in this.csvFormat when it is non-null; otherwise builds one
 * from the stored context and the supplied variables via CSVUtils.createCSVFormat.
 *
 * @param logger component logger (not used in the visible body)
 * @param schema schema passed to the writer and to getSchemaAccessWriter
 * @param out stream handed to the writer
 * @param variables values passed to CSVUtils.createCSVFormat when no cached format exists
 */
@Override
// NOTE(review): two signatures follow — they look like the old (3-arg) and new (4-arg) sides of the
// diff; confirm that only the 4-arg form exists in the real file.
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) throws SchemaNotFoundException, IOException {
// Local deliberately shadows the field: it holds whichever format this call ends up using.
CSVFormat csvFormat;
if (this.csvFormat != null) {
csvFormat = this.csvFormat;
} else {
csvFormat = CSVUtils.createCSVFormat(context, variables);
}
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
}

View File

@ -33,12 +33,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class CSVRecordSource implements RecordSource<CSVRecordAndFieldNames> {
private final Iterator<CSVRecord> csvRecordIterator;
private final List<String> fieldNames;
public CSVRecordSource(final InputStream in, final PropertyContext context) throws IOException {
public CSVRecordSource(final InputStream in, final PropertyContext context, final Map<String, String> variables) throws IOException {
final String charset = context.getProperty(CSVUtils.CHARSET).getValue();
final Reader reader;
@ -48,7 +49,7 @@ public class CSVRecordSource implements RecordSource<CSVRecordAndFieldNames> {
throw new ProcessException(e);
}
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader().withTrim();
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context, variables).withFirstRecordAsHeader().withTrim();
final CSVParser csvParser = new CSVParser(reader, csvFormat);
fieldNames = Collections.unmodifiableList(new ArrayList<>(csvParser.getHeaderMap().keySet()));

View File

@ -23,6 +23,7 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
@ -173,7 +174,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) throws SchemaNotFoundException, IOException {
final OutputStream bufferedOut = new BufferedOutputStream(out, 65536);
final OutputStream compressionOut;

View File

@ -39,6 +39,7 @@ import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
@CapabilityDescription("Writes the contents of a RecordSet as free-form text. The configured "
@ -79,7 +80,7 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
}
/**
 * Returns a FreeFormTextWriter built from textValue, characterSet and the given output stream.
 * The logger, schema and variables arguments are not used in the visible body.
 */
@Override
// NOTE(review): the two signatures below look like the old (3-arg) and new (4-arg) sides of the diff —
// confirm that only one exists in the real file.
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new FreeFormTextWriter(textValue, characterSet, out);
}

View File

@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Tags({"xml", "resultset", "writer", "serialize", "record", "recordset", "row"})
@CapabilityDescription("Writes a RecordSet to XML. The records are wrapped by a root tag.")
@ -165,7 +166,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) throws SchemaNotFoundException, IOException {
final String nullSuppression = getConfigurationContext().getProperty(SUPPRESS_NULLS).getValue();
final NullSuppression nullSuppressionEnum;
if (nullSuppression.equals(ALWAYS_SUPPRESS.getValue())) {

View File

@ -40,7 +40,7 @@ public class TestCSVHeaderSchemaStrategy {
@Test
public void testSimple() throws SchemaNotFoundException, IOException {
final String headerLine = "a, b, c, d, e\\,z, f";
final String headerLine = "\"a\", b, c, d, e\\,z, f";
final byte[] headerBytes = headerLine.getBytes();
final Map<PropertyDescriptor, String> properties = new HashMap<>();
@ -66,4 +66,37 @@ public class TestCSVHeaderSchemaStrategy {
.allMatch(field -> field.getDataType().equals(RecordFieldType.STRING.getDataType())));
}
@Test
public void testWithEL() throws SchemaNotFoundException, IOException {
    // Delimiter, quote and escape characters are configured as Expression Language
    // references and resolved from these variables when the schema is read.
    final Map<String, String> elVariables = new HashMap<>();
    elVariables.put("csv.delimiter", ";");
    elVariables.put("csv.quote", "'");
    elVariables.put("csv.escape", "^");

    final Map<PropertyDescriptor, String> configuredProps = new HashMap<>();
    configuredProps.put(CSVUtils.CSV_FORMAT, CSVUtils.CUSTOM.getValue());
    configuredProps.put(CSVUtils.COMMENT_MARKER, "#");
    configuredProps.put(CSVUtils.VALUE_SEPARATOR, "${csv.delimiter}");
    configuredProps.put(CSVUtils.TRIM_FIELDS, "true");
    configuredProps.put(CSVUtils.QUOTE_CHAR, "${csv.quote}");
    configuredProps.put(CSVUtils.ESCAPE_CHAR, "${csv.escape}");

    final ConfigurationContext mockContext = new MockConfigurationContext(configuredProps, null);
    final CSVHeaderSchemaStrategy headerStrategy = new CSVHeaderSchemaStrategy(mockContext);

    // Header line using the custom quote ('), separator (;) and escape (^) characters.
    final byte[] rawHeader = "\'a\'; b; c; d; e^;z; f".getBytes();

    final RecordSchema derivedSchema;
    try (final InputStream headerStream = new ByteArrayInputStream(rawHeader)) {
        derivedSchema = headerStrategy.getSchema(elVariables, headerStream, null);
    }

    // The escaped separator stays inside the fifth field name.
    assertEquals(Arrays.asList("a", "b", "c", "d", "e;z", "f"), derivedSchema.getFieldNames());

    // Every header-derived field is expected to be a STRING.
    assertTrue(derivedSchema.getFields().stream()
        .map(headerField -> headerField.getDataType())
        .allMatch(fieldType -> fieldType.equals(RecordFieldType.STRING.getDataType())));
}
}

View File

@ -58,7 +58,7 @@ public class TestCSVSchemaInference {
final InputStream bufferedIn = new BufferedInputStream(in)) {
final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
(var, content) -> new CSVRecordSource(content, context),
(variables, content) -> new CSVRecordSource(content, context, variables),
new CSVSchemaInference(timestampInference), Mockito.mock(ComponentLog.class));
schema = accessStrategy.getSchema(null, bufferedIn, null);
}
@ -82,4 +82,51 @@ public class TestCSVSchemaInference {
"componentId", "componentType", "componentName", "processGroupId", "processGroupName", "entityId", "entityType", "entitySize", "previousEntitySize", "updatedAttributes", "actorHostname",
"contentURI", "previousContentURI", "parentIds", "childIds", "platform", "application", "extra field", "numeric string"), fieldNames);
}
@Test
public void testInferenceIncludesAllRecordsWithEL() throws IOException {
    // Start from every CSVReader property at its default value, then override the
    // formatting characters with Expression Language references.
    final Map<PropertyDescriptor, String> readerProps = new HashMap<>();
    for (final PropertyDescriptor descriptor : new CSVReader().getSupportedPropertyDescriptors()) {
        readerProps.put(descriptor, descriptor.getDefaultValue());
    }
    readerProps.put(CSVUtils.TRIM_FIELDS, "true");
    readerProps.put(CSVUtils.VALUE_SEPARATOR, "${csv.delimiter}");
    readerProps.put(CSVUtils.QUOTE_CHAR, "${csv.quote}");
    readerProps.put(CSVUtils.ESCAPE_CHAR, "${csv.escape}");
    final PropertyContext context = new MockConfigurationContext(readerProps, null);

    // Values the Expression Language references resolve to.
    final Map<String, String> elValues = new HashMap<>();
    elValues.put("csv.delimiter", ",");
    elValues.put("csv.quote", "\"");
    elValues.put("csv.escape", "\\");

    final File provEvents = new File("src/test/resources/csv/prov-events.csv");

    final RecordSchema inferred;
    try (final InputStream fileIn = new FileInputStream(provEvents);
         final InputStream buffered = new BufferedInputStream(fileIn)) {

        final InferSchemaAccessStrategy<?> inferStrategy = new InferSchemaAccessStrategy<>(
            (variables, content) -> new CSVRecordSource(content, context, variables),
            new CSVSchemaInference(timestampInference), Mockito.mock(ComponentLog.class));
        inferred = inferStrategy.getSchema(elValues, buffered, null);
    }

    // Per-field type expectations.
    assertSame(RecordFieldType.STRING, inferred.getDataType("eventId").get().getFieldType());
    assertSame(RecordFieldType.INT, inferred.getDataType("eventOrdinal").get().getFieldType());
    assertSame(RecordFieldType.STRING, inferred.getDataType("eventType").get().getFieldType());
    assertSame(RecordFieldType.LONG, inferred.getDataType("timestampMillis").get().getFieldType());

    assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), inferred.getDataType("timestamp").get());
    assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), inferred.getDataType("eventTime").get());
    assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), inferred.getDataType("eventDate").get());
    assertEquals(RecordFieldType.STRING.getDataType(), inferred.getDataType("maybeTime").get());
    assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), inferred.getDataType("maybeDate").get());

    assertSame(RecordFieldType.INT, inferred.getDataType("parentIds").get().getFieldType());
    assertSame(RecordFieldType.STRING, inferred.getDataType("numeric string").get().getFieldType());

    // The complete, ordered set of field names expected in the inferred schema.
    final List<String> expectedNames = Arrays.asList("eventId", "eventOrdinal", "eventType", "timestampMillis", "timestamp", "eventDate", "eventTime", "maybeTime", "maybeDate", "durationMillis", "lineageStart",
        "componentId", "componentType", "componentName", "processGroupId", "processGroupName", "entityId", "entityType", "entitySize", "previousEntitySize", "updatedAttributes", "actorHostname",
        "contentURI", "previousContentURI", "parentIds", "childIds", "platform", "application", "extra field", "numeric string");
    final List<String> actualNames = inferred.getFieldNames();
    assertEquals(expectedNames, actualNames);
}
}

View File

@ -33,7 +33,6 @@ public class TestCSVValidators {
/*** SingleCharValidator **/
@Test
public void testSingleCharNullValue() {
CSVValidators.SingleCharacterValidator validator = new CSVValidators.SingleCharacterValidator();
ValidationContext mockContext = Mockito.mock(ValidationContext.class);
ValidationResult result = validator.validate("EscapeChar", null, mockContext);
@ -66,6 +65,16 @@ public class TestCSVValidators {
assertTrue(result.isValid());
}
@Test
public void testSingleCharExpressionLanguage() {
    // Validation context that reports Expression Language as both supported and present.
    final ValidationContext elAwareContext = Mockito.mock(ValidationContext.class);
    Mockito.when(elAwareContext.isExpressionLanguageSupported(Mockito.any())).thenReturn(true);
    Mockito.when(elAwareContext.isExpressionLanguagePresent(Mockito.any())).thenReturn(true);

    final CSVValidators.SingleCharacterValidator singleCharValidator = new CSVValidators.SingleCharacterValidator();
    final ValidationResult outcome = singleCharValidator.validate("EscapeChar", "${csv.escape}", elAwareContext);

    // An Expression Language value must be accepted even though it is longer than one character.
    assertTrue(outcome.isValid());
}
/*** Unescaped SingleCharValidator **/
@ -95,4 +104,14 @@ public class TestCSVValidators {
assertTrue(result.isValid());
}
@Test
public void testUnescapedSingleCharExpressionLanguage() {
    // Validation context that reports Expression Language as both supported and present.
    final ValidationContext elAwareContext = Mockito.mock(ValidationContext.class);
    Mockito.when(elAwareContext.isExpressionLanguageSupported(Mockito.any())).thenReturn(true);
    Mockito.when(elAwareContext.isExpressionLanguagePresent(Mockito.any())).thenReturn(true);

    final Validator unescapedValidator = CSVValidators.UNESCAPED_SINGLE_CHAR_VALIDATOR;
    final ValidationResult outcome = unescapedValidator.validate("Delimiter", "${csv.delimiter}", elAwareContext);

    // An Expression Language value must be accepted even though it is longer than one character.
    assertTrue(outcome.isValid());
}
}

View File

@ -39,6 +39,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import static org.junit.Assert.assertThat;
@ -240,7 +241,7 @@ public class TestXMLRecordSetWriter {
// Test override: ignores the schema argument and delegates to the superclass with this.recordSchema.
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out)
throws SchemaNotFoundException, IOException {
// NOTE(review): the two return statements look like the old (3-arg) and new (4-arg, empty variable map)
// sides of the diff — confirm that only the second exists in the real file.
return super.createWriter(logger, this.recordSchema, out);
return super.createWriter(logger, this.recordSchema, out, Collections.emptyMap());
}
}

View File

@ -64,6 +64,7 @@ public class TestXMLRecordSetWriterProcessor extends AbstractProcessor {
FlowFile flowFile = session.get();
final RecordSetWriterFactory writerFactory = context.getProperty(XML_WRITER).asControllerService(RecordSetWriterFactory.class);
final FlowFile flowFileRef = flowFile;
flowFile = session.write(flowFile, out -> {
try {
@ -72,7 +73,7 @@ public class TestXMLRecordSetWriterProcessor extends AbstractProcessor {
boolean multipleRecords = Boolean.parseBoolean(context.getProperty(MULTIPLE_RECORDS).getValue());
RecordSet recordSet = getRecordSet(multipleRecords);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileRef);
writer.write(recordSet);