NIFI-12491 Added Starting Row Schema Strategy to ExcelReader

This closes #9064

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-06-24 19:41:53 +00:00 committed by exceptionfactory
parent 380ce0eb20
commit a424c0eac3
No known key found for this signature in database
6 changed files with 362 additions and 9 deletions

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.FieldTypeInference;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.SchemaInferenceUtil;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.usermodel.Row;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy {
private static final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
static final int NUM_ROWS_TO_DETERMINE_TYPES = 10; // NOTE: This number is arbitrary.
static final AllowableValue USE_STARTING_ROW = new AllowableValue("Use Starting Row", "Use Starting Row",
"The configured first row of the Excel file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and the following " + NUM_ROWS_TO_DETERMINE_TYPES + " rows to determine the type(s) of each column");
private final PropertyContext context;
private final ComponentLog logger;
private final TimeValueInference timeValueInference;
private final DataFormatter dataFormatter;
public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog logger, TimeValueInference timeValueInference, Locale locale) {
this.context = context;
this.logger = logger;
this.timeValueInference = timeValueInference;
this.dataFormatter = locale == null ? new DataFormatter() : new DataFormatter(locale);
}
@Override
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
if (this.context == null) {
throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
}
final String requiredSheetsDelimited = context.getProperty(ExcelReader.REQUIRED_SHEETS).evaluateAttributeExpressions(variables).getValue();
final List<String> requiredSheets = ExcelReader.getRequiredSheets(requiredSheetsDelimited);
final Integer rawFirstRow = context.getProperty(ExcelReader.STARTING_ROW).evaluateAttributeExpressions(variables).asInteger();
final int firstRow = rawFirstRow == null ? NumberUtils.toInt(ExcelReader.STARTING_ROW.getDefaultValue()) : rawFirstRow;
final int zeroBasedFirstRow = ExcelReader.getZeroBasedIndex(firstRow);
final String password = context.getProperty(ExcelReader.PASSWORD).getValue();
final ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
.withRequiredSheets(requiredSheets)
.withFirstRow(zeroBasedFirstRow)
.withPassword(password)
.build();
final RowIterator rowIterator = new RowIterator(contentStream, configuration, logger);
final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>();
List<String> fieldNames = null;
int index = 0;
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
if (index == 0) {
fieldNames = getFieldNames(firstRow, row);
} else if (index <= NUM_ROWS_TO_DETERMINE_TYPES) {
inferSchema(row, fieldNames, typeMap);
} else {
break;
}
index++;
}
if (typeMap.isEmpty()) {
final String message = String.format("Failed to infer schema from empty first %d rows", NUM_ROWS_TO_DETERMINE_TYPES);
throw new SchemaNotFoundException(message);
}
return createSchema(typeMap);
}
private List<String> getFieldNames(int firstRowIndex, Row row) throws SchemaNotFoundException {
if (!ExcelUtils.hasCells(row)) {
throw new SchemaNotFoundException(String.format("Field names could not be determined from configured header row %s, as this row has no cells with data", firstRowIndex));
}
final List<String> fieldNames = new ArrayList<>();
for (int index = 0; index < row.getLastCellNum(); index++) {
final Cell cell = row.getCell(index);
final String fieldName = dataFormatter.formatCellValue(cell);
// NOTE: This accounts for column(s) which may be empty in the configured starting row.
if (fieldName == null || fieldName.isEmpty()) {
fieldNames.add(ExcelUtils.FIELD_NAME_PREFIX + index);
} else {
fieldNames.add(fieldName);
}
}
return fieldNames;
}
private void inferSchema(final Row row, final List<String> fieldNames, final Map<String, FieldTypeInference> typeMap) throws SchemaNotFoundException {
// NOTE: This allows rows to be blank when inferring the schema
if (ExcelUtils.hasCells(row)) {
if (row.getLastCellNum() > fieldNames.size()) {
throw new SchemaNotFoundException(String.format("Row %s has %s cells, more than the expected %s number of field names", row.getRowNum(), row.getLastCellNum(), fieldNames.size()));
}
IntStream.range(0, row.getLastCellNum())
.forEach(index -> {
final Cell cell = row.getCell(index);
final String fieldName = fieldNames.get(index);
final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
final String formattedCellValue = dataFormatter.formatCellValue(cell);
final DataType dataType = SchemaInferenceUtil.getDataType(formattedCellValue, timeValueInference);
typeInference.addPossibleDataType(dataType);
});
}
}
private RecordSchema createSchema(final Map<String, FieldTypeInference> inferences) {
final List<RecordField> recordFields = inferences.entrySet().stream()
.map(entry -> new RecordField(entry.getKey(), entry.getValue().toDataType(), true))
.collect(Collectors.toList());
return new SimpleRecordSchema(recordFields);
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
import org.apache.nifi.schema.inference.RecordSourceFactory;
@ -76,7 +77,8 @@ public class ExcelReader extends SchemaRegistryService implements RecordReaderFa
.Builder().name("Starting Row")
.displayName("Starting Row")
.description("The row number of the first row to start processing (One based)."
+ " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.")
+ " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset."
+ " When using the '" + ExcelHeaderSchemaStrategy.USE_STARTING_ROW.getValue() + "' strategy this should be the column header row.")
.required(true)
.defaultValue("1")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@ -155,7 +157,9 @@ public class ExcelReader extends SchemaRegistryService implements RecordReaderFa
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(allowableValue)) {
if (allowableValue.equalsIgnoreCase(ExcelHeaderSchemaStrategy.USE_STARTING_ROW.getValue())) {
return new ExcelHeaderSchemaStrategy(context, getLogger(), new TimeValueInference(dateFormat, timeFormat, timestampFormat), null);
} else if (SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(allowableValue)) {
final RecordSourceFactory<Row> sourceFactory = (variables, in) -> new ExcelRecordSource(in, context, variables, getLogger());
final SchemaInferenceEngine<Row> inference = new ExcelSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
@ -167,17 +171,23 @@ public class ExcelReader extends SchemaRegistryService implements RecordReaderFa
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
allowableValues.add(ExcelHeaderSchemaStrategy.USE_STARTING_ROW);
allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
return allowableValues;
}
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return SchemaInferenceUtil.INFER_SCHEMA;
return ExcelHeaderSchemaStrategy.USE_STARTING_ROW;
}
private int getStartingRow(final Map<String, String> variables) {
int rawStartingRow = configurationContext.getProperty(STARTING_ROW).evaluateAttributeExpressions(variables).asInteger();
String schemaAccessStrategy = configurationContext.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
if (ExcelHeaderSchemaStrategy.USE_STARTING_ROW.getValue().equals(schemaAccessStrategy)) {
rawStartingRow++;
}
return getZeroBasedIndex(rawStartingRow);
}

View File

@ -38,7 +38,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ExcelSchemaInference implements SchemaInferenceEngine<Row> {
static final String FIELD_NAME_PREFIX = "column_";
private final TimeValueInference timeValueInference;
private final DataFormatter dataFormatter;
@ -66,7 +65,7 @@ public class ExcelSchemaInference implements SchemaInferenceEngine<Row> {
IntStream.range(0, row.getLastCellNum())
.forEach(index -> {
final Cell cell = row.getCell(index);
final String fieldName = FIELD_NAME_PREFIX + index;
final String fieldName = ExcelUtils.FIELD_NAME_PREFIX + index;
final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
final String formattedCellValue = dataFormatter.formatCellValue(cell);
final DataType dataType = SchemaInferenceUtil.getDataType(formattedCellValue, timeValueInference);

View File

@ -19,6 +19,8 @@ package org.apache.nifi.excel;
import org.apache.poi.ss.usermodel.Row;
public class ExcelUtils {
static final String FIELD_NAME_PREFIX = "column_";
private ExcelUtils() {
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
public class TestExcelHeaderSchemaStrategy {
private static final TimeValueInference TIME_VALUE_INFERENCE = new TimeValueInference("MM/dd/yyyy", "HH:mm:ss.SSS", "yyyy/MM/dd/ HH:mm");
@Mock
ComponentLog logger;
@Test
void testWhereConfiguredStartRowIsEmpty() throws IOException {
Object[][] data = {{}, {1, "Manny"}, {2, "Moe"}, {3, "Jack"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, null, null);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
SchemaNotFoundException schemaNotFoundException = assertThrows(SchemaNotFoundException.class, () -> schemaStrategy.getSchema(null, inputStream, null));
assertTrue(schemaNotFoundException.getMessage().contains("no cells with data"));
}
}
@Test
void testWhereConfiguredStartRowHasEmptyCell() throws Exception {
Object[][] data = {{"ID", "", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE, null);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
RecordSchema schema = schemaStrategy.getSchema(null, inputStream, null);
RecordField recordField = schema.getField(1);
assertEquals("column_1", recordField.getFieldName());
}
}
@Test
void testWhereInferenceRowHasMoreCellsThanFieldNames() throws Exception {
Object[][] data = {{"ID", "First", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M", "Extra"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE, null);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
SchemaNotFoundException schemaNotFoundException = assertThrows(SchemaNotFoundException.class, () -> schemaStrategy.getSchema(null, inputStream, null));
assertTrue(schemaNotFoundException.getMessage().contains("more than"));
}
}
@Test
void testWhereTotalRowsLessThanConfiguredInferenceRows() throws Exception {
Object[][] data = {{"ID", "First", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE, null);
assertTrue(data.length - 1 < ExcelHeaderSchemaStrategy.NUM_ROWS_TO_DETERMINE_TYPES);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
assertDoesNotThrow(() -> schemaStrategy.getSchema(null, inputStream, null));
}
}
@Test
void testWhereConfiguredInferenceRowsHasAnEmptyRow() throws IOException {
Object[][] data = {{"ID", "First", "Middle"}, {1, "One", "O"}, {2, "Two", "T"}, {3, "Three", "T"},
{4, "Four", "F"}, {5, "Five", "F"}, {}, {7, "Seven", "S"}, {8, "Eight", "E"},
{9, "Nine", "N"}, {10, "Ten", "T"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE, null);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
assertDoesNotThrow(() -> schemaStrategy.getSchema(null, inputStream, null));
}
}
@Test
void testWhereTotalRowsGreaterThanConfiguredInferenceRows() throws Exception {
Object[][] data = {{"ID", "First", "Middle"}, {1, "One", "O"}, {2, "Two", "T"}, {3, "Three", "T"},
{4, "Four", "F"}, {5, "Five", "F"}, {6, "Six", "S"}, {7, "Seven", "S"}, {8, "Eight", "E"},
{9, "Nine", "N"}, {10, "Ten", "T"}, {11, "Eleven", "E"}};
assertTrue(data.length - 1 > ExcelHeaderSchemaStrategy.NUM_ROWS_TO_DETERMINE_TYPES);
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE, null);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
assertDoesNotThrow(() -> schemaStrategy.getSchema(null, inputStream, null));
}
}
@Test
void testWhereConfiguredInferenceRowsAreAllBlank() throws IOException {
Object[][] data = {{"ID", "First", "Middle"}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {11, "Eleven", "E"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE, null);
try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
SchemaNotFoundException schemaNotFoundException = assertThrows(SchemaNotFoundException.class, () -> schemaStrategy.getSchema(null, inputStream, null));
assertTrue(schemaNotFoundException.getMessage().contains("empty"));
}
}
private static ByteArrayOutputStream getSingleSheetWorkbook(Object[][] data) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (XSSFWorkbook workbook = new XSSFWorkbook()) {
final XSSFSheet sheet = workbook.createSheet("Sheet 1");
int rowCount = 0;
for (Object[] dataRow : data) {
Row row = sheet.createRow(rowCount++);
int columnCount = 0;
for (Object field : dataRow) {
Cell cell = row.createCell(columnCount++);
if (field instanceof String) {
cell.setCellValue((String) field);
} else if (field instanceof Number) {
cell.setCellValue(((Number) field).doubleValue());
}
}
}
workbook.write(outputStream);
}
return outputStream;
}
}

View File

@ -46,10 +46,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(MockitoExtension.class)
public class TestExcelSchemaInference {
private static final String EXPECTED_FIRST_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "0";
private static final String EXPECTED_SECOND_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "1";
private static final String EXPECTED_THIRD_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "2";
private static final String EXPECTED_FOURTH_FIELD_NAME = ExcelSchemaInference.FIELD_NAME_PREFIX + "3";
private static final String EXPECTED_FIRST_FIELD_NAME = ExcelUtils.FIELD_NAME_PREFIX + "0";
private static final String EXPECTED_SECOND_FIELD_NAME = ExcelUtils.FIELD_NAME_PREFIX + "1";
private static final String EXPECTED_THIRD_FIELD_NAME = ExcelUtils.FIELD_NAME_PREFIX + "2";
private static final String EXPECTED_FOURTH_FIELD_NAME = ExcelUtils.FIELD_NAME_PREFIX + "3";
private final TimeValueInference timestampInference = new TimeValueInference("MM/dd/yyyy", "HH:mm:ss.SSS", "yyyy/MM/dd/ HH:mm");
@Mock