NIFI-4465 ConvertExcelToCSV Data Formatting and Delimiters

This closes #2194.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
patricker 2017-10-05 13:01:47 +08:00 committed by Koji Kawamura
parent b950eed1a5
commit fd00df3d2f
7 changed files with 530 additions and 240 deletions

View File

@ -49,5 +49,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.4</version>
</dependency>
</dependencies>
</project>

View File

@ -23,22 +23,22 @@ import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.util.StandardValidators;
public class CSVUtils {
static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format",
public static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format",
"The format of the CSV is configured by using the properties of this Controller Service, such as Value Separator");
static final AllowableValue RFC_4180 = new AllowableValue("rfc-4180", "RFC 4180", "CSV data follows the RFC 4180 Specification defined at https://tools.ietf.org/html/rfc4180");
static final AllowableValue EXCEL = new AllowableValue("excel", "Microsoft Excel", "CSV data follows the format used by Microsoft Excel");
static final AllowableValue TDF = new AllowableValue("tdf", "Tab-Delimited", "CSV data is Tab-Delimited instead of Comma Delimited");
static final AllowableValue INFORMIX_UNLOAD = new AllowableValue("informix-unload", "Informix Unload", "The format used by Informix when issuing the UNLOAD TO file_name command");
static final AllowableValue INFORMIX_UNLOAD_CSV = new AllowableValue("informix-unload-csv", "Informix Unload Escape Disabled",
public static final AllowableValue RFC_4180 = new AllowableValue("rfc-4180", "RFC 4180", "CSV data follows the RFC 4180 Specification defined at https://tools.ietf.org/html/rfc4180");
public static final AllowableValue EXCEL = new AllowableValue("excel", "Microsoft Excel", "CSV data follows the format used by Microsoft Excel");
public static final AllowableValue TDF = new AllowableValue("tdf", "Tab-Delimited", "CSV data is Tab-Delimited instead of Comma Delimited");
public static final AllowableValue INFORMIX_UNLOAD = new AllowableValue("informix-unload", "Informix Unload", "The format used by Informix when issuing the UNLOAD TO file_name command");
public static final AllowableValue INFORMIX_UNLOAD_CSV = new AllowableValue("informix-unload-csv", "Informix Unload Escape Disabled",
"The format used by Informix when issuing the UNLOAD TO file_name command with escaping disabled");
static final AllowableValue MYSQL = new AllowableValue("mysql", "MySQL Format", "CSV data follows the format used by MySQL");
public static final AllowableValue MYSQL = new AllowableValue("mysql", "MySQL Format", "CSV data follows the format used by MySQL");
static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
public static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
.name("CSV Format")
.description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.")
.expressionLanguageSupported(false)
@ -46,7 +46,7 @@ public class CSVUtils {
.defaultValue(CUSTOM.getValue())
.required(true)
.build();
static final PropertyDescriptor VALUE_SEPARATOR = new PropertyDescriptor.Builder()
public static final PropertyDescriptor VALUE_SEPARATOR = new PropertyDescriptor.Builder()
.name("Value Separator")
.description("The character that is used to separate values/fields in a CSV Record")
.addValidator(CSVValidators.UNESCAPED_SINGLE_CHAR_VALIDATOR)
@ -54,7 +54,7 @@ public class CSVUtils {
.defaultValue(",")
.required(true)
.build();
static final PropertyDescriptor QUOTE_CHAR = new PropertyDescriptor.Builder()
public static final PropertyDescriptor QUOTE_CHAR = new PropertyDescriptor.Builder()
.name("Quote Character")
.description("The character that is used to quote values so that escape characters do not have to be used")
.addValidator(new CSVValidators.SingleCharacterValidator())
@ -62,7 +62,7 @@ public class CSVUtils {
.defaultValue("\"")
.required(true)
.build();
static final PropertyDescriptor FIRST_LINE_IS_HEADER = new PropertyDescriptor.Builder()
public static final PropertyDescriptor FIRST_LINE_IS_HEADER = new PropertyDescriptor.Builder()
.name("Skip Header Line")
.displayName("Treat First Line as Header")
.description("Specifies whether or not the first line of CSV should be considered a Header or should be considered a record. If the Schema Access Strategy "
@ -75,7 +75,7 @@ public class CSVUtils {
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor IGNORE_CSV_HEADER = new PropertyDescriptor.Builder()
public static final PropertyDescriptor IGNORE_CSV_HEADER = new PropertyDescriptor.Builder()
.name("ignore-csv-header")
.displayName("Ignore CSV Header Column Names")
.description("If the first line of a CSV is a header, and the configured schema does not match the fields named in the header line, this controls how "
@ -87,14 +87,14 @@ public class CSVUtils {
.defaultValue("false")
.required(false)
.build();
static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder()
public static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder()
.name("Comment Marker")
.description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.")
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(false)
.required(false)
.build();
static final PropertyDescriptor ESCAPE_CHAR = new PropertyDescriptor.Builder()
public static final PropertyDescriptor ESCAPE_CHAR = new PropertyDescriptor.Builder()
.name("Escape Character")
.description("The character that is used to escape characters that would otherwise have a specific meaning to the CSV Parser.")
.addValidator(new CSVValidators.SingleCharacterValidator())
@ -102,14 +102,14 @@ public class CSVUtils {
.defaultValue("\\")
.required(true)
.build();
static final PropertyDescriptor NULL_STRING = new PropertyDescriptor.Builder()
public static final PropertyDescriptor NULL_STRING = new PropertyDescriptor.Builder()
.name("Null String")
.description("Specifies a String that, if present as a value in the CSV, should be considered a null field instead of using the literal value.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.required(false)
.build();
static final PropertyDescriptor TRIM_FIELDS = new PropertyDescriptor.Builder()
public static final PropertyDescriptor TRIM_FIELDS = new PropertyDescriptor.Builder()
.name("Trim Fields")
.description("Whether or not white space should be removed from the beginning and end of fields")
.expressionLanguageSupported(false)
@ -119,14 +119,14 @@ public class CSVUtils {
.build();
// CSV Format fields for writers only
static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character.");
static final AllowableValue QUOTE_MINIMAL = new AllowableValue("MINIMAL", "Quote Minimal",
public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character.");
public static final AllowableValue QUOTE_MINIMAL = new AllowableValue("MINIMAL", "Quote Minimal",
"Values will be quoted only if they are contain special characters such as newline characters or field separators.");
static final AllowableValue QUOTE_NON_NUMERIC = new AllowableValue("NON_NUMERIC", "Quote Non-Numeric Values", "Values will be quoted unless the value is a number.");
static final AllowableValue QUOTE_NONE = new AllowableValue("NONE", "Do Not Quote Values",
public static final AllowableValue QUOTE_NON_NUMERIC = new AllowableValue("NON_NUMERIC", "Quote Non-Numeric Values", "Values will be quoted unless the value is a number.");
public static final AllowableValue QUOTE_NONE = new AllowableValue("NONE", "Do Not Quote Values",
"Values will not be quoted. Instead, all special characters will be escaped using the configured escape character.");
static final PropertyDescriptor QUOTE_MODE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor QUOTE_MODE = new PropertyDescriptor.Builder()
.name("Quote Mode")
.description("Specifies how fields should be quoted when they are written")
.expressionLanguageSupported(false)
@ -134,7 +134,7 @@ public class CSVUtils {
.defaultValue(QUOTE_MINIMAL.getValue())
.required(true)
.build();
static final PropertyDescriptor TRAILING_DELIMITER = new PropertyDescriptor.Builder()
public static final PropertyDescriptor TRAILING_DELIMITER = new PropertyDescriptor.Builder()
.name("Include Trailing Delimiter")
.description("If true, a trailing delimiter will be added to each CSV Record that is written. If false, the trailing delimiter will be omitted.")
.expressionLanguageSupported(false)
@ -142,7 +142,7 @@ public class CSVUtils {
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
.name("Record Separator")
.description("Specifies the characters to use in order to separate CSV Records")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -150,7 +150,7 @@ public class CSVUtils {
.defaultValue("\\n")
.required(true)
.build();
static final PropertyDescriptor INCLUDE_HEADER_LINE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor INCLUDE_HEADER_LINE = new PropertyDescriptor.Builder()
.name("Include Header Line")
.description("Specifies whether or not the CSV column names should be written out as the first line.")
.allowableValues("true", "false")
@ -158,7 +158,7 @@ public class CSVUtils {
.required(true)
.build();
static CSVFormat createCSVFormat(final ConfigurationContext context) {
public static CSVFormat createCSVFormat(final PropertyContext context) {
final String formatName = context.getProperty(CSV_FORMAT).getValue();
if (formatName.equalsIgnoreCase(CUSTOM.getValue())) {
return buildCustomFormat(context);
@ -180,15 +180,15 @@ public class CSVUtils {
}
}
private static char getUnescapedChar(final ConfigurationContext context, final PropertyDescriptor property) {
private static char getUnescapedChar(final PropertyContext context, final PropertyDescriptor property) {
return StringEscapeUtils.unescapeJava(context.getProperty(property).getValue()).charAt(0);
}
private static char getChar(final ConfigurationContext context, final PropertyDescriptor property) {
private static char getChar(final PropertyContext context, final PropertyDescriptor property) {
return CSVUtils.unescape(context.getProperty(property).getValue()).charAt(0);
}
private static CSVFormat buildCustomFormat(final ConfigurationContext context) {
private static CSVFormat buildCustomFormat(final PropertyContext context) {
final char valueSeparator = getUnescapedChar(context, VALUE_SEPARATOR);
CSVFormat format = CSVFormat.newFormat(valueSeparator)
.withAllowMissingColumnNames()

View File

@ -17,7 +17,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<poi.version>3.14</poi.version>
<poi.version>3.17</poi.version>
</properties>
<parent>
@ -66,7 +66,6 @@
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
@ -75,6 +74,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-record-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -19,14 +19,16 @@ package org.apache.nifi.processors.poi;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -34,6 +36,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
@ -48,15 +51,20 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.apache.poi.openxml4j.exceptions.OpenXML4JException;
import org.apache.poi.openxml4j.opc.OPCPackage;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.util.CellAddress;
import org.apache.poi.ss.util.CellReference;
import org.apache.poi.util.SAXHelper;
import org.apache.poi.xssf.eventusermodel.ReadOnlySharedStringsTable;
import org.apache.poi.xssf.eventusermodel.XSSFReader;
import org.apache.poi.xssf.model.SharedStringsTable;
import org.apache.poi.xssf.usermodel.XSSFRichTextString;
import org.xml.sax.Attributes;
import org.apache.poi.xssf.eventusermodel.XSSFSheetXMLHandler;
import org.apache.poi.xssf.model.StylesTable;
import org.apache.poi.xssf.usermodel.XSSFComment;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.DefaultHandler;
import org.xml.sax.helpers.XMLReaderFactory;
import javax.xml.parsers.ParserConfigurationException;
@Tags({"excel", "csv", "poi"})
@ -78,17 +86,8 @@ public class ConvertExcelToCSVProcessor
public static final String SHEET_NAME = "sheetname";
public static final String ROW_NUM = "numrows";
public static final String SOURCE_FILE_NAME = "sourcefilename";
private static final String SAX_CELL_REF = "c";
private static final String SAX_CELL_TYPE = "t";
private static final String SAX_CELL_ADDRESS = "r";
private static final String SAX_CELL_STRING = "s";
private static final String SAX_CELL_CONTENT_REF = "v";
private static final String SAX_ROW_REF = "row";
private static final String SAX_SHEET_NAME_REF = "sheetPr";
private static final String DESIRED_SHEETS_DELIMITER = ",";
private static final String UNKNOWN_SHEET_NAME = "UNKNOWN";
private static final String SAX_PARSER = "org.apache.xerces.parsers.SAXParser";
private static final Pattern CELL_ADDRESS_REGEX = Pattern.compile("^([a-zA-Z]+)([\\d]+)$");
public static final PropertyDescriptor DESIRED_SHEETS = new PropertyDescriptor
.Builder().name("extract-sheets")
@ -101,6 +100,35 @@ public class ConvertExcelToCSVProcessor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ROWS_TO_SKIP = new PropertyDescriptor
.Builder().name("excel-extract-first-row")
.displayName("Number of Rows to Skip")
.description("The row number of the first row to start processing."
+ "Use this to skip over rows of data at the top of your worksheet that are not part of the dataset."
+ "Empty rows of data anywhere in the spreadsheet will always be skipped, no matter what this value is set to.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor COLUMNS_TO_SKIP = new PropertyDescriptor
.Builder().name("excel-extract-column-to-skip")
.displayName("Columns To Skip")
.description("Comma delimited list of column numbers to skip. Use the columns number and not the letter designation. "
+ "Use this to skip over columns anywhere in your worksheet that you don't want extracted as part of the record.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FORMAT_VALUES = new PropertyDescriptor.Builder()
.name("excel-format-values")
.displayName("Format Cell Values")
.description("Should the cell values be written to CSV using the formatting applied in Excel, or should they be printed as raw values.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final Relationship ORIGINAL = new Relationship.Builder()
.name("original")
.description("Original Excel document received by this processor")
@ -124,6 +152,24 @@ public class ConvertExcelToCSVProcessor
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(DESIRED_SHEETS);
descriptors.add(ROWS_TO_SKIP);
descriptors.add(COLUMNS_TO_SKIP);
descriptors.add(FORMAT_VALUES);
descriptors.add(CSVUtils.CSV_FORMAT);
descriptors.add(CSVUtils.VALUE_SEPARATOR);
descriptors.add(CSVUtils.INCLUDE_HEADER_LINE);
descriptors.add(CSVUtils.QUOTE_CHAR);
descriptors.add(CSVUtils.ESCAPE_CHAR);
descriptors.add(CSVUtils.COMMENT_MARKER);
descriptors.add(CSVUtils.NULL_STRING);
descriptors.add(CSVUtils.TRIM_FIELDS);
descriptors.add(new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CSVUtils.QUOTE_MODE)
.defaultValue(CSVUtils.QUOTE_NONE.getValue())
.build());
descriptors.add(CSVUtils.RECORD_SEPARATOR);
descriptors.add(CSVUtils.TRAILING_DELIMITER);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@ -150,28 +196,46 @@ public class ConvertExcelToCSVProcessor
return;
}
try {
final String desiredSheetsDelimited = context.getProperty(DESIRED_SHEETS).evaluateAttributeExpressions().getValue();
final boolean formatValues = context.getProperty(FORMAT_VALUES).asBoolean();
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context);
//Switch to 0 based index
final int firstRow = context.getProperty(ROWS_TO_SKIP).asInteger() - 1;
final String[] sColumnsToSkip = StringUtils
.split(context.getProperty(COLUMNS_TO_SKIP).getValue(), ",");
final List<Integer> columnsToSkip = new ArrayList<>();
if(sColumnsToSkip != null && sColumnsToSkip.length > 0) {
for (String c : sColumnsToSkip) {
try {
//Switch to 0 based index
columnsToSkip.add(Integer.parseInt(c) - 1);
} catch (NumberFormatException e) {
throw new ProcessException("Invalid column in Columns to Skip list.", e);
}
}
}
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream inputStream) throws IOException {
try {
String desiredSheetsDelimited = context.getProperty(DESIRED_SHEETS)
.evaluateAttributeExpressions().getValue();
OPCPackage pkg = OPCPackage.open(inputStream);
XSSFReader r = new XSSFReader(pkg);
SharedStringsTable sst = r.getSharedStringsTable();
ReadOnlySharedStringsTable sst = new ReadOnlySharedStringsTable(pkg);
StylesTable styles = r.getStylesTable();
XSSFReader.SheetIterator iter = (XSSFReader.SheetIterator) r.getSheetsData();
if (desiredSheetsDelimited != null) {
String[] desiredSheets = StringUtils
.split(desiredSheetsDelimited, DESIRED_SHEETS_DELIMITER);
if (desiredSheets != null) {
while (iter.hasNext()) {
InputStream sheet = iter.next();
String sheetName = iter.getSheetName();
@ -179,7 +243,8 @@ public class ConvertExcelToCSVProcessor
for (int i = 0; i < desiredSheets.length; i++) {
//If the sheetName is a desired one parse it
if (sheetName.equalsIgnoreCase(desiredSheets[i])) {
handleExcelSheet(session, flowFile, sst, sheet, sheetName);
ExcelSheetReadConfig readConfig = new ExcelSheetReadConfig(columnsToSkip, firstRow, sheetName, formatValues, sst, styles);
handleExcelSheet(session, flowFile, sheet, readConfig, csvFormat);
break;
}
}
@ -191,13 +256,17 @@ public class ConvertExcelToCSVProcessor
} else {
//Get all of the sheets in the document.
while (iter.hasNext()) {
handleExcelSheet(session, flowFile, sst, iter.next(), iter.getSheetName());
InputStream sheet = iter.next();
String sheetName = iter.getSheetName();
ExcelSheetReadConfig readConfig = new ExcelSheetReadConfig(columnsToSkip, firstRow, sheetName, formatValues, sst, styles);
handleExcelSheet(session, flowFile, sheet, readConfig, csvFormat);
}
}
} catch (InvalidFormatException ife) {
getLogger().error("Only .xlsx Excel 2007 OOXML files are supported", ife);
throw new UnsupportedOperationException("Only .xlsx Excel 2007 OOXML files are supported", ife);
} catch (OpenXML4JException e) {
} catch (OpenXML4JException | SAXException e) {
getLogger().error("Error occurred while processing Excel document metadata", e);
}
}
@ -206,7 +275,7 @@ public class ConvertExcelToCSVProcessor
session.transfer(flowFile, ORIGINAL);
} catch (RuntimeException ex) {
getLogger().error("Failed to process incoming Excel document", ex);
getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
FlowFile failedFlowFile = session.putAttribute(flowFile,
ConvertExcelToCSVProcessor.class.getName() + ".error", ex.getMessage());
session.transfer(failedFlowFile, FAILURE);
@ -220,45 +289,48 @@ public class ConvertExcelToCSVProcessor
* @param session
* The NiFi ProcessSession instance for the current invocation.
*/
private void handleExcelSheet(ProcessSession session, FlowFile originalParentFF,
SharedStringsTable sst, final InputStream sheetInputStream, String sName) throws IOException {
private void handleExcelSheet(ProcessSession session, FlowFile originalParentFF, final InputStream sheetInputStream, ExcelSheetReadConfig readConfig,
CSVFormat csvFormat) throws IOException {
FlowFile ff = session.create();
try {
final DataFormatter formatter = new DataFormatter();
final InputSource sheetSource = new InputSource(sheetInputStream);
final SheetToCSV sheetHandler = new SheetToCSV(readConfig, csvFormat);
final XMLReader parser = SAXHelper.newXMLReader();
//If Value Formatting is set to false then don't pass in the styles table.
// This will cause the XSSF Handler to return the raw value instead of the formatted one.
final StylesTable sst = readConfig.getFormatValues()?readConfig.getStyles():null;
final XSSFSheetXMLHandler handler = new XSSFSheetXMLHandler(
sst, null, readConfig.getSharedStringsTable(), sheetHandler, formatter, false);
XMLReader parser =
XMLReaderFactory.createXMLReader(
SAX_PARSER
);
ExcelSheetRowHandler handler = new ExcelSheetRowHandler(sst);
parser.setContentHandler(handler);
ff = session.write(ff, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
InputSource sheetSource = new InputSource(sheetInputStream);
ExcelSheetRowHandler eh = null;
PrintStream outPrint = new PrintStream(out);
sheetHandler.setOutput(outPrint);
try {
eh = (ExcelSheetRowHandler) parser.getContentHandler();
eh.setFlowFileOutputStream(out);
parser.setContentHandler(eh);
parser.parse(sheetSource);
sheetInputStream.close();
sheetHandler.close();
outPrint.close();
} catch (SAXException se) {
getLogger().error("Error occurred while processing Excel sheet {}", new Object[]{eh.getSheetName()}, se);
getLogger().error("Error occurred while processing Excel sheet {}", new Object[]{readConfig.getSheetName()}, se);
}
}
});
if (handler.getSheetName().equals(UNKNOWN_SHEET_NAME)) {
//Used the named parsed from the handler. This logic is only here because IF the handler does find a value that should take precedence.
ff = session.putAttribute(ff, SHEET_NAME, sName);
} else {
ff = session.putAttribute(ff, SHEET_NAME, handler.getSheetName());
sName = handler.getSheetName();
}
ff = session.putAttribute(ff, ROW_NUM, new Long(handler.getRowCount()).toString());
ff = session.putAttribute(ff, SHEET_NAME, readConfig.getSheetName());
ff = session.putAttribute(ff, ROW_NUM, new Long(sheetHandler.getRowCount()).toString());
if (StringUtils.isNotEmpty(originalParentFF.getAttribute(CoreAttributes.FILENAME.key()))) {
ff = session.putAttribute(ff, SOURCE_FILE_NAME, originalParentFF.getAttribute(CoreAttributes.FILENAME.key()));
@ -268,13 +340,13 @@ public class ConvertExcelToCSVProcessor
//Update the CoreAttributes.FILENAME to have the .csv extension now. Also update MIME.TYPE
ff = session.putAttribute(ff, CoreAttributes.FILENAME.key(), updateFilenameToCSVExtension(ff.getAttribute(CoreAttributes.UUID.key()),
ff.getAttribute(CoreAttributes.FILENAME.key()), sName));
ff.getAttribute(CoreAttributes.FILENAME.key()), readConfig.getSheetName()));
ff = session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
session.transfer(ff, SUCCESS);
} catch (SAXException saxE) {
getLogger().error("Failed to create instance of SAXParser {}", new Object[]{SAX_PARSER}, saxE);
} catch (SAXException | ParserConfigurationException saxE) {
getLogger().error("Failed to create instance of Parser.", saxE);
ff = session.putAttribute(ff,
ConvertExcelToCSVProcessor.class.getName() + ".error", saxE.getMessage());
session.transfer(ff, FAILURE);
@ -283,162 +355,161 @@ public class ConvertExcelToCSVProcessor
}
}
static Integer columnToIndex(String col) {
int length = col.length();
int accumulator = 0;
for (int i = length; i > 0; i--) {
char c = col.charAt(i - 1);
int x = ((int) c) - 64;
accumulator += x * Math.pow(26, length - i);
}
// Make it to start with 0.
return accumulator - 1;
}
private static class CellAddress {
final int row;
final int col;
private CellAddress(int row, int col) {
this.row = row;
this.col = col;
}
}
/**
* Extracts every row from an Excel Sheet and generates a corresponding JSONObject whose key is the Excel CellAddress and value
* is the content of that CellAddress converted to a String
* Uses the XSSF Event SAX helpers to do most of the work
* of parsing the Sheet XML, and outputs the contents
* as a (basic) CSV.
*/
private class ExcelSheetRowHandler
extends DefaultHandler {
private class SheetToCSV implements XSSFSheetXMLHandler.SheetContentsHandler {
private ExcelSheetReadConfig readConfig;
CSVFormat csvFormat;
private SharedStringsTable sst;
private String currentContent;
private boolean nextIsString;
private CellAddress firstCellAddress;
private CellAddress firstRowLastCellAddress;
private CellAddress previousCellAddress;
private CellAddress nextCellAddress;
private OutputStream outputStream;
private boolean firstColInRow;
long rowCount;
String sheetName;
private boolean firstCellOfRow;
private boolean skipRow;
private int currentRow = -1;
private int currentCol = -1;
private int rowCount = 0;
private boolean rowHasValues=false;
private int skippedColumns=0;
private ExcelSheetRowHandler(SharedStringsTable sst) {
this.sst = sst;
this.firstColInRow = true;
this.rowCount = 0l;
this.sheetName = UNKNOWN_SHEET_NAME;
}
private CSVPrinter printer;
public void setFlowFileOutputStream(OutputStream outputStream) {
this.outputStream = outputStream;
}
private boolean firstRow=false;
private ArrayList<Object> fieldValues;
public void startElement(String uri, String localName, String name,
Attributes attributes) throws SAXException {
if (name.equals(SAX_CELL_REF)) {
String cellType = attributes.getValue(SAX_CELL_TYPE);
// Analyze cell address.
Matcher cellAddressMatcher = CELL_ADDRESS_REGEX.matcher(attributes.getValue(SAX_CELL_ADDRESS));
if (cellAddressMatcher.matches()) {
String col = cellAddressMatcher.group(1);
String row = cellAddressMatcher.group(2);
nextCellAddress = new CellAddress(Integer.parseInt(row), columnToIndex(col));
if (firstCellAddress == null) {
firstCellAddress = nextCellAddress;
}
}
if (cellType != null && cellType.equals(SAX_CELL_STRING)) {
nextIsString = true;
} else {
nextIsString = false;
}
} else if (name.equals(SAX_ROW_REF)) {
if (firstRowLastCellAddress == null) {
firstRowLastCellAddress = previousCellAddress;
}
firstColInRow = true;
previousCellAddress = null;
nextCellAddress = null;
} else if (name.equals(SAX_SHEET_NAME_REF)) {
sheetName = attributes.getValue(0);
}
currentContent = "";
}
private void fillEmptyColumns(int nextColumn) throws IOException {
final CellAddress previousCell = previousCellAddress != null ? previousCellAddress : firstCellAddress;
if (previousCell != null) {
for (int i = 0; i < (nextColumn - previousCell.col); i++) {
// Fill columns.
outputStream.write(",".getBytes());
}
}
}
public void endElement(String uri, String localName, String name)
throws SAXException {
if (nextIsString) {
int idx = Integer.parseInt(currentContent);
currentContent = new XSSFRichTextString(sst.getEntryAt(idx)).toString();
nextIsString = false;
}
if (name.equals(SAX_CELL_CONTENT_REF)
// Limit scanning from the first column, and up to the last column.
&& (firstCellAddress == null || firstCellAddress.col <= nextCellAddress.col)
&& (firstRowLastCellAddress == null || nextCellAddress.col <= firstRowLastCellAddress.col)) {
try {
// A cell is found.
fillEmptyColumns(nextCellAddress.col);
firstColInRow = false;
outputStream.write(currentContent.getBytes());
// Keep previously found cell address.
previousCellAddress = nextCellAddress;
} catch (IOException e) {
getLogger().error("IO error encountered while writing content of parsed cell " +
"value from sheet {}", new Object[]{getSheetName()}, e);
}
}
if (name.equals(SAX_ROW_REF)) {
//If this is the first row and the end of the row element has been encountered then that means no columns were present.
if (!firstColInRow) {
try {
if (firstRowLastCellAddress != null) {
fillEmptyColumns(firstRowLastCellAddress.col);
}
rowCount++;
outputStream.write("\n".getBytes());
} catch (IOException e) {
getLogger().error("IO error encountered while writing new line indicator", e);
}
}
}
}
public void characters(char[] ch, int start, int length)
throws SAXException {
currentContent += new String(ch, start, length);
}
public long getRowCount() {
public int getRowCount(){
return rowCount;
}
public String getSheetName() {
return sheetName;
public void setOutput(PrintStream output){
final OutputStreamWriter streamWriter = new OutputStreamWriter(output);
try {
printer = new CSVPrinter(streamWriter, csvFormat);
} catch (IOException e) {
throw new ProcessException("Failed to create CSV Printer.", e);
}
}
public SheetToCSV(ExcelSheetReadConfig readConfig, CSVFormat csvFormat){
this.readConfig = readConfig;
this.csvFormat = csvFormat;
}
@Override
public void startRow(int rowNum) {
if(rowNum <= readConfig.getOverrideFirstRow()) {
skipRow = true;
return;
}
// Prepare for this row
skipRow = false;
firstCellOfRow = true;
firstRow = currentRow==-1;
currentRow = rowNum;
currentCol = -1;
rowHasValues = false;
fieldValues = new ArrayList<>();
}
@Override
public void endRow(int rowNum) {
if(skipRow) {
return;
}
if(firstRow){
readConfig.setLastColumn(currentCol);
}
//if there was no data in this row, don't write it
if(!rowHasValues) {
return;
}
// Ensure the correct number of columns
int columnsToAdd = (readConfig.getLastColumn() - currentCol) - readConfig.getColumnsToSkip().size();
for (int i=0; i<columnsToAdd; i++) {
fieldValues.add(null);
}
try {
printer.printRecord(fieldValues);
} catch (IOException e) {
e.printStackTrace();
}
rowCount++;
}
@Override
public void cell(String cellReference, String formattedValue,
XSSFComment comment) {
if(skipRow) {
return;
}
// gracefully handle missing CellRef here in a similar way as XSSFCell does
if(cellReference == null) {
cellReference = new CellAddress(currentRow, currentCol).formatAsString();
}
// Did we miss any cells?
int thisCol = (new CellReference(cellReference)).getCol();
// Should we skip this
//Use the first row of the file to decide on the area of data to export
if(firstRow && firstCellOfRow){
readConfig.setFirstRow(currentRow);
readConfig.setFirstColumn(thisCol);
}
//if this cell falls outside our area, or has been explcitely marked as a skipped column, return and don't write it out.
if(!firstRow && (thisCol < readConfig.getFirstColumn() || thisCol > readConfig.getLastColumn())){
return;
}
if(readConfig.getColumnsToSkip().contains(thisCol)){
skippedColumns++;
return;
}
int missedCols = (thisCol - readConfig.getFirstColumn()) - (currentCol - readConfig.getFirstColumn()) - 1;
if(firstCellOfRow){
missedCols = (thisCol - readConfig.getFirstColumn());
}
missedCols -= skippedColumns;
if (firstCellOfRow) {
firstCellOfRow = false;
}
for (int i=0; i<missedCols; i++) {
fieldValues.add(null);
}
currentCol = thisCol;
fieldValues.add(formattedValue);
rowHasValues = true;
skippedColumns = 0;
}
@Override
public void headerFooter(String s, boolean b, String s1) {
}
public void close() throws IOException {
printer.close();
}
}
/**
* Takes the original input filename and updates it by removing the file extension and replacing it with
* the .csv extension.
@ -472,4 +543,87 @@ public class ConvertExcelToCSVProcessor
return stringBuilder.toString();
}
private class ExcelSheetReadConfig {
public String getSheetName(){
return sheetName;
}
public int getFirstColumn(){
return firstColumn;
}
public void setFirstColumn(int value){
this.firstColumn = value;
}
public int getLastColumn(){
return lastColumn;
}
public void setLastColumn(int lastColumn) {
this.lastColumn = lastColumn;
}
public int getOverrideFirstRow(){
return overrideFirstRow;
}
public boolean getFormatValues() {
return formatValues;
}
public int getFirstRow(){
return firstRow;
}
public void setFirstRow(int value){
firstRow = value;
}
public int getLastRow(){
return lastRow;
}
public void setLastRow(int value){
lastRow = value;
}
public List<Integer> getColumnsToSkip(){
return columnsToSkip;
}
public ReadOnlySharedStringsTable getSharedStringsTable(){
return sst;
}
public StylesTable getStyles(){
return styles;
}
private int firstColumn;
private int lastColumn;
private int firstRow;
private int lastRow;
private int overrideFirstRow;
private String sheetName;
private boolean formatValues;
private ReadOnlySharedStringsTable sst;
private StylesTable styles;
private List<Integer> columnsToSkip;
public ExcelSheetReadConfig(List<Integer> columnsToSkip, int overrideFirstRow, String sheetName, boolean formatValues,
ReadOnlySharedStringsTable sst, StylesTable styles){
this.sheetName = sheetName;
this.columnsToSkip = columnsToSkip;
this.overrideFirstRow = overrideFirstRow;
this.formatValues = formatValues;
this.sst = sst;
this.styles = styles;
}
}
}

View File

@ -20,9 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
@ -41,16 +41,6 @@ public class ConvertExcelToCSVProcessorTest {
testRunner = TestRunners.newTestRunner(ConvertExcelToCSVProcessor.class);
}
@Test
public void testColToIndex() {
assertEquals(Integer.valueOf(0), ConvertExcelToCSVProcessor.columnToIndex("A"));
assertEquals(Integer.valueOf(1), ConvertExcelToCSVProcessor.columnToIndex("B"));
assertEquals(Integer.valueOf(25), ConvertExcelToCSVProcessor.columnToIndex("Z"));
assertEquals(Integer.valueOf(29), ConvertExcelToCSVProcessor.columnToIndex("AD"));
assertEquals(Integer.valueOf(239), ConvertExcelToCSVProcessor.columnToIndex("IF"));
assertEquals(Integer.valueOf(16383), ConvertExcelToCSVProcessor.columnToIndex("XFD"));
}
@Test
public void testMultipleSheetsGeneratesMultipleFlowFiles() throws Exception {
@ -81,6 +71,144 @@ public class ConvertExcelToCSVProcessorTest {
}
@Test
public void testDataFormatting() throws Exception {
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "false");
testRunner.run();
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);
MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
assertTrue(rowsSheet == 9);
ff.assertContentEquals("Numbers,Timestamps,Money\n" +
"1234.4559999999999,42736.5,123.45\n" +
"1234.4559999999999,42736.5,123.45\n" +
"1234.4559999999999,42736.5,123.45\n" +
"1234.4559999999999,42736.5,1023.45\n" +
"1234.4559999999999,42736.5,1023.45\n" +
"987654321,42736.5,1023.45\n" +
"987654321,,\n" +
"987654321,,\n");
}
@Test
public void testQuoting() throws Exception {
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
testRunner.setProperty(CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL);
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
testRunner.run();
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);
MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
assertTrue(rowsSheet == 9);
ff.assertContentEquals("Numbers,Timestamps,Money\n" +
"1234.456,1/1/17,$ 123.45\n" +
"1234.46,12:00:00 PM,£ 123.45\n" +
"1234.5,\"Sunday, January 01, 2017\",¥ 123.45\n" +
"\"1,234.46\",1/1/17 12:00,\"$ 1,023.45\"\n" +
"\"1,234.4560\",12:00 PM,\"£ 1,023.45\"\n" +
"9.88E+08,2017/01/01/ 12:00,\"¥ 1,023.45\"\n" +
"9.877E+08,,\n" +
"9.8765E+08,,\n");
}
@Test
public void testSkipRows() throws Exception {
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
testRunner.setProperty(ConvertExcelToCSVProcessor.ROWS_TO_SKIP, "2");
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
testRunner.run();
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);
MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
assertEquals("Row count does match expected value.", "7", rowsSheet.toString());
ff.assertContentEquals("1234.46,12:00:00 PM,£ 123.45\n" +
"1234.5,Sunday\\, January 01\\, 2017,¥ 123.45\n" +
"1\\,234.46,1/1/17 12:00,$ 1\\,023.45\n" +
"1\\,234.4560,12:00 PM,£ 1\\,023.45\n" +
"9.88E+08,2017/01/01/ 12:00,¥ 1\\,023.45\n" +
"9.877E+08,,\n" +
"9.8765E+08,,\n");
}
@Test
public void testSkipColumns() throws Exception {
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
testRunner.setProperty(ConvertExcelToCSVProcessor.COLUMNS_TO_SKIP, "2");
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
testRunner.run();
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);
MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
assertTrue(rowsSheet == 9);
ff.assertContentEquals("Numbers,Money\n" +
"1234.456,$ 123.45\n" +
"1234.46,£ 123.45\n" +
"1234.5,¥ 123.45\n" +
"1\\,234.46,$ 1\\,023.45\n" +
"1\\,234.4560,£ 1\\,023.45\n" +
"9.88E+08,¥ 1\\,023.45\n" +
"9.877E+08,\n" +
"9.8765E+08,\n");
}
@Test
public void testCustomDelimiters() throws Exception {
testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
testRunner.setProperty(CSVUtils.VALUE_SEPARATOR, "|");
testRunner.setProperty(CSVUtils.RECORD_SEPARATOR, "\\r\\n");
testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");
testRunner.run();
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.ORIGINAL, 1);
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);
MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
assertTrue(rowsSheet == 9);
ff.assertContentEquals("Numbers|Timestamps|Money\r\n" +
"1234.456|1/1/17|$ 123.45\r\n" +
"1234.46|12:00:00 PM|£ 123.45\r\n" +
"1234.5|Sunday, January 01, 2017|¥ 123.45\r\n" +
"1,234.46|1/1/17 12:00|$ 1,023.45\r\n" +
"1,234.4560|12:00 PM|£ 1,023.45\r\n" +
"9.88E+08|2017/01/01/ 12:00|¥ 1,023.45\r\n" +
"9.877E+08||\r\n" +
"9.8765E+08||\r\n");
}
/**
* Validates that all sheets in the Excel document are exported.
*
@ -181,7 +309,7 @@ public class ConvertExcelToCSVProcessorTest {
MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
Long l = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
assertTrue(l == 8l);
ff.isContentEqual("test", StandardCharsets.UTF_8);
ff.assertContentEquals(new File("src/test/resources/with-blank-cells.csv"));
}
@ -199,8 +327,8 @@ public class ConvertExcelToCSVProcessorTest {
testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 1);
List<LogMessage> errorMessages = testRunner.getLogger().getErrorMessages();
Assert.assertEquals(2, errorMessages.size());
Assert.assertEquals(1, errorMessages.size());
String messageText = errorMessages.get(0).getMsg();
Assert.assertTrue(messageText.contains("Excel") && messageText.contains("supported"));
Assert.assertTrue(messageText.contains("Excel") && messageText.contains("OLE2"));
}
}