NIFI-13304 Added SplitExcel Processor

This closes #8981

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-06-14 18:55:49 +00:00 committed by exceptionfactory
parent 87107b311a
commit 20c815dc12
No known key found for this signature in database
11 changed files with 464 additions and 35 deletions

View File

@ -21,11 +21,8 @@
<artifactId>nifi-poi-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-poi-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -30,10 +30,12 @@
<excludes combine.children="append">
<exclude>src/test/resources/excel/collegeScorecard.xlsx</exclude>
<exclude>src/test/resources/excel/dataformatting.xlsx</exclude>
<exclude>src/test/resources/excel/dataWithSharedFormula.xlsx</exclude>
<exclude>src/test/resources/excel/dates.xlsx</exclude>
<exclude>src/test/resources/excel/notExcel.txt</exclude>
<exclude>src/test/resources/excel/numbers.xlsx</exclude>
<exclude>src/test/resources/excel/olderFormat.xls</exclude>
<exclude>src/test/resources/excel/sheetsWithEmptySheet.xlsx</exclude>
<exclude>src/test/resources/excel/simpleDataFormatting.xlsx</exclude>
<exclude>src/test/resources/excel/twoSheets.xlsx</exclude>
</excludes>
@ -63,7 +65,6 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
@ -82,8 +83,9 @@
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-poi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -21,7 +21,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
@ -62,34 +61,6 @@ import java.util.Map;
+ "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.")
public class ExcelReader extends SchemaRegistryService implements RecordReaderFactory {
public enum ProtectionType implements DescribedValue {
UNPROTECTED("Unprotected", "An Excel spreadsheet not protected by a password"),
PASSWORD("Password Protected", "An Excel spreadsheet protected by a password");
ProtectionType(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
private final String displayName;
private final String description;
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}
public static final PropertyDescriptor REQUIRED_SHEETS = new PropertyDescriptor
.Builder().name("Required Sheets")
.displayName("Required Sheets")

View File

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.excel;
import com.github.pjfanning.xlsx.StreamingReader;
import com.github.pjfanning.xlsx.exceptions.ExcelRuntimeException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.excel.ProtectionType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.poi.ss.usermodel.CellCopyPolicy;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
@SideEffectFree
@SupportsBatching
@Tags({"split", "text"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Splits a multi sheet Microsoft Excel spreadsheet into multiple Microsoft Excel spreadsheets where each sheet from the original" +
" file is converted to an individual spreadsheet in its own flow file. This processor is currently only capable of processing .xlsx "
+ "(XSSF 2007 OOXML file format) Excel documents and not older .xls (HSSF '97(-2007) file format) documents.")
@WritesAttributes({
@WritesAttribute(attribute = "fragment.identifier", description = "All split Excel FlowFiles produced from the same parent Excel FlowFile will have the same randomly generated UUID added" +
" for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split Excel FlowFiles that were created from a single parent Excel FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of split Excel FlowFiles generated from the parent Excel FlowFile"),
@WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent Excel FlowFile"),
@WritesAttribute(attribute = SplitExcel.SHEET_NAME, description = "The name of the Excel sheet from the original spreadsheet."),
@WritesAttribute(attribute = SplitExcel.TOTAL_ROWS, description = "The number of rows in the Excel sheet from the original spreadsheet.")})
public class SplitExcel extends AbstractProcessor {
public static final String SHEET_NAME = "sheetname";
public static final String TOTAL_ROWS = "total.rows";
public static final PropertyDescriptor PROTECTION_TYPE = new PropertyDescriptor.Builder()
.name("Protection Type")
.description("Specifies whether an Excel spreadsheet is protected by a password or not.")
.required(true)
.allowableValues(ProtectionType.class)
.defaultValue(ProtectionType.UNPROTECTED)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for a password protected Excel spreadsheet")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(PROTECTION_TYPE, ProtectionType.PASSWORD)
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship.")
.build();
public static final Relationship REL_SPLIT = new Relationship.Builder()
.name("split")
.description("The individual Excel 'segments' of the original Excel FlowFile will be routed to this relationship.")
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(PROTECTION_TYPE, PASSWORD);
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
private static final CellCopyPolicy CELL_COPY_POLICY = new CellCopyPolicy.Builder()
.cellFormula(CellCopyPolicy.DEFAULT_COPY_CELL_FORMULA_POLICY)
.cellStyle(CellCopyPolicy.DEFAULT_COPY_CELL_STYLE_POLICY)
.cellValue(CellCopyPolicy.DEFAULT_COPY_CELL_VALUE_POLICY)
.condenseRows(CellCopyPolicy.DEFAULT_CONDENSE_ROWS_POLICY)
.copyHyperlink(CellCopyPolicy.DEFAULT_COPY_HYPERLINK_POLICY)
.mergeHyperlink(CellCopyPolicy.DEFAULT_MERGE_HYPERLINK_POLICY)
.mergedRegions(CellCopyPolicy.DEFAULT_COPY_MERGED_REGIONS_POLICY)
.rowHeight(CellCopyPolicy.DEFAULT_COPY_ROW_HEIGHT_POLICY)
.build();
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile originalFlowFile = session.get();
if (originalFlowFile == null) {
return;
}
final String password = context.getProperty(PASSWORD).getValue();
final List<WorkbookSplit> workbookSplits = new ArrayList<>();
try {
session.read(originalFlowFile, in -> {
final Workbook originalWorkbook = StreamingReader.builder()
.rowCacheSize(100)
.bufferSize(4096)
.password(password)
.setReadHyperlinks(true) // NOTE: Needed for copying rows.
.setReadSharedFormulas(true) // NOTE: If not set to true, then data with shared formulas fail.
.open(in);
int index = 0;
for (final Sheet originalSheet : originalWorkbook) {
final String originalSheetName = originalSheet.getSheetName();
try (XSSFWorkbook newWorkbook = new XSSFWorkbook()) {
XSSFSheet newSheet = newWorkbook.createSheet(originalSheetName);
List<Row> originalRows = new ArrayList<>();
for (Row originalRow : originalSheet) {
originalRows.add(originalRow);
}
if (!originalRows.isEmpty()) {
newSheet.copyRows(originalRows, originalSheet.getFirstRowNum(), CELL_COPY_POLICY);
}
FlowFile newFlowFile = session.create(originalFlowFile);
try (final OutputStream out = session.write(newFlowFile)) {
newWorkbook.write(out);
workbookSplits.add(new WorkbookSplit(index, newFlowFile, originalSheetName, originalRows.size()));
}
}
index++;
}
});
} catch (ExcelRuntimeException | ProcessException e) {
getLogger().error("Failed to split {}", originalFlowFile, e);
session.remove(workbookSplits.stream()
.map(WorkbookSplit::content)
.toList());
workbookSplits.clear();
session.transfer(originalFlowFile, REL_FAILURE);
return;
}
final String fragmentId = UUID.randomUUID().toString();
final String originalFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
final int extensionIndex = originalFileName.lastIndexOf(".");
String originalFileNameWithoutExtension = originalFileName;
String originalFileNameExtension = "";
if (extensionIndex > -1) {
originalFileNameWithoutExtension = originalFileName.substring(0, extensionIndex);
originalFileNameExtension = originalFileName.substring(extensionIndex);
}
final Map<String, String> attributes = new HashMap<>();
attributes.put(FRAGMENT_COUNT.key(), String.valueOf(workbookSplits.size()));
attributes.put(FRAGMENT_ID.key(), fragmentId);
attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), originalFileName);
for (WorkbookSplit split : workbookSplits) {
attributes.put(CoreAttributes.FILENAME.key(), String.format("%s-%s%s", originalFileNameWithoutExtension, split.index(), originalFileNameExtension));
attributes.put(FRAGMENT_INDEX.key(), Integer.toString(split.index()));
attributes.put(SHEET_NAME, split.sheetName());
attributes.put(TOTAL_ROWS, Integer.toString(split.numRows()));
session.putAllAttributes(split.content(), attributes);
}
session.transfer(originalFlowFile, REL_ORIGINAL);
final List<FlowFile> flowFileSplits = workbookSplits.stream()
.map(WorkbookSplit::content)
.toList();
session.transfer(flowFileSplits, REL_SPLIT);
}
private record WorkbookSplit(int index, FlowFile content, String sheetName, int numRows) {
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.excel.SplitExcel

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.excel;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class TestSplitExcel {
private TestRunner runner;
@BeforeAll
static void setUpBeforeAll() throws Exception {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (XSSFWorkbook workbook = new XSSFWorkbook()) {
workbook.createSheet("User Info");
workbook.createSheet("Vehicle Info");
workbook.write(outputStream);
}
}
@BeforeEach
void setUp() {
runner = TestRunners.newTestRunner(SplitExcel.class);
}
@Test
void testSingleSheet() throws IOException {
Path singleSheet = Paths.get("src/test/resources/excel/dates.xlsx");
runner.enqueue(singleSheet);
runner.run();
runner.assertTransferCount(SplitExcel.REL_SPLIT, 1);
runner.assertTransferCount(SplitExcel.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitExcel.REL_FAILURE, 0);
}
@Test
void testMultisheet() throws IOException {
Path multisheet = Paths.get("src/test/resources/excel/twoSheets.xlsx");
String fileName = multisheet.toFile().getName();
runner.enqueue(multisheet);
runner.run();
runner.assertTransferCount(SplitExcel.REL_SPLIT, 2);
runner.assertTransferCount(SplitExcel.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitExcel.REL_FAILURE, 0);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitExcel.REL_SPLIT);
String expectedSheetNamesPrefix = "TestSheet";
List<String> expectedSheetSuffixes = List.of("A", "B");
List<Integer> expectedTotalRows = List.of(4, 3);
for (int index = 0; index < flowFiles.size(); index++) {
MockFlowFile flowFile = flowFiles.get(index);
assertNotNull(flowFile.getAttribute(FRAGMENT_ID.key()));
assertEquals(Integer.toString(index), flowFile.getAttribute(FRAGMENT_INDEX.key()));
assertEquals(Integer.toString(flowFiles.size()), flowFile.getAttribute(FRAGMENT_COUNT.key()));
assertEquals(fileName, flowFile.getAttribute(SEGMENT_ORIGINAL_FILENAME.key()));
assertEquals(expectedSheetNamesPrefix + expectedSheetSuffixes.get(index), flowFile.getAttribute(SplitExcel.SHEET_NAME));
assertEquals(expectedTotalRows.get(index).toString(), flowFile.getAttribute(SplitExcel.TOTAL_ROWS));
}
}
@Test
void testNonExcel() throws IOException {
Path nonExcel = Paths.get("src/test/resources/excel/notExcel.txt");
runner.enqueue(nonExcel);
runner.run();
runner.assertTransferCount(SplitExcel.REL_SPLIT, 0);
runner.assertTransferCount(SplitExcel.REL_ORIGINAL, 0);
runner.assertTransferCount(SplitExcel.REL_FAILURE, 1);
}
@Test
void testWithEmptySheet() throws IOException {
Path sheetsWithEmptySheet = Paths.get("src/test/resources/excel/sheetsWithEmptySheet.xlsx");
String fileName = sheetsWithEmptySheet.toFile().getName();
runner.enqueue(sheetsWithEmptySheet);
runner.run();
runner.assertTransferCount(SplitExcel.REL_SPLIT, 3);
runner.assertTransferCount(SplitExcel.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitExcel.REL_FAILURE, 0);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitExcel.REL_SPLIT);
List<String> expectedSheetSuffixes = List.of("TestSheetA", "TestSheetB", "emptySheet");
List<Integer> expectedTotalRows = List.of(4, 3, 0);
for (int index = 0; index < flowFiles.size(); index++) {
MockFlowFile flowFile = flowFiles.get(index);
assertNotNull(flowFile.getAttribute(FRAGMENT_ID.key()));
assertEquals(Integer.toString(index), flowFile.getAttribute(FRAGMENT_INDEX.key()));
assertEquals(Integer.toString(flowFiles.size()), flowFile.getAttribute(FRAGMENT_COUNT.key()));
assertEquals(fileName, flowFile.getAttribute(SEGMENT_ORIGINAL_FILENAME.key()));
assertEquals(expectedSheetSuffixes.get(index), flowFile.getAttribute(SplitExcel.SHEET_NAME));
assertEquals(expectedTotalRows.get(index).toString(), flowFile.getAttribute(SplitExcel.TOTAL_ROWS));
}
}
@Test
void testDataWithSharedFormula() throws IOException {
Path dataWithSharedFormula = Paths.get("src/test/resources/excel/dataWithSharedFormula.xlsx");
runner.enqueue(dataWithSharedFormula);
runner.run();
runner.assertTransferCount(SplitExcel.REL_SPLIT, 2);
runner.assertTransferCount(SplitExcel.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitExcel.REL_FAILURE, 0);
}
}

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-poi-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-poi-utils</artifactId>
</project>

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;
import org.apache.nifi.components.DescribedValue;
public enum ProtectionType implements DescribedValue {
UNPROTECTED("Unprotected", "An Excel spreadsheet not protected by a password"),
PASSWORD("Password Protected", "An Excel spreadsheet protected by a password");
ProtectionType(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
private final String displayName;
private final String description;
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -29,6 +29,7 @@
<modules>
<module>nifi-poi-nar</module>
<module>nifi-poi-services</module>
<module>nifi-poi-utils</module>
</modules>
<dependencyManagement>
<dependencies>