mirror of https://github.com/apache/nifi.git
NIFI-8764 Refactored UnpackContent to use both Commons Compress and Zip4j
- UnpackContent uses Zip4j when configured with a password property - UnpackContent uses Commons Compress when a password is not specified NIFI-8764 Updated Password property description mentioning disabled algorithms NIFI-8764 Adjusted Password property description Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #5201
This commit is contained in:
parent
b75965d08f
commit
75de68e013
|
@ -36,10 +36,14 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import net.lingala.zip4j.io.inputstream.ZipInputStream;
|
||||||
import net.lingala.zip4j.model.LocalFileHeader;
|
import net.lingala.zip4j.model.LocalFileHeader;
|
||||||
|
import net.lingala.zip4j.model.enums.EncryptionMethod;
|
||||||
import org.apache.commons.compress.archivers.ArchiveEntry;
|
import org.apache.commons.compress.archivers.ArchiveEntry;
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||||
|
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
|
||||||
|
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
@ -67,7 +71,6 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.processors.standard.util.FileInfo;
|
import org.apache.nifi.processors.standard.util.FileInfo;
|
||||||
import org.apache.nifi.stream.io.StreamUtils;
|
import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
@ -75,7 +78,6 @@ import org.apache.nifi.util.FlowFileUnpackager;
|
||||||
import org.apache.nifi.util.FlowFileUnpackagerV1;
|
import org.apache.nifi.util.FlowFileUnpackagerV1;
|
||||||
import org.apache.nifi.util.FlowFileUnpackagerV2;
|
import org.apache.nifi.util.FlowFileUnpackagerV2;
|
||||||
import org.apache.nifi.util.FlowFileUnpackagerV3;
|
import org.apache.nifi.util.FlowFileUnpackagerV3;
|
||||||
import net.lingala.zip4j.io.inputstream.ZipInputStream;
|
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
|
@ -154,7 +156,7 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
||||||
.name("Password")
|
.name("Password")
|
||||||
.displayName("Password")
|
.displayName("Password")
|
||||||
.description("Password used for decrypting archive entries. Supports Zip files encrypted with ZipCrypto or AES")
|
.description("Password used for decrypting Zip archives encrypted with ZipCrypto or AES. Configuring a password disables support for alternative Zip compression algorithms.")
|
||||||
.required(false)
|
.required(false)
|
||||||
.sensitive(true)
|
.sensitive(true)
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
@ -239,7 +241,7 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
packagingFormat = null;
|
packagingFormat = null;
|
||||||
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
|
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
|
||||||
if (mimeType == null) {
|
if (mimeType == null) {
|
||||||
logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile});
|
logger.error("No mime.type attribute set for {}; routing to failure", flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -250,7 +252,7 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (packagingFormat == null) {
|
if (packagingFormat == null) {
|
||||||
logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
|
logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", flowFile, mimeType);
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -291,7 +293,7 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
unpacker.unpack(session, flowFile, unpacked);
|
unpacker.unpack(session, flowFile, unpacked);
|
||||||
if (unpacked.isEmpty()) {
|
if (unpacked.isEmpty()) {
|
||||||
logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile});
|
logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -304,20 +306,20 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
|
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
|
||||||
session.transfer(flowFile, REL_ORIGINAL);
|
session.transfer(flowFile, REL_ORIGINAL);
|
||||||
session.getProvenanceReporter().fork(flowFile, unpacked);
|
session.getProvenanceReporter().fork(flowFile, unpacked);
|
||||||
logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
|
logger.info("Unpacked {} into {} and transferred to success", flowFile, unpacked);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e});
|
logger.error("Unable to unpack {}; routing to failure", flowFile, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
session.remove(unpacked);
|
session.remove(unpacked);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static abstract class Unpacker {
|
private static abstract class Unpacker {
|
||||||
private Pattern fileFilter = null;
|
protected Pattern fileFilter = null;
|
||||||
|
|
||||||
public Unpacker() {}
|
public Unpacker() {}
|
||||||
|
|
||||||
public Unpacker(Pattern fileFilter) {
|
public Unpacker(final Pattern fileFilter) {
|
||||||
this.fileFilter = fileFilter;
|
this.fileFilter = fileFilter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,54 +342,45 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
@Override
|
@Override
|
||||||
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
||||||
final String fragmentId = UUID.randomUUID().toString();
|
final String fragmentId = UUID.randomUUID().toString();
|
||||||
session.read(source, new InputStreamCallback() {
|
session.read(source, inputStream -> {
|
||||||
|
int fragmentCount = 0;
|
||||||
|
try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inputStream))) {
|
||||||
|
TarArchiveEntry tarEntry;
|
||||||
|
while ((tarEntry = tarIn.getNextTarEntry()) != null) {
|
||||||
|
if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final File file = new File(tarEntry.getName());
|
||||||
|
final Path filePath = file.toPath();
|
||||||
|
String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
|
||||||
|
final Path absPath = filePath.toAbsolutePath();
|
||||||
|
final String absPathString = absPath.getParent().toString() + "/";
|
||||||
|
|
||||||
@Override
|
FlowFile unpackedFile = session.create(source);
|
||||||
public void process(final InputStream in) throws IOException {
|
try {
|
||||||
int fragmentCount = 0;
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) {
|
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
|
||||||
TarArchiveEntry tarEntry;
|
attributes.put(CoreAttributes.PATH.key(), filePathString);
|
||||||
while ((tarEntry = tarIn.getNextTarEntry()) != null) {
|
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
|
||||||
if (tarEntry.isDirectory() || !fileMatches(tarEntry)) {
|
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
|
||||||
continue;
|
|
||||||
}
|
|
||||||
final File file = new File(tarEntry.getName());
|
|
||||||
final Path filePath = file.toPath();
|
|
||||||
String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
|
|
||||||
final Path absPath = filePath.toAbsolutePath();
|
|
||||||
final String absPathString = absPath.getParent().toString() + "/";
|
|
||||||
|
|
||||||
FlowFile unpackedFile = session.create(source);
|
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
|
||||||
try {
|
attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
|
|
||||||
attributes.put(CoreAttributes.PATH.key(), filePathString);
|
|
||||||
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
|
|
||||||
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
|
|
||||||
|
|
||||||
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
|
final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
|
||||||
attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
|
attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
|
||||||
attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
|
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
|
||||||
|
|
||||||
final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
|
attributes.put(FRAGMENT_ID, fragmentId);
|
||||||
attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
|
attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
|
||||||
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
|
|
||||||
|
|
||||||
attributes.put(FRAGMENT_ID, fragmentId);
|
unpackedFile = session.putAllAttributes(unpackedFile, attributes);
|
||||||
attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
|
|
||||||
|
|
||||||
unpackedFile = session.putAllAttributes(unpackedFile, attributes);
|
final long fileSize = tarEntry.getSize();
|
||||||
|
unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize));
|
||||||
final long fileSize = tarEntry.getSize();
|
} finally {
|
||||||
unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
|
unpacked.add(unpackedFile);
|
||||||
@Override
|
|
||||||
public void process(final OutputStream out) throws IOException {
|
|
||||||
StreamUtils.copy(tarIn, out, fileSize);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} finally {
|
|
||||||
unpacked.add(unpackedFile);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -396,7 +389,7 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ZipUnpacker extends Unpacker {
|
private static class ZipUnpacker extends Unpacker {
|
||||||
private char[] password;
|
private final char[] password;
|
||||||
|
|
||||||
public ZipUnpacker(final Pattern fileFilter, final char[] password) {
|
public ZipUnpacker(final Pattern fileFilter, final char[] password) {
|
||||||
super(fileFilter);
|
super(fileFilter);
|
||||||
|
@ -406,50 +399,118 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
@Override
|
@Override
|
||||||
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
||||||
final String fragmentId = UUID.randomUUID().toString();
|
final String fragmentId = UUID.randomUUID().toString();
|
||||||
session.read(source, new InputStreamCallback() {
|
if (password == null) {
|
||||||
@Override
|
session.read(source, new CompressedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId));
|
||||||
public void process(final InputStream in) throws IOException {
|
} else {
|
||||||
int fragmentCount = 0;
|
session.read(source, new EncryptedZipInputStreamCallback(fileFilter, session, source, unpacked, fragmentId, password));
|
||||||
try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(in), password)) {
|
}
|
||||||
LocalFileHeader zipEntry;
|
}
|
||||||
while ((zipEntry = zipIn.getNextEntry()) != null) {
|
|
||||||
final String zipEntryName = zipEntry.getFileName();
|
|
||||||
if (zipEntry.isDirectory() || !fileMatches(zipEntryName)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
final File file = new File(zipEntryName);
|
|
||||||
final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
|
|
||||||
final Path absPath = file.toPath().toAbsolutePath();
|
|
||||||
final String absPathString = absPath.getParent().toString() + "/";
|
|
||||||
|
|
||||||
FlowFile unpackedFile = session.create(source);
|
private abstract static class ZipInputStreamCallback implements InputStreamCallback {
|
||||||
try {
|
private static final String PATH_SEPARATOR = "/";
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
|
|
||||||
attributes.put(CoreAttributes.PATH.key(), parentDirectory);
|
|
||||||
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
|
|
||||||
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
|
|
||||||
|
|
||||||
attributes.put(FRAGMENT_ID, fragmentId);
|
private final Pattern fileFilter;
|
||||||
attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
|
|
||||||
|
|
||||||
final String encryptionMethod = zipEntry.getEncryptionMethod().toString();
|
private final ProcessSession session;
|
||||||
attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod);
|
|
||||||
|
|
||||||
unpackedFile = session.putAllAttributes(unpackedFile, attributes);
|
private final FlowFile sourceFlowFile;
|
||||||
unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
|
|
||||||
@Override
|
private final List<FlowFile> unpacked;
|
||||||
public void process(final OutputStream out) throws IOException {
|
|
||||||
StreamUtils.copy(zipIn, out);
|
private final String fragmentId;
|
||||||
}
|
|
||||||
});
|
private int fragmentIndex;
|
||||||
} finally {
|
|
||||||
unpacked.add(unpackedFile);
|
private ZipInputStreamCallback(
|
||||||
}
|
final Pattern fileFilter,
|
||||||
}
|
final ProcessSession session,
|
||||||
|
final FlowFile sourceFlowFile,
|
||||||
|
final List<FlowFile> unpacked,
|
||||||
|
final String fragmentId
|
||||||
|
) {
|
||||||
|
this.fileFilter = fileFilter;
|
||||||
|
this.session = session;
|
||||||
|
this.sourceFlowFile = sourceFlowFile;
|
||||||
|
this.unpacked = unpacked;
|
||||||
|
this.fragmentId = fragmentId;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isFileEntryMatched(final boolean directory, final String fileName) {
|
||||||
|
return !directory && (fileFilter == null || fileFilter.matcher(fileName).find());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void processEntry(final InputStream zipInputStream, final boolean directory, final String zipEntryName, final EncryptionMethod encryptionMethod) {
|
||||||
|
if (isFileEntryMatched(directory, zipEntryName)) {
|
||||||
|
final File file = new File(zipEntryName);
|
||||||
|
final String parentDirectory = (file.getParent() == null) ? PATH_SEPARATOR : file.getParent();
|
||||||
|
|
||||||
|
FlowFile unpackedFile = session.create(sourceFlowFile);
|
||||||
|
try {
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
|
||||||
|
attributes.put(CoreAttributes.PATH.key(), parentDirectory);
|
||||||
|
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), file.toPath().toAbsolutePath() + PATH_SEPARATOR);
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
|
||||||
|
attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
|
||||||
|
|
||||||
|
attributes.put(FRAGMENT_ID, fragmentId);
|
||||||
|
attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex));
|
||||||
|
|
||||||
|
unpackedFile = session.putAllAttributes(unpackedFile, attributes);
|
||||||
|
unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream));
|
||||||
|
} finally {
|
||||||
|
unpacked.add(unpackedFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback {
|
||||||
|
private CompressedZipInputStreamCallback(
|
||||||
|
final Pattern fileFilter,
|
||||||
|
final ProcessSession session,
|
||||||
|
final FlowFile sourceFlowFile,
|
||||||
|
final List<FlowFile> unpacked,
|
||||||
|
final String fragmentId
|
||||||
|
) {
|
||||||
|
super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final InputStream inputStream) throws IOException {
|
||||||
|
try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream))) {
|
||||||
|
ZipArchiveEntry zipEntry;
|
||||||
|
while ((zipEntry = zipInputStream.getNextZipEntry()) != null) {
|
||||||
|
processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class EncryptedZipInputStreamCallback extends ZipInputStreamCallback {
|
||||||
|
private final char[] password;
|
||||||
|
|
||||||
|
private EncryptedZipInputStreamCallback(
|
||||||
|
final Pattern fileFilter,
|
||||||
|
final ProcessSession session,
|
||||||
|
final FlowFile sourceFlowFile,
|
||||||
|
final List<FlowFile> unpacked,
|
||||||
|
final String fragmentId,
|
||||||
|
final char[] password
|
||||||
|
) {
|
||||||
|
super(fileFilter, session, sourceFlowFile, unpacked, fragmentId);
|
||||||
|
this.password = password;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final InputStream inputStream) throws IOException {
|
||||||
|
try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password)) {
|
||||||
|
LocalFileHeader zipEntry;
|
||||||
|
while ((zipEntry = zipInputStream.getNextEntry()) != null) {
|
||||||
|
processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -463,48 +524,42 @@ public class UnpackContent extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
|
||||||
session.read(source, new InputStreamCallback() {
|
session.read(source, inputStream -> {
|
||||||
@Override
|
try (final InputStream in = new BufferedInputStream(inputStream)) {
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
while (unpackager.hasMoreData()) {
|
||||||
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
|
||||||
while (unpackager.hasMoreData()) {
|
FlowFile unpackedFile = session.create(source);
|
||||||
final AtomicReference<Map<String, String>> attributesRef = new AtomicReference<>(null);
|
try {
|
||||||
FlowFile unpackedFile = session.create(source);
|
unpackedFile = session.write(unpackedFile, outputStream -> {
|
||||||
try {
|
try (final OutputStream out = new BufferedOutputStream(outputStream)) {
|
||||||
unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
|
final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
|
||||||
@Override
|
if (attributes == null) {
|
||||||
public void process(final OutputStream rawOut) throws IOException {
|
throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
|
||||||
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
|
|
||||||
final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
|
|
||||||
if (attributes == null) {
|
|
||||||
throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
|
|
||||||
}
|
|
||||||
attributesRef.set(attributes);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
attributesRef.set(attributes);
|
||||||
|
|
||||||
final Map<String, String> attributes = attributesRef.get();
|
|
||||||
|
|
||||||
// Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
|
|
||||||
// If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
|
|
||||||
// and later unpack it -- in this case, we have two FlowFiles with the same UUID.
|
|
||||||
attributes.remove(CoreAttributes.UUID.key());
|
|
||||||
|
|
||||||
// maintain backward compatibility with legacy NiFi attribute names
|
|
||||||
mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
|
|
||||||
mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
|
|
||||||
mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
|
|
||||||
mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
|
|
||||||
|
|
||||||
if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
|
|
||||||
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
unpackedFile = session.putAllAttributes(unpackedFile, attributes);
|
final Map<String, String> attributes = attributesRef.get();
|
||||||
} finally {
|
|
||||||
unpacked.add(unpackedFile);
|
// Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
|
||||||
|
// If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
|
||||||
|
// and later unpack it -- in this case, we have two FlowFiles with the same UUID.
|
||||||
|
attributes.remove(CoreAttributes.UUID.key());
|
||||||
|
|
||||||
|
// maintain backward compatibility with legacy NiFi attribute names
|
||||||
|
mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
|
||||||
|
mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
|
||||||
|
mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
|
||||||
|
mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
|
||||||
|
|
||||||
|
if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unpackedFile = session.putAllAttributes(unpackedFile, attributes);
|
||||||
|
} finally {
|
||||||
|
unpacked.add(unpackedFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,8 @@ import org.junit.Test;
|
||||||
|
|
||||||
public class TestUnpackContent {
|
public class TestUnpackContent {
|
||||||
|
|
||||||
|
private static final String FIRST_FRAGMENT_INDEX = "1";
|
||||||
|
|
||||||
private static final Path dataPath = Paths.get("src/test/resources/TestUnpackContent");
|
private static final Path dataPath = Paths.get("src/test/resources/TestUnpackContent");
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -501,6 +503,7 @@ public class TestUnpackContent {
|
||||||
|
|
||||||
final MockFlowFile unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS).iterator().next();
|
final MockFlowFile unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS).iterator().next();
|
||||||
unpacked.assertAttributeEquals(UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
|
unpacked.assertAttributeEquals(UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
|
||||||
|
unpacked.assertAttributeEquals(UnpackContent.FRAGMENT_INDEX, FIRST_FRAGMENT_INDEX);
|
||||||
|
|
||||||
final byte[] unpackedBytes = runner.getContentAsByteArray(unpacked);
|
final byte[] unpackedBytes = runner.getContentAsByteArray(unpacked);
|
||||||
final String unpackedContents = new String(unpackedBytes);
|
final String unpackedContents = new String(unpackedBytes);
|
||||||
|
|
Loading…
Reference in New Issue