diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java index 9796822aff..bd6a0e79d2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java @@ -466,24 +466,25 @@ public class ValidateCsv extends AbstractProcessor { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - NifiCsvListReader listReader = null; - try { - listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref); + try(final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref)) { // handling of header if(header) { - List headerList = listReader.read(); + + // read header + listReader.read(); + if(!isWholeFFValidation) { invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - out.write(print(headerList, csvPref, isFirstLineInvalid.get())); + out.write(print(listReader.getUntokenizedRow(), csvPref, true)); } })); validFF.set(session.append(validFF.get(), new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - out.write(print(headerList, csvPref, isFirstLineValid.get())); + out.write(print(listReader.getUntokenizedRow(), csvPref, true)); } })); isFirstLineValid.set(false); @@ -496,14 +497,14 @@ public class ValidateCsv extends AbstractProcessor { while (!stop) { try { - final List list = listReader.read(cellProcs); - stop = list == null; + // read next row and check if no more row + stop = listReader.read(cellProcs) == null; if(!isWholeFFValidation && !stop) { validFF.set(session.append(validFF.get(), new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - out.write(print(list, csvPref, isFirstLineValid.get())); + out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get())); } })); okCount.set(okCount.get() + 1); @@ -524,7 +525,7 @@ public class ValidateCsv extends AbstractProcessor { invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLineInvalid.get())); + out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get())); } })); @@ -546,10 +547,6 @@ public class ValidateCsv extends AbstractProcessor { } catch (final IOException e) { valid.set(false); logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e); - } finally { - if(listReader != null) { - listReader.close(); - } } } }); @@ -602,35 +599,12 @@ public class ValidateCsv extends AbstractProcessor { } } - /** - * Method used to correctly write the lines by taking into account end of line - * character and separator character. - * @param list list of elements of the current row - * @param csvPref CSV preferences - * @param isFirstLine true if this is the first line we append - * @return String to append in the flow file - */ - private byte[] print(List list, CsvPreference csvPref, boolean isFirstLine) { + private byte[] print(String row, CsvPreference csvPref, boolean isFirstLine) { StringBuffer buffer = new StringBuffer(); - if (!isFirstLine) { buffer.append(csvPref.getEndOfLineSymbols()); } - - final int size = list.size(); - int i = 0; - for (Object item : list) { - if (item != null) { - buffer.append(item.toString()); - } - - if (i < size - 1) { - buffer.append((char) csvPref.getDelimiterChar()); - } - i++; - } - - return buffer.toString().getBytes(); + return buffer.append(row).toString().getBytes(); } /** diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java index b03aed4718..c694ab1c8f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java @@ -75,7 +75,7 @@ public class TestValidateCsv { runner.run(); runner.assertTransferCount(ValidateCsv.REL_VALID, 1); - runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,,63.2\nBob,,45.0"); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,\"\",63.2\nBob,,45.0"); runner.assertTransferCount(ValidateCsv.REL_INVALID, 0); } @@ -337,4 +337,52 @@ public class TestValidateCsv { runner.assertTransferCount(ValidateCsv.REL_VALID, 2); runner.assertTransferCount(ValidateCsv.REL_INVALID, 0); } + + @Test + public void testEscapingLineByLine() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY); + + final String row = "Header1,\"Header2,escaped\",Header3\r\nField1,\"Field2,escaped\",Field3"; + runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()"); + + runner.enqueue(row); + runner.run(1); + + runner.assertTransferCount(ValidateCsv.REL_VALID, 0); + runner.assertTransferCount(ValidateCsv.REL_INVALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals(row); + runner.clearTransferState(); + + runner.setProperty(ValidateCsv.SCHEMA, "null,null,null"); + runner.enqueue(row); + runner.run(1); + + runner.assertTransferCount(ValidateCsv.REL_VALID, 1); + runner.assertTransferCount(ValidateCsv.REL_INVALID, 0); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals(row); + } + + @Test + public void testQuote() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY); + + runner.setProperty(ValidateCsv.SCHEMA, "NotNull(), NotNull(), NotNull()"); + + runner.enqueue("Header 1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3"); + runner.run(); + + runner.assertTransferCount(ValidateCsv.REL_VALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Header 1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3"); + runner.assertTransferCount(ValidateCsv.REL_INVALID, 0); + } }