mirror of https://github.com/apache/nifi.git
NIFI-12708: add option in UnpackContent to specify encoding charset for filenames in zip unpacking
This closes #8350. The processor can now take a filename encoding parameter and pass it to zip unpacking. This allows users to unzip files with a specific encoding and get correct filenames in the output. This helps, for example, with zip files created on Windows, which by default uses Cp437 for filename encoding. If a filename contains special characters such as the German letters ä, ü, etc., decoding it with Linux's default encoding (usually UTF-8) produces output containing `?`. When the same file is processed with the property set to `Cp437`, the processor outputs correct filenames with the special characters preserved. Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
parent
f39f3ea252
commit
e00d2b6d5e
|
@ -697,6 +697,7 @@
|
|||
<exclude>src/test/resources/TestUnpackContent/folder/cal.txt</exclude>
|
||||
<exclude>src/test/resources/TestUnpackContent/folder/date.txt</exclude>
|
||||
<exclude>src/test/resources/TestUnpackContent/invalid_data.zip</exclude>
|
||||
<exclude>src/test/resources/TestUnpackContent/windows-with-cp437.zip</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/addresses.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/embedded-string.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/multi-arrays.json</exclude>
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import net.lingala.zip4j.io.inputstream.ZipInputStream;
|
||||
import net.lingala.zip4j.model.LocalFileHeader;
|
||||
import net.lingala.zip4j.model.enums.EncryptionMethod;
|
||||
|
@ -24,6 +25,7 @@ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
|||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||
|
@ -34,6 +36,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.documentation.UseCase;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
|
@ -104,6 +107,13 @@ import java.util.regex.Pattern;
|
|||
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the unpacked file (tar only)"),
|
||||
@WritesAttribute(attribute = "file.encryptionMethod", description = "The encryption method for entries in Zip archives")})
|
||||
@SeeAlso(MergeContent.class)
|
||||
@UseCase(
|
||||
description = "Unpack Zip containing filenames with special characters, created on Windows with filename charset 'Cp437' or 'IBM437'.",
|
||||
configuration = """
|
||||
Set "Packaging Format" value to "zip" or "use mime.type attribute".
|
||||
Set "Filename Character Set" value to "Cp437" or "IBM437".
|
||||
"""
|
||||
)
|
||||
public class UnpackContent extends AbstractProcessor {
|
||||
// attribute keys
|
||||
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
|
||||
|
@ -139,6 +149,21 @@ public class UnpackContent extends AbstractProcessor {
|
|||
PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString(), PackageFormat.FLOWFILE_TAR_FORMAT.toString())
|
||||
.defaultValue(PackageFormat.AUTO_DETECT_FORMAT.toString())
|
||||
.build();
|
||||
public static final PropertyDescriptor ZIP_FILENAME_CHARSET = new PropertyDescriptor.Builder()
|
||||
.name("Filename Character Set")
|
||||
.displayName("Filename Character Set")
|
||||
.description(
|
||||
"If supplied this character set will be supplied to the Zip utility to attempt to decode filenames using the specific character set. "
|
||||
+ "If not specified the default platform character set will be used. This is useful if a Zip was created with a different character "
|
||||
+ "set than the platform default and the zip uses non standard values to specify.")
|
||||
.required(false)
|
||||
.dependsOn(
|
||||
PACKAGING_FORMAT,
|
||||
PackageFormat.ZIP_FORMAT.toString(),
|
||||
PackageFormat.AUTO_DETECT_FORMAT.toString())
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.defaultValue(Charset.defaultCharset().toString())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
|
||||
.name("File Filter")
|
||||
|
@ -192,6 +217,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
|
||||
private static final List<PropertyDescriptor> properties = List.of(
|
||||
PACKAGING_FORMAT,
|
||||
ZIP_FILENAME_CHARSET,
|
||||
FILE_FILTER,
|
||||
PASSWORD,
|
||||
ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR
|
||||
|
@ -231,7 +257,10 @@ public class UnpackContent extends AbstractProcessor {
|
|||
}
|
||||
final PropertyValue allowStoredEntriesWithDataDescriptorVal = context.getProperty(ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR);
|
||||
final boolean allowStoredEntriesWithDataDescriptor = allowStoredEntriesWithDataDescriptorVal.isSet() ? allowStoredEntriesWithDataDescriptorVal.asBoolean() : false;
|
||||
zipUnpacker = new ZipUnpacker(fileFilter, password, allowStoredEntriesWithDataDescriptor);
|
||||
|
||||
final String filenamesEncodingVal = context.getProperty(ZIP_FILENAME_CHARSET).getValue();
|
||||
Charset filenamesEncoding =Charsets.toCharset(filenamesEncodingVal);
|
||||
zipUnpacker = new ZipUnpacker(fileFilter, password, allowStoredEntriesWithDataDescriptor, filenamesEncoding);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,34 +296,31 @@ public class UnpackContent extends AbstractProcessor {
|
|||
|
||||
// set the Unpacker to use for this FlowFile. FlowFileUnpackager objects maintain state and are not reusable.
|
||||
final Unpacker unpacker;
|
||||
final boolean addFragmentAttrs;
|
||||
switch (packagingFormat) {
|
||||
case TAR_FORMAT:
|
||||
case X_TAR_FORMAT:
|
||||
final boolean addFragmentAttrs = switch (packagingFormat) {
|
||||
case TAR_FORMAT, X_TAR_FORMAT -> {
|
||||
unpacker = tarUnpacker;
|
||||
addFragmentAttrs = true;
|
||||
break;
|
||||
case ZIP_FORMAT:
|
||||
yield true;
|
||||
}
|
||||
case ZIP_FORMAT -> {
|
||||
unpacker = zipUnpacker;
|
||||
addFragmentAttrs = true;
|
||||
break;
|
||||
case FLOWFILE_STREAM_FORMAT_V2:
|
||||
yield true;
|
||||
}
|
||||
case FLOWFILE_STREAM_FORMAT_V2 -> {
|
||||
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case FLOWFILE_STREAM_FORMAT_V3:
|
||||
yield false;
|
||||
}
|
||||
case FLOWFILE_STREAM_FORMAT_V3 -> {
|
||||
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case FLOWFILE_TAR_FORMAT:
|
||||
yield false;
|
||||
}
|
||||
case FLOWFILE_TAR_FORMAT -> {
|
||||
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
|
||||
addFragmentAttrs = false;
|
||||
break;
|
||||
case AUTO_DETECT_FORMAT:
|
||||
default:
|
||||
yield false;
|
||||
}
|
||||
default ->
|
||||
// The format of the unpacker should be known before initialization
|
||||
throw new ProcessException(packagingFormat + " is not a valid packaging format");
|
||||
}
|
||||
};
|
||||
|
||||
final List<FlowFile> unpacked = new ArrayList<>();
|
||||
try {
|
||||
|
@ -309,7 +335,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
finishFragmentAttributes(session, flowFile, unpacked);
|
||||
}
|
||||
session.transfer(unpacked, REL_SUCCESS);
|
||||
final String fragmentId = unpacked.size() > 0 ? unpacked.get(0).getAttribute(FRAGMENT_ID) : null;
|
||||
final String fragmentId = !unpacked.isEmpty() ? unpacked.getFirst().getAttribute(FRAGMENT_ID) : null;
|
||||
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
|
||||
session.transfer(flowFile, REL_ORIGINAL);
|
||||
session.getProvenanceReporter().fork(flowFile, unpacked);
|
||||
|
@ -395,20 +421,21 @@ public class UnpackContent extends AbstractProcessor {
|
|||
private static class ZipUnpacker extends Unpacker {
|
||||
private final char[] password;
|
||||
private final boolean allowStoredEntriesWithDataDescriptor;
|
||||
|
||||
public ZipUnpacker(final Pattern fileFilter, final char[] password, final boolean allowStoredEntriesWithDataDescriptor) {
|
||||
private final Charset filenameEncoding;
|
||||
public ZipUnpacker(final Pattern fileFilter, final char[] password, final boolean allowStoredEntriesWithDataDescriptor,final Charset filenameEncoding) {
|
||||
super(fileFilter);
|
||||
this.password = password;
|
||||
this.allowStoredEntriesWithDataDescriptor = allowStoredEntriesWithDataDescriptor;
|
||||
this.filenameEncoding = filenameEncoding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
||||
final String fragmentId = UUID.randomUUID().toString();
|
||||
if (password == null) {
|
||||
session.read(source, new CompressedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, allowStoredEntriesWithDataDescriptor));
|
||||
session.read(source, new CompressedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, allowStoredEntriesWithDataDescriptor,filenameEncoding));
|
||||
} else {
|
||||
session.read(source, new EncryptedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, password));
|
||||
session.read(source, new EncryptedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, password,filenameEncoding));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -473,6 +500,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback {
|
||||
|
||||
private final boolean allowStoredEntriesWithDataDescriptor;
|
||||
private final Charset filenameEncoding;
|
||||
|
||||
private CompressedZipInputStreamCallback(
|
||||
final Pattern fileFilter,
|
||||
|
@ -480,15 +508,18 @@ public class UnpackContent extends AbstractProcessor {
|
|||
final FlowFile sourceFlowFile,
|
||||
final List<FlowFile> unpacked,
|
||||
final String fragmentId,
|
||||
final boolean allowStoredEntriesWithDataDescriptor
|
||||
final boolean allowStoredEntriesWithDataDescriptor,
|
||||
final Charset filenameEncoding
|
||||
) {
|
||||
super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
|
||||
this.allowStoredEntriesWithDataDescriptor = allowStoredEntriesWithDataDescriptor;
|
||||
this.filenameEncoding = filenameEncoding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream inputStream) throws IOException {
|
||||
try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream), null, true, allowStoredEntriesWithDataDescriptor)) {
|
||||
try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream),
|
||||
filenameEncoding.toString(), true, allowStoredEntriesWithDataDescriptor)) {
|
||||
ZipArchiveEntry zipEntry;
|
||||
while ((zipEntry = zipInputStream.getNextZipEntry()) != null) {
|
||||
processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE);
|
||||
|
@ -499,6 +530,7 @@ public class UnpackContent extends AbstractProcessor {
|
|||
|
||||
private static class EncryptedZipInputStreamCallback extends ZipInputStreamCallback {
|
||||
private final char[] password;
|
||||
private final Charset filenameEncoding;
|
||||
|
||||
private EncryptedZipInputStreamCallback(
|
||||
final Pattern fileFilter,
|
||||
|
@ -506,15 +538,17 @@ public class UnpackContent extends AbstractProcessor {
|
|||
final FlowFile sourceFlowFile,
|
||||
final List<FlowFile> unpacked,
|
||||
final String fragmentId,
|
||||
final char[] password
|
||||
final char[] password,
|
||||
final Charset filenameEncoding
|
||||
) {
|
||||
super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
|
||||
this.password = password;
|
||||
this.filenameEncoding = filenameEncoding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final InputStream inputStream) throws IOException {
|
||||
try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password)) {
|
||||
try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password,filenameEncoding)) {
|
||||
LocalFileHeader zipEntry;
|
||||
while ((zipEntry = zipInputStream.getNextEntry()) != null) {
|
||||
processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod());
|
||||
|
|
|
@ -16,9 +16,12 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
|
||||
import net.lingala.zip4j.model.ZipParameters;
|
||||
import net.lingala.zip4j.model.enums.EncryptionMethod;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -222,6 +225,92 @@ public class TestUnpackContent {
|
|||
flowFile.assertContentEquals(path.toFile());
|
||||
}
|
||||
}
|
||||
@Test
|
||||
public void testZipEncodingField() {
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
unpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, "invalid-encoding");
|
||||
unpackRunner.assertNotValid();
|
||||
unpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, "IBM437");
|
||||
unpackRunner.assertValid();
|
||||
unpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, "Cp437");
|
||||
unpackRunner.assertValid();
|
||||
unpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, StandardCharsets.ISO_8859_1.name());
|
||||
unpackRunner.assertValid();
|
||||
unpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, StandardCharsets.UTF_8.name());
|
||||
unpackRunner.assertValid();
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testZipWithCp437Encoding() throws IOException {
|
||||
String zipFilename = "windows-with-cp437.zip";
|
||||
final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
final TestRunner autoUnpackRunner = TestRunners.newTestRunner(new UnpackContent());
|
||||
unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
unpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, "Cp437");
|
||||
unpackRunner.setProperty(UnpackContent.ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR, "true"); // just forces this to be exercised
|
||||
|
||||
autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString());
|
||||
autoUnpackRunner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, "Cp437");
|
||||
|
||||
unpackRunner.enqueue(dataPath.resolve(zipFilename));
|
||||
unpackRunner.enqueue(dataPath.resolve(zipFilename));
|
||||
|
||||
Map<String, String> attributes = new HashMap<>(1);
|
||||
attributes.put("mime.type", "application/zip");
|
||||
autoUnpackRunner.enqueue(dataPath.resolve(zipFilename), attributes);
|
||||
autoUnpackRunner.enqueue(dataPath.resolve(zipFilename), attributes);
|
||||
unpackRunner.run(2);
|
||||
autoUnpackRunner.run(2);
|
||||
|
||||
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
|
||||
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
|
||||
|
||||
final List<MockFlowFile> unpacked =
|
||||
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
|
||||
for (final MockFlowFile flowFile : unpacked) {
|
||||
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
// In this test case only check for presence of `?` in filename and path for failure, since the zip was created on Windows,
|
||||
// it will always output `?` if Cp437 encoding is not used during unpacking. The zip file also contains file and folder
|
||||
// without special characters.
|
||||
// As a result of these conditions, this test does not check for valid special character presence.
|
||||
assertTrue(StringUtils.containsNone(filename, "?"), "filename contains '?': " + filename);
|
||||
final String path = flowFile.getAttribute(CoreAttributes.PATH.key());
|
||||
assertTrue(StringUtils.containsNone(path, "?"), "path contains '?': " + path);
|
||||
}
|
||||
}
|
||||
@Test
|
||||
public void testEncryptedZipWithCp437Encoding() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new UnpackContent());
|
||||
runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString());
|
||||
runner.setProperty(UnpackContent.ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR, "false");
|
||||
runner.setProperty(UnpackContent.ZIP_FILENAME_CHARSET, "Cp437");
|
||||
final String password = String.class.getSimpleName();
|
||||
runner.setProperty(UnpackContent.PASSWORD, password);
|
||||
|
||||
final char[] streamPassword = password.toCharArray();
|
||||
final String contents = TestRunner.class.getCanonicalName();
|
||||
String specialChar = "\u00E4";
|
||||
String pathInZip = "path_with_special_%s_char/".formatted(specialChar);
|
||||
String filename = "filename_with_special_char%s.txt".formatted(specialChar);
|
||||
final byte[] zipEncrypted = createZipEncryptedCp437(EncryptionMethod.AES, streamPassword, contents,pathInZip.concat(filename));
|
||||
runner.enqueue(zipEncrypted);
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 1);
|
||||
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
|
||||
|
||||
final List<MockFlowFile> unpacked =
|
||||
runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
|
||||
for (final MockFlowFile flowFile : unpacked) {
|
||||
final String outputFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
assertTrue(StringUtils.containsNone(outputFilename, "?"), "filename contains '?': " + outputFilename);
|
||||
assertTrue(StringUtils.contains(outputFilename, specialChar), "filename missing '%s': %s".formatted(specialChar,outputFilename));
|
||||
final String path = flowFile.getAttribute(CoreAttributes.PATH.key());
|
||||
assertTrue(StringUtils.containsNone(path, "?"), "path contains '?': " + path);
|
||||
assertTrue(StringUtils.contains(path, specialChar), "path missing '%s': %s".formatted(specialChar,path));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZipEncryptionZipStandard() throws IOException {
|
||||
|
@ -526,4 +615,20 @@ public class TestUnpackContent {
|
|||
|
||||
return outputStream.toByteArray();
|
||||
}
|
||||
|
||||
private byte[] createZipEncryptedCp437(final EncryptionMethod encryptionMethod, final char[] password, final String contents, String filename) throws IOException {
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
final ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream, password, Charsets.toCharset("Cp437"));
|
||||
|
||||
final ZipParameters zipParameters = new ZipParameters();
|
||||
zipParameters.setEncryptionMethod(encryptionMethod);
|
||||
zipParameters.setEncryptFiles(true);
|
||||
zipParameters.setFileNameInZip(filename);
|
||||
zipOutputStream.putNextEntry(zipParameters);
|
||||
zipOutputStream.write(contents.getBytes());
|
||||
zipOutputStream.closeEntry();
|
||||
zipOutputStream.close();
|
||||
|
||||
return outputStream.toByteArray();
|
||||
}
|
||||
}
|
||||
|
|
Binary file not shown.
Loading…
Reference in New Issue