1
0
mirror of https://github.com/apache/nifi.git synced 2025-02-16 15:06:00 +00:00

NIFI-11167 Added ExcelReader to nifi-poi-nar

- Moved shared schema inference components to nifi-record-serialization-services-shared

This closes the associated pull request on GitHub.

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2023-03-31 19:16:53 +00:00 committed by exceptionfactory
parent bc6ac4b700
commit e4d1dab8f7
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
32 changed files with 1684 additions and 27 deletions
nifi-nar-bundles
nifi-poi-bundle
nifi-standard-services/nifi-record-serialization-services-bundle
nifi-record-serialization-services-shared
nifi-record-serialization-services
pom.xml
src/main/resources
META-INF/services
docs/org.apache.nifi.excel.ExcelReader
pom.xml

@ -35,5 +35,16 @@
<artifactId>nifi-poi-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-poi-services</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

@ -15,11 +15,6 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<poi.version>5.2.3</poi.version>
</properties>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-poi-bundle</artifactId>
@ -46,17 +41,14 @@
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
@ -79,8 +71,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-poi-bundle</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <artifactId>nifi-poi-services</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <configuration>
                    <!-- Binary Excel test fixtures cannot carry Apache license headers -->
                    <excludes combine.children="append">
                        <exclude>src/test/resources/excel/collegeScorecard.xlsx</exclude>
                        <exclude>src/test/resources/excel/dataformatting.xlsx</exclude>
                        <exclude>src/test/resources/excel/dates.xlsx</exclude>
                        <exclude>src/test/resources/excel/notExcel.txt</exclude>
                        <exclude>src/test/resources/excel/numbers.xlsx</exclude>
                        <exclude>src/test/resources/excel/olderFormat.xls</exclude>
                        <exclude>src/test/resources/excel/simpleDataFormatting.xlsx</exclude>
                        <exclude>src/test/resources/excel/twoSheets.xlsx</exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record-serialization-services-shared</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.pjfanning</groupId>
            <artifactId>excel-streaming-reader</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-schema-registry-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record-serialization-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-avro-record-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!-- nifi-mock is a test framework; restrict it to test scope so it does not
             leak onto the compile/runtime classpath of the NAR -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
import org.apache.nifi.schema.inference.RecordSourceFactory;
import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.NonCloseableInputStream;
import org.apache.poi.ss.usermodel.Row;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Controller Service providing a RecordReaderFactory that parses Microsoft Excel
 * .xlsx documents, returning each row of each selected sheet as a Record. The
 * schema may be inferred from the sheet contents (the default access strategy)
 * or obtained through the standard schema access strategies of the superclass.
 */
@Tags({"excel", "spreadsheet", "xlsx", "parse", "record", "row", "reader", "values", "cell"})
@CapabilityDescription("Parses a Microsoft Excel document returning each row in each sheet as a separate record. "
        + "This reader allows for inferring a schema from all the required sheets "
        + "or providing an explicit schema for interpreting the values."
        + "See Controller Service's Usage for further documentation. "
        + "This reader is currently only capable of processing .xlsx "
        + "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.")
public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory {

    // Comma-separated, case-sensitive sheet names to read; blank means every sheet.
    public static final PropertyDescriptor REQUIRED_SHEETS = new PropertyDescriptor
            .Builder().name("Required Sheets")
            .displayName("Required Sheets")
            .description("Comma-separated list of Excel document sheet names whose rows should be extracted from the excel document. If this property" +
                    " is left blank then all the rows from all the sheets will be extracted from the Excel document. The list of names is case sensitive. Any sheets not" +
                    " specified in this value will be ignored. An exception will be thrown if a specified sheet(s) are not found.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    // One-based row number at which record extraction starts on every sheet.
    public static final PropertyDescriptor STARTING_ROW = new PropertyDescriptor
            .Builder().name("Starting Row")
            .displayName("Starting Row")
            .description("The row number of the first row to start processing (One based)."
                    + " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.")
            .required(true)
            .defaultValue("1")
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    private volatile ConfigurationContext configurationContext;
    // Zero-based first-row index derived from the one-based STARTING_ROW property.
    private volatile int firstRow;
    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;

    /**
     * Caches the configuration context and the property values that every
     * createRecordReader invocation needs.
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.configurationContext = context;
        this.firstRow = getStartingRow(context);
        this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
        this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
        this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
    }

    /**
     * Creates an ExcelRecordReader for the given input, resolving the schema first
     * (which may itself read from the stream when inference is selected).
     *
     * @param variables flowfile attributes used for Expression Language evaluation
     * @param in        the Excel document content
     */
    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
            throws MalformedRecordException, IOException, SchemaNotFoundException {
        // Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header.
        // NOTE(review): assumes the supplied stream supports mark/reset and that schema
        // resolution reads no more than the 1 MB read limit set here -- confirm with callers.
        in.mark(1024 * 1024);
        final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null);
        in.reset();

        final List<String> requiredSheets = getRequiredSheets(variables);
        final ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withDateFormat(dateFormat)
                .withRequiredSheets(requiredSheets)
                .withFirstRow(firstRow)
                .withSchema(schema)
                .withTimeFormat(timeFormat)
                .withTimestampFormat(timestampFormat)
                .build();

        return new ExcelRecordReader(configuration, in, logger);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
        properties.add(STARTING_ROW);
        properties.add(REQUIRED_SHEETS);
        properties.add(DateTimeUtils.DATE_FORMAT);
        properties.add(DateTimeUtils.TIME_FORMAT);
        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
        return properties;
    }

    /**
     * Returns an inference-based access strategy when "Infer Schema" is selected,
     * otherwise defers to the registry-backed strategies of the superclass.
     * NOTE(review): reads dateFormat/timeFormat/timestampFormat, which are assigned
     * in onEnabled -- confirm the framework always enables the service before this
     * method is invoked.
     */
    @Override
    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
        if (SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(allowableValue)) {
            final RecordSourceFactory<Row> sourceFactory = (variables, in) -> new ExcelRecordSource(in, context, variables, getLogger());
            final SchemaInferenceEngine<Row> inference = new ExcelSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
            return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
        }
        return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
    }

    @Override
    protected List<AllowableValue> getSchemaAccessStrategyValues() {
        final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
        allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
        return allowableValues;
    }

    // Schema inference is the default strategy for this reader.
    @Override
    protected AllowableValue getDefaultSchemaAccessStrategy() {
        return SchemaInferenceUtil.INFER_SCHEMA;
    }

    private int getStartingRow(final PropertyContext context) {
        int rawStartingRow = context.getProperty(STARTING_ROW).asInteger();
        return getZeroBasedIndex(rawStartingRow);
    }

    // Converts the one-based Starting Row value to a zero-based index; values below 1 clamp to 0.
    static int getZeroBasedIndex(int rawStartingRow) {
        return rawStartingRow > 0 ? rawStartingRow - 1 : 0;
    }

    private List<String> getRequiredSheets(final Map<String, String> attributes) {
        final String requiredSheetsDelimited = configurationContext.getProperty(REQUIRED_SHEETS).evaluateAttributeExpressions(attributes).getValue();
        return getRequiredSheets(requiredSheetsDelimited);
    }

    // Splits the comma-separated sheet-name property; returns an empty list when unset.
    static List<String> getRequiredSheets(String requiredSheetsDelimited) {
        if (requiredSheetsDelimited != null) {
            // StringUtils.split returns null only for null input, so this inner check is defensive.
            String[] delimitedSheets = StringUtils.split(requiredSheetsDelimited, ",");
            if (delimitedSheets != null) {
                return Arrays.asList(delimitedSheets);
            }
        }
        return Collections.emptyList();
    }
}

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.ss.usermodel.Row;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import static org.apache.commons.lang3.StringUtils.isEmpty;
/**
 * RecordReader that returns each row of an Excel .xlsx document as a Record,
 * coercing cell values to the configured schema's field types on request.
 */
public class ExcelRecordReader implements RecordReader {
    private final RowIterator rowIterator;
    private final RecordSchema schema;

    // Lazily-constructed DateFormat suppliers handed to DataTypeUtils; each is null
    // when no explicit format string was configured for that temporal type.
    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;

    private final String dateFormat;
    private final String timeFormat;
    private final String timestampFormat;

    /**
     * @param configuration schema, required sheets, first row and format strings
     * @param inputStream   stream positioned at the start of the .xlsx content
     * @param logger        logger passed to the underlying RowIterator
     * @throws MalformedRecordException if the workbook cannot be opened for reading
     */
    public ExcelRecordReader(ExcelRecordReaderConfiguration configuration, InputStream inputStream, ComponentLog logger) throws MalformedRecordException {
        this.schema = configuration.getSchema();

        if (isEmpty(configuration.getDateFormat())) {
            this.dateFormat = null;
            LAZY_DATE_FORMAT = null;
        } else {
            this.dateFormat = configuration.getDateFormat();
            LAZY_DATE_FORMAT = () -> DataTypeUtils.getDateFormat(dateFormat);
        }

        if (isEmpty(configuration.getTimeFormat())) {
            this.timeFormat = null;
            LAZY_TIME_FORMAT = null;
        } else {
            this.timeFormat = configuration.getTimeFormat();
            LAZY_TIME_FORMAT = () -> DataTypeUtils.getDateFormat(timeFormat);
        }

        if (isEmpty(configuration.getTimestampFormat())) {
            this.timestampFormat = null;
            LAZY_TIMESTAMP_FORMAT = null;
        } else {
            this.timestampFormat = configuration.getTimestampFormat();
            LAZY_TIMESTAMP_FORMAT = () -> DataTypeUtils.getDateFormat(timestampFormat);
        }

        try {
            this.rowIterator = new RowIterator(inputStream, configuration.getRequiredSheets(), configuration.getFirstRow(), logger);
        } catch (RuntimeException e) {
            // RowIterator throws unchecked exceptions (e.g. missing sheets); surface them
            // to callers as the checked reader exception type.
            throw new MalformedRecordException("Read initial Record from Excel XLSX failed", e);
        }
    }

    /**
     * Returns the next row as a Record, or null when all sheets are exhausted.
     *
     * @param coerceTypes       whether cell values are converted to the schema's field types
     * @param dropUnknownFields whether cells beyond the schema's field count are discarded
     * @throws MalformedRecordException if reading or converting the row fails
     */
    @Override
    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws MalformedRecordException {
        try {
            if (rowIterator.hasNext()) {
                Row currentRow = rowIterator.next();
                Map<String, Object> currentRowValues = getCurrentRowValues(currentRow, coerceTypes, dropUnknownFields);
                return new MapRecord(schema, currentRowValues);
            }
        } catch (Exception e) {
            throw new MalformedRecordException("Read next Record from Excel XLSX failed", e);
        }
        return null;
    }

    @Override
    public RecordSchema getSchema() {
        return schema;
    }

    @Override
    public void close() throws IOException {
        this.rowIterator.close();
    }

    // Maps the row's cells to field values by position: cell index i corresponds to
    // the i-th field of the schema; cells beyond the schema's field count are kept
    // under synthetic "unknown_field_index_<i>" keys unless dropUnknownFields is set.
    private Map<String, Object> getCurrentRowValues(Row currentRow, boolean coerceTypes, boolean dropUnknownFields) {
        final List<RecordField> recordFields = schema.getFields();
        final Map<String, Object> currentRowValues = new LinkedHashMap<>();

        if (ExcelUtils.hasCells(currentRow)) {
            IntStream.range(0, currentRow.getLastCellNum())
                    .forEach(index -> {
                        Cell cell = currentRow.getCell(index);
                        Object cellValue;
                        if (index >= recordFields.size()) {
                            if (!dropUnknownFields) {
                                cellValue = getCellValue(cell);
                                currentRowValues.put("unknown_field_index_" + index, cellValue);
                            }
                        } else {
                            final RecordField recordField = recordFields.get(index);
                            String fieldName = recordField.getFieldName();
                            DataType dataType = recordField.getDataType();
                            cellValue = getCellValue(cell);
                            final Object value = coerceTypes ? convert(cellValue, dataType, fieldName)
                                    : convertSimpleIfPossible(cellValue, dataType, fieldName);
                            currentRowValues.put(fieldName, value);
                        }
                    });
        }
        return currentRowValues;
    }

    // Extracts a raw Java value from a cell: String for text-like cell types, Double
    // or java.util.Date for numeric cells, Boolean for boolean cells, null for a missing cell.
    private static Object getCellValue(Cell cell) {
        if (cell != null) {
            switch (cell.getCellType()) {
                // _NONE, BLANK, ERROR and FORMULA all fall through and are read as strings.
                // NOTE(review): for FORMULA cells this assumes getStringCellValue returns a
                // usable value from the streaming reader -- confirm for numeric formula results.
                case _NONE:
                case BLANK:
                case ERROR:
                case FORMULA:
                case STRING:
                    return cell.getStringCellValue();
                case NUMERIC:
                    // Excel stores dates as numbers; the cell's format decides the interpretation.
                    return DateUtil.isCellDateFormatted(cell) ? cell.getDateCellValue() : cell.getNumericCellValue();
                case BOOLEAN:
                    return cell.getBooleanCellValue();
            }
        }
        return null;
    }

    // Unconditional coercion to the field's declared type (used when coerceTypes is set).
    private Object convert(final Object value, final DataType dataType, final String fieldName) {
        if (value == null || dataType == null) {
            return value;
        }
        return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
    }

    // Best-effort coercion: converts only when the raw value is already compatible with
    // the target type; otherwise returns the raw value unchanged.
    private Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) {
        if (value == null || dataType == null) {
            return value;
        }

        switch (dataType.getFieldType()) {
            case STRING:
                return value;
            case BOOLEAN:
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
            case DECIMAL:
            case BYTE:
            case CHAR:
            case SHORT:
                if (DataTypeUtils.isCompatibleDataType(value, dataType)) {
                    return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
                }
                break;
            case DATE:
                if (DataTypeUtils.isDateTypeCompatible(value, dateFormat)) {
                    return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
                }
                break;
            case TIME:
                if (DataTypeUtils.isTimeTypeCompatible(value, timeFormat)) {
                    return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
                }
                break;
            case TIMESTAMP:
                if (DataTypeUtils.isTimestampTypeCompatible(value, timestampFormat)) {
                    return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
                }
                break;
        }
        return value;
    }
}

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.Collections;
import java.util.List;
/**
 * Immutable configuration for ExcelRecordReader, created through the nested Builder.
 * requiredSheets is never null: the builder substitutes an empty list.
 */
public class ExcelRecordReaderConfiguration {
    private RecordSchema schema;
    private List<String> requiredSheets;
    private int firstRow;
    private String dateFormat;
    private String timeFormat;
    private String timestampFormat;

    // Instances are created exclusively through the Builder.
    private ExcelRecordReaderConfiguration() {
    }

    public RecordSchema getSchema() {
        return schema;
    }

    public List<String> getRequiredSheets() {
        return requiredSheets;
    }

    public int getFirstRow() {
        return firstRow;
    }

    public String getDateFormat() {
        return dateFormat;
    }

    public String getTimeFormat() {
        return timeFormat;
    }

    public String getTimestampFormat() {
        return timestampFormat;
    }

    /** Fluent builder; every setter returns this for chaining. */
    public static final class Builder {
        private RecordSchema schema;
        private List<String> requiredSheets;
        private int firstRow;
        private String dateFormat;
        private String timeFormat;
        private String timestampFormat;

        public Builder withSchema(RecordSchema schema) {
            this.schema = schema;
            return this;
        }

        public Builder withRequiredSheets(List<String> requiredSheets) {
            this.requiredSheets = requiredSheets;
            return this;
        }

        public Builder withFirstRow(int firstRow) {
            this.firstRow = firstRow;
            return this;
        }

        public Builder withDateFormat(String dateFormat) {
            this.dateFormat = dateFormat;
            return this;
        }

        public Builder withTimeFormat(String timeFormat) {
            this.timeFormat = timeFormat;
            return this;
        }

        public Builder withTimestampFormat(String timestampFormat) {
            this.timestampFormat = timestampFormat;
            return this;
        }

        /** Assembles the configuration, defaulting requiredSheets to an empty list. */
        public ExcelRecordReaderConfiguration build() {
            final ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration();
            configuration.schema = schema;
            configuration.requiredSheets = requiredSheets == null ? Collections.emptyList() : requiredSheets;
            configuration.firstRow = firstRow;
            configuration.dateFormat = dateFormat;
            configuration.timeFormat = timeFormat;
            configuration.timestampFormat = timestampFormat;
            return configuration;
        }
    }
}

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.inference.RecordSource;
import org.apache.poi.ss.usermodel.Row;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
/**
 * RecordSource adapter used during schema inference: yields raw Excel rows from
 * the configured sheets, honoring the Starting Row and Required Sheets properties.
 */
public class ExcelRecordSource implements RecordSource<Row> {
    private final RowIterator rowIterator;

    public ExcelRecordSource(final InputStream in, final PropertyContext context, final Map<String, String> variables, final ComponentLog logger) {
        final String sheetNamesValue = context.getProperty(ExcelReader.REQUIRED_SHEETS).evaluateAttributeExpressions(variables).getValue();
        final List<String> sheetNames = ExcelReader.getRequiredSheets(sheetNamesValue);
        // Fall back to the descriptor's default when the property yields no integer.
        final Integer configuredStartingRow = context.getProperty(ExcelReader.STARTING_ROW).evaluateAttributeExpressions(variables).asInteger();
        final int startingRow = configuredStartingRow != null ? configuredStartingRow : NumberUtils.toInt(ExcelReader.STARTING_ROW.getDefaultValue());
        this.rowIterator = new RowIterator(in, sheetNames, ExcelReader.getZeroBasedIndex(startingRow), logger);
    }

    /** Returns the next row, or null once all configured sheets are exhausted. */
    @Override
    public Row next() {
        if (rowIterator.hasNext()) {
            return rowIterator.next();
        }
        return null;
    }
}

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.nifi.schema.inference.FieldTypeInference;
import org.apache.nifi.schema.inference.RecordSource;
import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.SchemaInferenceUtil;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.usermodel.Row;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Infers a RecordSchema from Excel rows: each cell's formatted string value is fed
 * into a per-column FieldTypeInference, which widens the column's type as more rows
 * are observed. Columns are named positionally: column_0, column_1, ...
 */
public class ExcelSchemaInference implements SchemaInferenceEngine<Row> {
    static final String FIELD_NAME_PREFIX = "column_";

    private final TimeValueInference timeValueInference;
    private final DataFormatter dataFormatter;

    public ExcelSchemaInference(TimeValueInference timeValueInference) {
        this(timeValueInference, null);
    }

    public ExcelSchemaInference(TimeValueInference timeValueInference, Locale locale) {
        this.timeValueInference = timeValueInference;
        this.dataFormatter = locale == null ? new DataFormatter() : new DataFormatter(locale);
    }

    /** Consumes every row from the source and builds the resulting schema. */
    @Override
    public RecordSchema inferSchema(RecordSource<Row> recordSource) throws IOException {
        final Map<String, FieldTypeInference> inferencesByColumn = new LinkedHashMap<>();
        Row currentRow = recordSource.next();
        while (currentRow != null) {
            inferSchema(currentRow, inferencesByColumn);
            currentRow = recordSource.next();
        }
        return createSchema(inferencesByColumn);
    }

    // Folds one row's cells into the running per-column type inferences.
    private void inferSchema(final Row row, final Map<String, FieldTypeInference> inferencesByColumn) {
        if (!ExcelUtils.hasCells(row)) {
            return;
        }
        for (int columnIndex = 0; columnIndex < row.getLastCellNum(); columnIndex++) {
            final Cell cell = row.getCell(columnIndex);
            final String fieldName = FIELD_NAME_PREFIX + columnIndex;
            final FieldTypeInference inference = inferencesByColumn.computeIfAbsent(fieldName, name -> new FieldTypeInference());
            final String formattedValue = dataFormatter.formatCellValue(cell);
            inference.addPossibleDataType(SchemaInferenceUtil.getDataType(formattedValue, timeValueInference));
        }
    }

    // Materializes the accumulated inferences into a schema; every field is nullable.
    private RecordSchema createSchema(final Map<String, FieldTypeInference> inferencesByColumn) {
        final List<RecordField> fields = inferencesByColumn.entrySet().stream()
                .map(columnEntry -> new RecordField(columnEntry.getKey(), columnEntry.getValue().toDataType(), true))
                .collect(Collectors.toList());
        return new SimpleRecordSchema(fields);
    }
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.poi.ss.usermodel.Row;
/** Static helpers shared by the Excel reader components. */
public class ExcelUtils {

    // Utility class: not instantiable.
    private ExcelUtils() {
    }

    /** Returns true when the row exists and contains at least one cell. */
    public static boolean hasCells(final Row row) {
        if (row == null) {
            return false;
        }
        // POI returns -1 from getFirstCellNum for a row without cells.
        return row.getFirstCellNum() != -1;
    }
}

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import com.github.pjfanning.xlsx.StreamingReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
/**
 * Streams rows from an .xlsx workbook, optionally restricted to a set of required
 * sheets, skipping rows before the configured zero-based first row on every sheet.
 * Rows are read through the pjfanning streaming reader, so the workbook is never
 * fully materialized in memory.
 */
class RowIterator implements Iterator<Row>, Closeable {
    private final Workbook workbook;
    private final Iterator<Sheet> sheets;
    private final int firstRow;
    private final ComponentLog logger;

    private Sheet currentSheet;
    private Iterator<Row> currentRows;

    // Look-ahead row returned by the next call to next(); null when fully exhausted.
    private Row currentRow;

    /**
     * @param in             the .xlsx document content
     * @param requiredSheets sheet names to read, in order; null or empty reads all sheets
     * @param firstRow       zero-based index of the first row to return on each sheet
     * @param logger         used for debug logging when a sheet is exhausted
     * @throws ProcessException if any required sheet is missing from the workbook
     */
    RowIterator(final InputStream in, final List<String> requiredSheets, final int firstRow, final ComponentLog logger) {
        this.workbook = StreamingReader.builder()
                .rowCacheSize(100)
                .bufferSize(4096)
                .open(in);

        if (requiredSheets == null || requiredSheets.isEmpty()) {
            this.sheets = this.workbook.iterator();
        } else {
            // Report every missing sheet name in a single exception.
            final String sheetsNotFound = requiredSheets.stream()
                    .filter(sheetName -> this.workbook.getSheetIndex(sheetName) < 0)
                    .collect(Collectors.joining(","));
            if (!sheetsNotFound.isEmpty()) {
                throw new ProcessException("Required Excel Sheets not found: " + sheetsNotFound);
            }

            // Resolve sheets by name directly. Unlike the previous Collectors.toMap
            // approach, this preserves the user-specified sheet order (toMap collected
            // into an unordered HashMap) and tolerates a duplicated sheet name (toMap
            // throws IllegalStateException on duplicate keys).
            this.sheets = requiredSheets.stream()
                    .map(this.workbook::getSheet)
                    .collect(Collectors.toList()).iterator();
        }

        this.firstRow = firstRow;
        this.logger = logger;
        setCurrent();
    }

    @Override
    public boolean hasNext() {
        return currentRow != null;
    }

    @Override
    public Row next() {
        if (currentRow == null) {
            throw new NoSuchElementException();
        }
        final Row next = currentRow;
        setCurrent();
        return next;
    }

    @Override
    public void close() throws IOException {
        this.workbook.close();
    }

    // Advances currentRow to the next qualifying row, moving across sheets as needed.
    private void setCurrent() {
        currentRow = getNextRow();
        if (currentRow != null) {
            return;
        }

        while (sheets.hasNext()) {
            currentSheet = sheets.next();
            currentRows = currentSheet.iterator();
            currentRow = getNextRow();
            if (currentRow != null) {
                return;
            }
        }
    }

    // Returns the next row at or after firstRow in the current sheet, or null when exhausted.
    private Row getNextRow() {
        while (currentRows != null && !hasExhaustedRows()) {
            final Row tempCurrentRow = currentRows.next();
            if (tempCurrentRow.getRowNum() >= firstRow) {
                return tempCurrentRow;
            }
        }
        return null;
    }

    private boolean hasExhaustedRows() {
        final boolean exhausted = !currentRows.hasNext();
        if (exhausted) {
            logger.debug("Exhausted all rows from sheet {}", currentSheet.getSheetName());
        }
        return exhausted;
    }
}

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.excel.ExcelReader

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.InputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
public class TestExcelRecordReader {

    private static final String DATA_FORMATTING_FILE = "dataformatting.xlsx";
    private static final String MULTI_SHEET_FILE = "twoSheets.xlsx";

    @Mock
    ComponentLog logger;

    @Test
    public void testNonExcelFile() {
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .build();

        MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> new ExcelRecordReader(configuration, getInputStream("notExcel.txt"), logger));
        assertTrue(ExceptionUtils.getStackTrace(mre).contains("this is not a valid OOXML"));
    }

    @Test
    public void testOlderExcelFormatFile() {
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder().build();
        MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> new ExcelRecordReader(configuration, getInputStream("olderFormat.xls"), logger));
        assertTrue(ExceptionUtils.getStackTrace(mre).contains("data appears to be in the OLE2 Format"));
    }

    @Test
    public void testMultipleRecordsSingleSheet() throws MalformedRecordException {
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withSchema(getDataFormattingSchema())
                .build();

        ExcelRecordReader recordReader = new ExcelRecordReader(configuration, getInputStream(DATA_FORMATTING_FILE), logger);
        List<Record> records = getRecords(recordReader, false, false);

        assertEquals(9, records.size());
    }

    // Schema matching the columns of dataformatting.xlsx.
    private RecordSchema getDataFormattingSchema() {
        final List<RecordField> fields = Arrays.asList(
                new RecordField("Numbers", RecordFieldType.DOUBLE.getDataType()),
                new RecordField("Timestamps", RecordFieldType.DATE.getDataType()),
                new RecordField("Money", RecordFieldType.DOUBLE.getDataType()),
                new RecordField("Flags", RecordFieldType.BOOLEAN.getDataType()));

        return new SimpleRecordSchema(fields);
    }

    // Opens a test spreadsheet from the /excel classpath directory, failing fast when absent.
    private InputStream getInputStream(final String excelFile) {
        final String resourcePath = String.format("/excel/%s", excelFile);
        final InputStream resourceStream = getClass().getResourceAsStream(resourcePath);
        if (resourceStream == null) {
            throw new IllegalStateException(String.format("Resource [%s] not found", resourcePath));
        }
        return resourceStream;
    }

    // Drains the reader, returning every record it produces.
    private List<Record> getRecords(ExcelRecordReader recordReader, boolean coerceTypes, boolean dropUnknownFields) throws MalformedRecordException {
        Record record;
        List<Record> records = new ArrayList<>();
        while ((record = recordReader.nextRecord(coerceTypes, dropUnknownFields)) != null) {
            records.add(record);
        }
        return records;
    }

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testDropUnknownFields(boolean dropUnknownFields) throws MalformedRecordException {
        final List<RecordField> fields = Arrays.asList(
                new RecordField("Numbers", RecordFieldType.DOUBLE.getDataType()),
                new RecordField("Timestamps", RecordFieldType.DATE.getDataType()));
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withSchema(new SimpleRecordSchema(fields))
                .build();

        ExcelRecordReader recordReader = new ExcelRecordReader(configuration, getInputStream(DATA_FORMATTING_FILE), logger);
        List<Record> records = getRecords(recordReader, false, dropUnknownFields);

        assertEquals(9, records.size());
        if (dropUnknownFields) {
            records.forEach(record -> assertEquals(fields.size(), record.getRawFieldNames().size()));
        } else {
            records.forEach(record -> {
                int rawNumFields = record.getRawFieldNames().size();
                assertTrue(rawNumFields >= 2 && rawNumFields <= 4);
            });
        }
    }

    @Test
    public void testSkipLines() throws MalformedRecordException {
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withFirstRow(5)
                .withSchema(getDataFormattingSchema())
                .build();

        ExcelRecordReader recordReader = new ExcelRecordReader(configuration, getInputStream(DATA_FORMATTING_FILE), logger);
        List<Record> records = getRecords(recordReader, false, false);

        assertEquals(4, records.size());
    }

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCoerceTypes(boolean coerceTypes) throws MalformedRecordException {
        String fieldName = "dates";
        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(new RecordField(fieldName, RecordFieldType.TIMESTAMP.getDataType())));
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withDateFormat("MM/dd/yyyy")
                .withTimeFormat(RecordFieldType.TIME.getDefaultFormat())
                .withTimestampFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())
                .withSchema(schema)
                .build();

        ExcelRecordReader recordReader = new ExcelRecordReader(configuration, getInputStream("dates.xlsx"), logger);
        List<Record> records = getRecords(recordReader, coerceTypes, false);

        assertEquals(6, records.size());
        // Whether or not coercion is requested, the reader should surface Timestamp values.
        records.forEach(record -> assertInstanceOf(Timestamp.class, record.getValue(fieldName)));
    }

    @Test
    public void testSelectSpecificSheet() throws MalformedRecordException {
        RecordSchema schema = getSpecificSheetSchema();
        List<String> requiredSheets = Collections.singletonList("TestSheetA");
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withSchema(schema)
                .withFirstRow(1)
                .withRequiredSheets(requiredSheets)
                .build();

        ExcelRecordReader recordReader = new ExcelRecordReader(configuration, getInputStream(MULTI_SHEET_FILE), logger);
        List<Record> records = getRecords(recordReader, false, false);

        assertEquals(3, records.size());
    }

    private RecordSchema getSpecificSheetSchema() {
        return new SimpleRecordSchema(Arrays.asList(new RecordField("first", RecordFieldType.STRING.getDataType()),
                new RecordField("second", RecordFieldType.STRING.getDataType()),
                new RecordField("third", RecordFieldType.STRING.getDataType())));
    }

    @Test
    public void testSelectSpecificSheetNotFound() {
        RecordSchema schema = getSpecificSheetSchema();
        List<String> requiredSheets = Collections.singletonList("notExistingSheet");
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withSchema(schema)
                .withFirstRow(1)
                .withRequiredSheets(requiredSheets)
                .build();

        MalformedRecordException mre = assertThrows(MalformedRecordException.class,
                () -> new ExcelRecordReader(configuration, getInputStream(MULTI_SHEET_FILE), logger));
        assertInstanceOf(ProcessException.class, mre.getCause());
        assertTrue(mre.getCause().getMessage().startsWith("Required Excel Sheets not found"));
    }

    @Test
    public void testSelectAllSheets() throws MalformedRecordException {
        RecordSchema schema = new SimpleRecordSchema(Arrays.asList(new RecordField("first", RecordFieldType.STRING.getDataType()),
                new RecordField("second", RecordFieldType.STRING.getDataType())));
        ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withSchema(schema)
                .build();

        ExcelRecordReader recordReader = new ExcelRecordReader(configuration, getInputStream(MULTI_SHEET_FILE), logger);
        List<Record> records = getRecords(recordReader, false, false);

        assertEquals(7, records.size());
    }
}

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockConfigurationContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(MockitoExtension.class)
public class TestExcelSchemaInference {
    private static final String EXPECTED_FIRST_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "0";
    private static final String EXPECTED_SECOND_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "1";
    private static final String EXPECTED_THIRD_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "2";
    private static final String EXPECTED_FOURTH_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "3";

    private final TimeValueInference timestampInference = new TimeValueInference("MM/dd/yyyy", "HH:mm:ss.SSS", "yyyy/MM/dd/ HH:mm");

    @Mock
    ComponentLog logger;

    // Builds a property map populated with the default value of every ExcelReader descriptor,
    // shared by all tests here instead of repeating the setup in each one.
    private static Map<PropertyDescriptor, String> getDefaultProperties() {
        final Map<PropertyDescriptor, String> properties = new HashMap<>();
        new ExcelReader().getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDefaultValue()));
        return properties;
    }

    @ParameterizedTest
    @MethodSource("getLocales")
    public void testInferenceAgainstDifferentLocales(Locale locale) throws IOException {
        final PropertyContext context = new MockConfigurationContext(getDefaultProperties(), null);
        try (final InputStream inputStream = getResourceStream("/excel/numbers.xlsx")) {
            final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
                    (variables, content) -> new ExcelRecordSource(content, context, variables, logger),
                    new ExcelSchemaInference(timestampInference, locale), logger);
            final RecordSchema schema = accessStrategy.getSchema(null, inputStream, null);
            final List<String> fieldNames = schema.getFieldNames();
            assertEquals(Collections.singletonList(EXPECTED_FIRST_FIELD_NAME), fieldNames);

            if (Locale.FRENCH.equals(locale)) {
                // French decimal formatting prevents numeric parsing, so the column stays STRING.
                assertEquals(RecordFieldType.STRING, schema.getDataType(EXPECTED_FIRST_FIELD_NAME).get().getFieldType());
            } else {
                assertEquals(RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.FLOAT.getDataType(), RecordFieldType.STRING.getDataType()),
                        schema.getDataType(EXPECTED_FIRST_FIELD_NAME).get());
            }
        }
    }

    private static Stream<Arguments> getLocales() {
        Locale hindi = new Locale("hin");
        return Stream.of(
                Arguments.of(Locale.ENGLISH),
                Arguments.of(hindi),
                Arguments.of(Locale.JAPANESE),
                Arguments.of(Locale.FRENCH)
        );
    }

    @Test
    public void testInferenceIncludesAllRecords() throws IOException {
        final PropertyContext context = new MockConfigurationContext(getDefaultProperties(), null);

        final RecordSchema schema;
        try (final InputStream inputStream = getResourceStream("/excel/simpleDataFormatting.xlsx")) {
            final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
                    (variables, content) -> new ExcelRecordSource(content, context, variables, logger),
                    new ExcelSchemaInference(timestampInference), Mockito.mock(ComponentLog.class));
            schema = accessStrategy.getSchema(null, inputStream, null);
        }

        final List<String> fieldNames = schema.getFieldNames();
        assertEquals(Arrays.asList(EXPECTED_FIRST_FIELD_NAME, EXPECTED_SECOND_FIELD_NAME,
                EXPECTED_THIRD_FIELD_NAME, EXPECTED_FOURTH_FIELD_NAME), fieldNames);
        assertEquals(RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.INT.getDataType(),
                RecordFieldType.STRING.getDataType()), schema.getDataType(EXPECTED_FIRST_FIELD_NAME).get());
        assertEquals(RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.TIMESTAMP.getDataType("yyyy/MM/dd/ HH:mm"), RecordFieldType.STRING.getDataType()),
                schema.getDataType(EXPECTED_SECOND_FIELD_NAME).get());
        assertEquals(RecordFieldType.STRING, schema.getDataType(EXPECTED_THIRD_FIELD_NAME).get().getFieldType());
        assertEquals(RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.BOOLEAN.getDataType(),
                RecordFieldType.STRING.getDataType()), schema.getDataType(EXPECTED_FOURTH_FIELD_NAME).get());
    }

    @Test
    public void testInferenceIncludesAllRecordsWithEL() throws IOException {
        final Map<PropertyDescriptor, String> properties = getDefaultProperties();
        // Exercise Expression Language resolution for sheet selection and starting row.
        properties.put(ExcelReader.REQUIRED_SHEETS, "${required.sheets}");
        properties.put(ExcelReader.STARTING_ROW, "${rows.to.skip}");
        final PropertyContext context = new MockConfigurationContext(properties, null);
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("required.sheets", "Sheet1");
        attributes.put("rows.to.skip", "2");

        final RecordSchema schema;
        try (final InputStream inputStream = getResourceStream("/excel/simpleDataFormatting.xlsx")) {
            final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
                    (variables, content) -> new ExcelRecordSource(content, context, variables, logger),
                    new ExcelSchemaInference(timestampInference), Mockito.mock(ComponentLog.class));
            schema = accessStrategy.getSchema(attributes, inputStream, null);
        }

        final List<String> fieldNames = schema.getFieldNames();
        assertEquals(Arrays.asList(EXPECTED_FIRST_FIELD_NAME, EXPECTED_SECOND_FIELD_NAME,
                EXPECTED_THIRD_FIELD_NAME, EXPECTED_FOURTH_FIELD_NAME), fieldNames);
        assertEquals(RecordFieldType.INT.getDataType(), schema.getDataType(EXPECTED_FIRST_FIELD_NAME).get());
        assertEquals(RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.TIMESTAMP.getDataType("yyyy/MM/dd/ HH:mm"), RecordFieldType.STRING.getDataType()),
                schema.getDataType(EXPECTED_SECOND_FIELD_NAME).get());
        assertEquals(RecordFieldType.STRING, schema.getDataType(EXPECTED_THIRD_FIELD_NAME).get().getFieldType());
        assertEquals(RecordFieldType.BOOLEAN.getDataType(), schema.getDataType(EXPECTED_FOURTH_FIELD_NAME).get());
    }

    // Opens a classpath resource, failing fast when it is not packaged with the tests.
    private InputStream getResourceStream(final String relativePath) {
        final InputStream resourceStream = getClass().getResourceAsStream(relativePath);
        if (resourceStream == null) {
            throw new IllegalStateException(String.format("Resource [%s] not found", relativePath));
        }
        return resourceStream;
    }
}

@ -0,0 +1 @@
This file does not contain Excel format.

@ -24,12 +24,14 @@
<artifactId>nifi-poi-bundle</artifactId>
<packaging>pom</packaging>
<properties>
<poi.version>5.2.3</poi.version>
</properties>
<modules>
<module>nifi-poi-processors</module>
<module>nifi-poi-nar</module>
<module>nifi-poi-services</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
@ -37,6 +39,27 @@
<artifactId>commons-compress</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>

@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-record-serialization-services-shared</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-inference-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -51,11 +51,11 @@ public class CachedSchemaAccessStrategy implements SchemaAccessStrategy {
final Optional<RecordSchema> schemaOption = schemaCacheService.getSchema(cacheIdentifier);
if (schemaOption.isPresent()) {
logger.debug("Found Cached Record Schema with identifier {}", new Object[] {cacheIdentifier});
logger.debug("Found Cached Record Schema with identifier {}", cacheIdentifier);
return schemaOption.get();
}
logger.debug("Encountered Cache Miss with identifier {}. Will delegate to backup Schema Access Strategy", new Object[] {cacheIdentifier});
logger.debug("Encountered Cache Miss with identifier {}. Will delegate to backup Schema Access Strategy", cacheIdentifier);
return backupStrategy.getSchema(variables, contentStream, readSchema);
}

@ -48,7 +48,7 @@ public class InferSchemaAccessStrategy<T> implements SchemaAccessStrategy {
final RecordSource<T> recordSource = recordSourceFactory.create(variables, new NonCloseableInputStream(contentStream));
final RecordSchema schema = schemaInference.inferSchema(recordSource);
logger.debug("Successfully inferred schema {}", new Object[] {schema});
logger.debug("Successfully inferred schema {}", schema);
return schema;
} finally {
contentStream.reset();

@ -41,6 +41,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services-shared</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
@ -61,11 +66,6 @@
<artifactId>nifi-json-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-inference-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
@ -115,10 +115,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>

@ -27,7 +27,6 @@ org.apache.nifi.csv.CSVReader
org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.cef.CEFReader
org.apache.nifi.grok.GrokReader
org.apache.nifi.text.FreeFormTextRecordSetWriter
@ -38,5 +37,4 @@ org.apache.nifi.syslog.Syslog5424Reader
org.apache.nifi.xml.XMLReader
org.apache.nifi.xml.XMLRecordSetWriter
org.apache.nifi.windowsevent.WindowsEventLogReader
org.apache.nifi.schema.inference.VolatileSchemaCache
org.apache.nifi.schema.inference.VolatileSchemaCache

@ -0,0 +1,319 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ExcelReader</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<p>
        The ExcelReader allows for interpreting input data as Records. Each row in an Excel spreadsheet is a record
        and each cell is considered a field. The reader allows for choosing which row to start from and which sheets
        in a spreadsheet to ingest.
        When using the "Infer Schema" strategy, the field names are derived from the zero-based
        index of each column prefixed with "column_" (for example, "column_0"). Otherwise, the names of fields can be supplied
        when specifying the schema by using the Schema Text or looking up the schema in a Schema Registry.
</p>
<h2>Schemas and Type Coercion</h2>
<p>
When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
</p>
<p>
The following rules apply when attempting to coerce a field value from one data type to another:
</p>
<ul>
<li>Any data type can be coerced into a String type.</li>
<li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
<li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
or "Timestamp Format."</li>
<li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
<code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
type but not an Integer.</li>
<li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
<li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
and the rest of the characters are ignored.</li>
<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
property (Date Format, Time Format, Timestamp Format property).</li>
</ul>
<p>
If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
will be thrown.
</p>
<h2>Schema Inference</h2>
<p>
While NiFi's Record API does require that each Record have a schema, it is often convenient to infer the schema based on the values in the data,
rather than having to manually create a schema. This is accomplished by selecting a value of "Infer Schema" for the "Schema Access Strategy" property.
When using this strategy, the Reader will determine the schema by first parsing all data in the FlowFile, keeping track of all fields that it has encountered
and the type of each field. Once all data has been parsed, a schema is formed that encompasses all fields that have been encountered.
</p>
<p>
A common concern when inferring schemas is how to handle the condition of two values that have different types. For example, consider a FlowFile with the following two records:
</p>
<code><pre>
name, age
John, 8
Jane, Ten
</pre></code>
<p>
                It is clear that the "name" field will be inferred as a STRING type. However, how should we handle the "age" field? Should the field be a CHOICE between INT and STRING? Should we
prefer LONG over INT? Should we just use a STRING? Should the field be considered nullable?
</p>
<p>
To help understand how this Record Reader infers schemas, we have the following list of rules that are followed in the inference logic:
</p>
<ul>
<li>All fields are inferred to be nullable.</li>
<li>
When two values are encountered for the same field in two different records (or two values are encountered for an ARRAY type), the inference engine prefers
to use a "wider" data type over using a CHOICE data type. A data type "A" is said to be wider than data type "B" if and only if data type "A" encompasses all
values of "B" in addition to other values. For example, the LONG type is wider than the INT type but not wider than the BOOLEAN type (and BOOLEAN is also not wider
than LONG). INT is wider than SHORT. The STRING type is considered wider than all other types with the Exception of MAP, RECORD, ARRAY, and CHOICE.
</li>
<li>
Before inferring the type of a value, leading and trailing whitespace are removed. Additionally, if the value is surrounded by double-quotes ("), the double-quotes
                    are removed. Therefore, the value <code>16</code> is interpreted the same as <code>"16"</code>. Both will be interpreted as an INT. However, the value
<code>" 16"</code> will be inferred as a STRING type because the white space is enclosed within double-quotes, which means that the white space is considered
part of the value.
</li>
<li>
If the "Time Format," "Timestamp Format," or "Date Format" properties are configured, any value that would otherwise be considered a STRING type is first checked against
the configured formats to see if it matches any of them. If the value matches the Timestamp Format, the value is considered a Timestamp field. If it matches the Date Format,
it is considered a Date field. If it matches the Time Format, it is considered a Time field. In the unlikely event that the value matches more than one of the configured
formats, they will be matched in the order: Timestamp, Date, Time. I.e., if a value matched both the Timestamp Format and the Date Format, the type that is inferred will be
Timestamp. Because parsing dates and times can be expensive, it is advisable not to configure these formats if dates, times, and timestamps are not expected, or if processing
the data as a STRING is acceptable. For use cases when this is important, though, the inference engine is intelligent enough to optimize the parsing by first checking several
very cheap conditions. For example, the string's length is examined to see if it is too long or too short to match the pattern. This results in far more efficient processing
than would result if attempting to parse each string value as a timestamp.
</li>
<li>The MAP type is never inferred.</li>
<li>The ARRAY type is never inferred.</li>
<li>The RECORD type is never inferred.</li>
<li>If a field exists but all values are null, then the field is inferred to be of type STRING.</li>
</ul>
<h2>Caching of Inferred Schemas</h2>
<p>
This Record Reader requires that if a schema is to be inferred, that all records be read in order to ensure that the schema that gets inferred is applicable for all
records in the FlowFile. However, this can become expensive, especially if the data undergoes many different transformations. To alleviate the cost of inferring schemas,
the Record Reader can be configured with a "Schema Inference Cache" by populating the property with that name. This is a Controller Service that can be shared by Record
Readers and Record Writers.
</p>
<p>
Whenever a Record Writer is used to write data, if it is configured with a "Schema Cache," it will also add the schema to the Schema Cache. This will result in an
identifier for that schema being added as an attribute to the FlowFile.
</p>
<p>
Whenever a Record Reader is used to read data, if it is configured with a "Schema Inference Cache", it will first look for a "schema.cache.identifier" attribute on the FlowFile.
                If the attribute exists, it will use the value of that attribute to look up the schema in the schema cache. If it is able to find a schema in the cache with that identifier,
then it will use that schema instead of reading, parsing, and analyzing the data to infer the schema. If the attribute is not available on the FlowFile, or if the attribute is
available but the cache does not have a schema with that identifier, then the Record Reader will proceed to infer the schema as described above.
</p>
<p>
The end result is that users are able to chain together many different Processors to operate on Record-oriented data. Typically, only the first such Processor in the chain will
                incur the "penalty" of inferring the schema. For all other Processors in the chain, the Record Reader is able to simply look up the schema in the Schema Cache by identifier.
This allows the Record Reader to infer a schema accurately, since it is inferred based on all data in the FlowFile, and still allows this to happen efficiently since the schema
will typically only be inferred once, regardless of how many Processors handle the data.
</p>
<h2>Examples</h2>
<h3>Example 1</h3>
<p>
As an example, consider a FlowFile whose contents are an Excel spreadsheet whose only sheet consists of the following:
</p>
<code>
id, name, balance, join_date, notes<br />
                1, John, 48.23, 04/03/2007, "Our very<br />
first customer!"<br />
2, Jane, 1245.89, 08/22/2009,<br />
3, Frank Franklin, "48481.29", 04/04/2016,<br />
</code>
<p>
Additionally, let's consider that this Controller Service is configured to skip the first line and is configured
with the Schema Registry pointing to an AvroSchemaRegistry which contains the following schema:
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
                            { "name": "name", "type": "string" },
                            { "name": "balance", "type": "double" },
{ "name": "join_date", "type": {
"type": "int",
"logicalType": "date"
}},
                            { "name": "notes", "type": "string" }
]
}
</pre>
</code>
<p>
In the example above, we see that the 'join_date' column is a Date type. In order for the Excel Reader to be able to properly parse a value as a date,
we need to provide the reader with the date format to use. In this example, we would configure the Date Format property to be <code>MM/dd/yyyy</code>
to indicate that it is a two-digit month, followed by a two-digit day, followed by a four-digit year - each separated by a slash.
In this case, the result will be that this FlowFile consists of 3 different records. The first record will contain the following values:
</p>
            <table>
                <thead>
                    <tr>
                        <th>Field Name</th>
                        <th>Field Value</th>
                    </tr>
                </thead>
                <tbody>
                    <tr>
                        <td>id</td>
                        <td>1</td>
                    </tr>
                    <tr>
                        <td>name</td>
                        <td>John</td>
                    </tr>
                    <tr>
                        <td>balance</td>
                        <td>48.23</td>
                    </tr>
                    <tr>
                        <td>join_date</td>
                        <td>04/03/2007</td>
                    </tr>
                    <tr>
                        <td>notes</td>
                        <td>Our very<br />first customer!</td>
                    </tr>
                </tbody>
            </table>
<p>
The second record will contain the following values:
</p>
            <table>
                <thead>
                    <tr>
                        <th>Field Name</th>
                        <th>Field Value</th>
                    </tr>
                </thead>
                <tbody>
                    <tr>
                        <td>id</td>
                        <td>2</td>
                    </tr>
                    <tr>
                        <td>name</td>
                        <td>Jane</td>
                    </tr>
                    <tr>
                        <td>balance</td>
                        <td>1245.89</td>
                    </tr>
                    <tr>
                        <td>join_date</td>
                        <td>08/22/2009</td>
                    </tr>
                    <tr>
                        <td>notes</td>
                        <td></td>
                    </tr>
                </tbody>
            </table>
<p>
The third record will contain the following values:
</p>
            <table>
                <thead>
                    <tr>
                        <th>Field Name</th>
                        <th>Field Value</th>
                    </tr>
                </thead>
                <tbody>
                    <tr>
                        <td>id</td>
                        <td>3</td>
                    </tr>
                    <tr>
                        <td>name</td>
                        <td>Frank Franklin</td>
                    </tr>
                    <tr>
                        <td>balance</td>
                        <td>48481.29</td>
                    </tr>
                    <tr>
                        <td>join_date</td>
                        <td>04/04/2016</td>
                    </tr>
                    <tr>
                        <td>notes</td>
                        <td></td>
                    </tr>
                </tbody>
            </table>
<h3>Example 2 - Schema with Excel Header Line</h3>
<p>
When data consists of a header line whose columns are indicative of all the datatypes of those columns in the rest of the Excel spreadsheet, the reader provides
a couple of different properties for configuring how to handle these column names. The
"Schema Access Strategy" property as well as the associated properties ("Schema Registry," "Schema Text," and
"Schema Name" properties) can be used to specify how to obtain the schema. If the "Schema Access Strategy" is set
to "Use Fields From Header" then the header line of the first chosen Excel sheet will be used to determine the schema. Otherwise,
a schema will be referenced elsewhere and the column names specified in those schemas will be used instead of the cell numbers.
</p>
</body>
</html>

@ -26,6 +26,7 @@
<modules>
<module>nifi-record-serialization-services</module>
<module>nifi-record-serialization-services-nar</module>
<module>nifi-record-serialization-services-shared</module>
</modules>
<dependencyManagement>