mirror of https://github.com/apache/nifi.git
NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
This closes #6369 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
5da9ce525c
commit
572799a201
|
@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumWriter;
|
|||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
||||
import org.apache.commons.compress.archivers.tar.TarConstants;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
|
@ -91,6 +92,7 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -771,7 +773,6 @@ public class MergeContent extends BinFiles {
|
|||
public FlowFile merge(final Bin bin, final ProcessContext context) {
|
||||
final List<FlowFile> contents = bin.getContents();
|
||||
final ProcessSession session = bin.getSession();
|
||||
|
||||
final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
|
||||
FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
|
||||
|
||||
|
@ -782,7 +783,12 @@ public class MergeContent extends BinFiles {
|
|||
public void process(final OutputStream rawOut) throws IOException {
|
||||
try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
|
||||
final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
|
||||
|
||||
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
|
||||
// if any one of the FlowFiles is larger than the default maximum tar entry size, then we set bigNumberMode to handle it
|
||||
if (getMaxEntrySize(contents) >= TarConstants.MAXSIZE) {
|
||||
out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
|
||||
}
|
||||
for (final FlowFile flowFile : contents) {
|
||||
final String path = keepPath ? getPath(flowFile) : "";
|
||||
final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
|
||||
|
@ -827,6 +833,14 @@ public class MergeContent extends BinFiles {
|
|||
return bundle;
|
||||
}
|
||||
|
||||
private long getMaxEntrySize(final List<FlowFile> contents) {
|
||||
final OptionalLong maxSize = contents.stream()
|
||||
.parallel()
|
||||
.mapToLong(ff -> ff.getSize())
|
||||
.max();
|
||||
return maxSize.orElse(0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMergedContentType() {
|
||||
return "application/tar";
|
||||
|
|
Loading…
Reference in New Issue