NIFI-7643 Removed absolute.path attribute from UnpackContent

- Do not include the absolute.path attribute from Zip/Tar files in UnpackContent; some code cleanup

This closes #7902

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-10-19 09:57:50 -04:00 committed by exceptionfactory
parent 8bfb6be5ba
commit 015b721800
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 47 additions and 70 deletions

View File

@ -46,7 +46,6 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
@ -68,9 +67,7 @@ import java.nio.file.Path;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -187,29 +184,24 @@ public class UnpackContent extends AbstractProcessor {
.description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private static final Set<Relationship> relationships = Set.of(
REL_SUCCESS,
REL_FAILURE,
REL_ORIGINAL
);
private static final List<PropertyDescriptor> properties = List.of(
PACKAGING_FORMAT,
FILE_FILTER,
PASSWORD,
ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR
);
private Pattern fileFilter;
private Unpacker tarUnpacker;
private Unpacker zipUnpacker;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PACKAGING_FORMAT);
properties.add(FILE_FILTER);
properties.add(PASSWORD);
properties.add(ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR);
this.properties = Collections.unmodifiableList(properties);
}
@Override
public Set<Relationship> getRelationships() {
@ -277,31 +269,31 @@ public class UnpackContent extends AbstractProcessor {
final Unpacker unpacker;
final boolean addFragmentAttrs;
switch (packagingFormat) {
case TAR_FORMAT:
case X_TAR_FORMAT:
unpacker = tarUnpacker;
addFragmentAttrs = true;
break;
case ZIP_FORMAT:
unpacker = zipUnpacker;
addFragmentAttrs = true;
break;
case FLOWFILE_STREAM_FORMAT_V2:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false;
break;
case FLOWFILE_STREAM_FORMAT_V3:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false;
break;
case FLOWFILE_TAR_FORMAT:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false;
break;
case AUTO_DETECT_FORMAT:
default:
// The format of the unpacker should be known before initialization
throw new ProcessException(packagingFormat + " is not a valid packaging format");
case TAR_FORMAT:
case X_TAR_FORMAT:
unpacker = tarUnpacker;
addFragmentAttrs = true;
break;
case ZIP_FORMAT:
unpacker = zipUnpacker;
addFragmentAttrs = true;
break;
case FLOWFILE_STREAM_FORMAT_V2:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false;
break;
case FLOWFILE_STREAM_FORMAT_V3:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false;
break;
case FLOWFILE_TAR_FORMAT:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false;
break;
case AUTO_DETECT_FORMAT:
default:
// The format of the unpacker should be known before initialization
throw new ProcessException(packagingFormat + " is not a valid packaging format");
}
final List<FlowFile> unpacked = new ArrayList<>();
@ -368,15 +360,12 @@ public class UnpackContent extends AbstractProcessor {
final File file = new File(tarEntry.getName());
final Path filePath = file.toPath();
String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
final Path absPath = filePath.toAbsolutePath();
final String absPathString = absPath.getParent().toString() + "/";
FlowFile unpackedFile = session.create(source);
try {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.PATH.key(), filePathString);
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
@ -466,7 +455,6 @@ public class UnpackContent extends AbstractProcessor {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.PATH.key(), parentDirectory);
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), file.toPath().toAbsolutePath() + PATH_SEPARATOR);
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());
@ -484,7 +472,7 @@ public class UnpackContent extends AbstractProcessor {
private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback {
private boolean allowStoredEntriesWithDataDescriptor;
private final boolean allowStoredEntriesWithDataDescriptor;
private CompressedZipInputStreamCallback(
final Pattern fileFilter,
@ -583,11 +571,6 @@ public class UnpackContent extends AbstractProcessor {
}
}
private static void mapAttributes(final Map<String, String> attributes, final String oldKey, final String newKey) {
if (!attributes.containsKey(newKey) && attributes.containsKey(oldKey)) {
attributes.put(newKey, attributes.get(oldKey));
}
}
private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
// first pass verifies all FlowFiles have the FRAGMENT_INDEX attribute and gets the total number of fragments
@ -649,21 +632,15 @@ public class UnpackContent extends AbstractProcessor {
}
public static PackageFormat getFormat(String textValue) {
switch (textValue) {
case AUTO_DETECT_FORMAT_NAME:
return AUTO_DETECT_FORMAT;
case TAR_FORMAT_NAME:
return TAR_FORMAT;
case ZIP_FORMAT_NAME:
return ZIP_FORMAT;
case FLOWFILE_STREAM_FORMAT_V3_NAME:
return FLOWFILE_STREAM_FORMAT_V3;
case FLOWFILE_STREAM_FORMAT_V2_NAME:
return FLOWFILE_STREAM_FORMAT_V2;
case FLOWFILE_TAR_FORMAT_NAME:
return FLOWFILE_TAR_FORMAT;
}
return null;
return switch (textValue) {
case AUTO_DETECT_FORMAT_NAME -> AUTO_DETECT_FORMAT;
case TAR_FORMAT_NAME -> TAR_FORMAT;
case ZIP_FORMAT_NAME -> ZIP_FORMAT;
case FLOWFILE_STREAM_FORMAT_V3_NAME -> FLOWFILE_STREAM_FORMAT_V3;
case FLOWFILE_STREAM_FORMAT_V2_NAME -> FLOWFILE_STREAM_FORMAT_V2;
case FLOWFILE_TAR_FORMAT_NAME -> FLOWFILE_TAR_FORMAT;
default -> null;
};
}
}
}