NIFI-4892 - ValidateCSV: no doublequote escaping in invalid output

NIFI-4892 - ValidateCSV: no doublequote escaping in invalid output

NIFI-5907 - unit test

This closes #2481.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2018-02-20 10:36:31 +01:00 committed by Koji Kawamura
parent e9cb915ca9
commit 590fa2063c
2 changed files with 62 additions and 40 deletions

View File

@ -466,24 +466,25 @@ public class ValidateCsv extends AbstractProcessor {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
NifiCsvListReader listReader = null;
try {
listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
try(final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref)) {
// handling of header
if(header) {
List<String> headerList = listReader.read();
// read header
listReader.read();
if(!isWholeFFValidation) {
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(headerList, csvPref, isFirstLineInvalid.get()));
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(headerList, csvPref, isFirstLineValid.get()));
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
isFirstLineValid.set(false);
@ -496,14 +497,14 @@ public class ValidateCsv extends AbstractProcessor {
while (!stop) {
try {
final List<Object> list = listReader.read(cellProcs);
stop = list == null;
// read next row and check if no more row
stop = listReader.read(cellProcs) == null;
if(!isWholeFFValidation && !stop) {
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(list, csvPref, isFirstLineValid.get()));
out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get()));
}
}));
okCount.set(okCount.get() + 1);
@ -524,7 +525,7 @@ public class ValidateCsv extends AbstractProcessor {
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLineInvalid.get()));
out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get()));
}
}));
@ -546,10 +547,6 @@ public class ValidateCsv extends AbstractProcessor {
} catch (final IOException e) {
valid.set(false);
logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
} finally {
if(listReader != null) {
listReader.close();
}
}
}
});
@ -602,35 +599,12 @@ public class ValidateCsv extends AbstractProcessor {
}
}
/**
* Method used to correctly write the lines by taking into account end of line
* character and separator character.
* @param list list of elements of the current row
* @param csvPref CSV preferences
* @param isFirstLine true if this is the first line we append
* @return String to append in the flow file
*/
private byte[] print(List<?> list, CsvPreference csvPref, boolean isFirstLine) {
private byte[] print(String row, CsvPreference csvPref, boolean isFirstLine) {
StringBuffer buffer = new StringBuffer();
if (!isFirstLine) {
buffer.append(csvPref.getEndOfLineSymbols());
}
final int size = list.size();
int i = 0;
for (Object item : list) {
if (item != null) {
buffer.append(item.toString());
}
if (i < size - 1) {
buffer.append((char) csvPref.getDelimiterChar());
}
i++;
}
return buffer.toString().getBytes();
return buffer.append(row).toString().getBytes();
}
/**

View File

@ -75,7 +75,7 @@ public class TestValidateCsv {
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,,63.2\nBob,,45.0");
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,\"\",63.2\nBob,,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
@ -337,4 +337,52 @@ public class TestValidateCsv {
runner.assertTransferCount(ValidateCsv.REL_VALID, 2);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
@Test
public void testEscapingLineByLine() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
final String row = "Header1,\"Header2,escaped\",Header3\r\nField1,\"Field2,escaped\",Field3";
runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()");
runner.enqueue(row);
runner.run(1);
runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals(row);
runner.clearTransferState();
runner.setProperty(ValidateCsv.SCHEMA, "null,null,null");
runner.enqueue(row);
runner.run(1);
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals(row);
}
@Test
public void testQuote() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "NotNull(), NotNull(), NotNull()");
runner.enqueue("Header 1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Header 1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
}