diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 79a69a7b8b..e78cd4f4e8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -578,49 +578,54 @@ public class MergeContent extends BinFiles {
             final ProcessSession session = bin.getSession();
             FlowFile bundle = session.create(bin.getContents());
             final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>(null);
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    final byte[] header = getDelimiterContent(context, contents, HEADER);
-                    if (header != null) {
-                        out.write(header);
-                    }
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        final byte[] header = getDelimiterContent(context, contents, HEADER);
+                        if (header != null) {
+                            out.write(header);
+                        }
 
-                    boolean isFirst = true;
-                    final Iterator<FlowFile> itr = contents.iterator();
-                    while (itr.hasNext()) {
-                        final FlowFile flowFile = itr.next();
-                        bin.getSession().read(flowFile, false, new InputStreamCallback() {
-                            @Override
-                            public void process(final InputStream in) throws IOException {
-                                StreamUtils.copy(in, out);
+                        boolean isFirst = true;
+                        final Iterator<FlowFile> itr = contents.iterator();
+                        while (itr.hasNext()) {
+                            final FlowFile flowFile = itr.next();
+                            bin.getSession().read(flowFile, false, new InputStreamCallback() {
+                                @Override
+                                public void process(final InputStream in) throws IOException {
+                                    StreamUtils.copy(in, out);
+                                }
+                            });
+
+                            if (itr.hasNext()) {
+                                final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR);
+                                if (demarcator != null) {
+                                    out.write(demarcator);
+                                }
                             }
-                        });
-                        if (itr.hasNext()) {
-                            final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR);
-                            if (demarcator != null) {
-                                out.write(demarcator);
+
+                            final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+                            if (isFirst) {
+                                bundleMimeTypeRef.set(flowFileMimeType);
+                                isFirst = false;
+                            } else {
+                                if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType)) {
+                                    bundleMimeTypeRef.set(null);
+                                }
                             }
                         }
-                        final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
-                        if (isFirst) {
-                            bundleMimeTypeRef.set(flowFileMimeType);
-                            isFirst = false;
-                        } else {
-                            if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType)) {
-                                bundleMimeTypeRef.set(null);
-                            }
-                        }
+
+                        final byte[] footer = getDelimiterContent(context, contents, FOOTER);
+                        if (footer != null) {
+                            out.write(footer);
+                        }
                     }
-
-                    final byte[] footer = getDelimiterContent(context, contents, FOOTER);
-                    if (footer != null) {
-                        out.write(footer);
-                    }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             session.getProvenanceReporter().join(contents, bundle);
             bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents));
@@ -719,48 +724,53 @@ public class MergeContent extends BinFiles {
             final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
             FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar");
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
+            try {
+                bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar");
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
-                        out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
-                        for (final FlowFile flowFile : contents) {
-                            final String path = keepPath ? getPath(flowFile) : "";
-                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                            out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+                            for (final FlowFile flowFile : contents) {
+                                final String path = keepPath ? getPath(flowFile) : "";
+                                final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
 
-                            final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
-                            tarEntry.setSize(flowFile.getSize());
-                            final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
-                            if (permissionsVal != null) {
-                                try {
-                                    tarEntry.setMode(Integer.parseInt(permissionsVal));
-                                } catch (final Exception e) {
-                                    getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring",
-                                        new Object[]{TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal});
+                                final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
+                                tarEntry.setSize(flowFile.getSize());
+                                final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
+                                if (permissionsVal != null) {
+                                    try {
+                                        tarEntry.setMode(Integer.parseInt(permissionsVal));
+                                    } catch (final Exception e) {
+                                        getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring",
+                                            new Object[] {TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal});
+                                    }
                                 }
-                            }
 
-                            final String modTime = context.getProperty(TAR_MODIFIED_TIME)
-                                .evaluateAttributeExpressions(flowFile).getValue();
-                            if (StringUtils.isNotBlank(modTime)) {
-                                try {
-                                    tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
-                                } catch (final Exception e) {
-                                    getLogger().debug("Attribute {} of {} is set to {}; expected ISO8601 format, so ignoring",
-                                        new Object[]{TAR_MODIFIED_TIME, flowFile, modTime});
+                                final String modTime = context.getProperty(TAR_MODIFIED_TIME)
+                                    .evaluateAttributeExpressions(flowFile).getValue();
+                                if (StringUtils.isNotBlank(modTime)) {
+                                    try {
+                                        tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
+                                    } catch (final Exception e) {
+                                        getLogger().debug("Attribute {} of {} is set to {}; expected ISO8601 format, so ignoring",
+                                            new Object[] {TAR_MODIFIED_TIME, flowFile, modTime});
+                                    }
                                 }
+
+                                out.putArchiveEntry(tarEntry);
+
+                                bin.getSession().exportTo(flowFile, out);
+                                out.closeArchiveEntry();
                             }
-
-                            out.putArchiveEntry(tarEntry);
-
-                            bin.getSession().exportTo(flowFile, out);
-                            out.closeArchiveEntry();
                         }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             bin.getSession().getProvenanceReporter().join(contents, bundle);
             return bundle;
@@ -794,35 +804,40 @@ public class MergeContent extends BinFiles {
             FlowFile bundle = session.create(contents);
 
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut)) {
-                        // we don't want the packager closing the stream. V1 creates a TAR Output Stream, which then gets
-                        // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that.
-                        final OutputStream out = new NonCloseableOutputStream(bufferedOut);
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut)) {
+                            // we don't want the packager closing the stream. V1 creates a TAR Output Stream, which then gets
+                            // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that.
+                            final OutputStream out = new NonCloseableOutputStream(bufferedOut);
 
-                        for (final FlowFile flowFile : contents) {
-                            bin.getSession().read(flowFile, false, new InputStreamCallback() {
-                                @Override
-                                public void process(final InputStream rawIn) throws IOException {
-                                    try (final InputStream in = new BufferedInputStream(rawIn)) {
-                                        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+                            for (final FlowFile flowFile : contents) {
+                                bin.getSession().read(flowFile, false, new InputStreamCallback() {
+                                    @Override
+                                    public void process(final InputStream rawIn) throws IOException {
+                                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                                            final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
 
-                                        // for backward compatibility purposes, we add the "legacy" NiFi attributes
-                                        attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key()));
-                                        attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key()));
-                                        if (attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
-                                            attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key()));
-                                        }
-                                        packager.packageFlowFile(in, out, attributes, flowFile.getSize());
-                                    }
-                                }
-                            });
-                        }
-                    }
-                }
-            });
+                                            // for backward compatibility purposes, we add the "legacy" NiFi attributes
+                                            attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key()));
+                                            attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key()));
+                                            if (attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
+                                                attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key()));
+                                            }
+                                            packager.packageFlowFile(in, out, attributes, flowFile.getSize());
+                                        }
+                                    }
+                                });
+                            }
+                        }
+                    }
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".pkg");
             session.getProvenanceReporter().join(contents, bundle);
@@ -860,34 +875,39 @@ public class MergeContent extends BinFiles {
             FlowFile bundle = session.create(contents);
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip");
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
+            try {
+                bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip");
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final ZipOutputStream out = new ZipOutputStream(bufferedOut)) {
-                        out.setLevel(compressionLevel);
-                        for (final FlowFile flowFile : contents) {
-                            final String path = keepPath ? getPath(flowFile) : "";
-                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
-                            final ZipEntry zipEntry = new ZipEntry(entryName);
-                            zipEntry.setSize(flowFile.getSize());
-                            try {
-                                out.putNextEntry(zipEntry);
+                            out.setLevel(compressionLevel);
+                            for (final FlowFile flowFile : contents) {
+                                final String path = keepPath ? getPath(flowFile) : "";
+                                final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                                final ZipEntry zipEntry = new ZipEntry(entryName);
+                                zipEntry.setSize(flowFile.getSize());
+                                try {
+                                    out.putNextEntry(zipEntry);
 
-                                bin.getSession().exportTo(flowFile, out);
-                                out.closeEntry();
-                                unmerged.remove(flowFile);
-                            } catch (ZipException e) {
-                                getLogger().error("Encountered exception merging {}", new Object[]{flowFile}, e);
+                                    bin.getSession().exportTo(flowFile, out);
+                                    out.closeEntry();
+                                    unmerged.remove(flowFile);
+                                } catch (ZipException e) {
+                                    getLogger().error("Encountered exception merging {}", new Object[] {flowFile}, e);
+                                }
                             }
-                        }
-                        out.finish();
-                        out.flush();
+                            out.finish();
+                            out.flush();
+                        }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             session.getProvenanceReporter().join(contents, bundle);
 
             return bundle;
@@ -921,92 +941,97 @@ public class MergeContent extends BinFiles {
             // we don't pass the parents to the #create method because the parents belong to different sessions
             FlowFile bundle = session.create(contents);
 
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        for (final FlowFile flowFile : contents) {
-                            bin.getSession().read(flowFile, false, new InputStreamCallback() {
-                                @Override
-                                public void process(InputStream in) throws IOException {
-                                    boolean canMerge = true;
-                                    try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                            for (final FlowFile flowFile : contents) {
+                                bin.getSession().read(flowFile, false, new InputStreamCallback() {
+                                    @Override
+                                    public void process(InputStream in) throws IOException {
+                                        boolean canMerge = true;
+                                        try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
                                             new GenericDatumReader<GenericRecord>())) {
-                                        if (schema.get() == null) {
-                                            // this is the first file - set up the writer, and store the
-                                            // Schema & metadata we'll use.
-                                            schema.set(reader.getSchema());
-                                            if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
-                                                for (String key : reader.getMetaKeys()) {
-                                                    if (!DataFileWriter.isReservedMeta(key)) {
-                                                        byte[] metadatum = reader.getMeta(key);
-                                                        metadata.put(key, metadatum);
-                                                        writer.setMeta(key, metadatum);
-                                                    }
-                                                }
-                                            }
-                                            inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
-                                            if (inputCodec.get() == null) {
-                                                inputCodec.set(DataFileConstants.NULL_CODEC);
-                                            }
-                                            writer.setCodec(CodecFactory.fromString(inputCodec.get()));
-                                            writer.create(schema.get(), out);
-                                        } else {
-                                            // check that we're appending to the same schema
-                                            if (!schema.get().equals(reader.getSchema())) {
-                                                getLogger().debug("Input file {} has different schema - {}, not merging",
-                                                    new Object[]{flowFile.getId(), reader.getSchema().getName()});
-                                                canMerge = false;
-                                                unmerged.add(flowFile);
-                                            }
-
-                                            if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
-                                                || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) {
-                                                // check that we're appending to the same metadata
-                                                for (String key : reader.getMetaKeys()) {
-                                                    if (!DataFileWriter.isReservedMeta(key)) {
-                                                        byte[] metadatum = reader.getMeta(key);
-                                                        byte[] writersMetadatum = metadata.get(key);
-                                                        if (!Arrays.equals(metadatum, writersMetadatum)) {
-                                                            // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge
-                                                            if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) {
-                                                                getLogger().debug("Input file {} has different non-reserved metadata, not merging",
-                                                                    new Object[]{flowFile.getId()});
-                                                                canMerge = false;
-                                                                unmerged.add(flowFile);
-                                                            }
-                                                        }
-                                                    }
-                                                }
-                                            } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above
-
-                                            // check that we're appending to the same codec
-                                            String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
-                                            if (thisCodec == null) {
-                                                thisCodec = DataFileConstants.NULL_CODEC;
-                                            }
-                                            if (!inputCodec.get().equals(thisCodec)) {
-                                                getLogger().debug("Input file {} has different codec, not merging",
-                                                    new Object[]{flowFile.getId()});
-                                                canMerge = false;
-                                                unmerged.add(flowFile);
-                                            }
-                                        }
-
-                                        // write the Avro content from the current FlowFile to the merged OutputStream
-                                        if (canMerge) {
-                                            writer.appendAllFrom(reader, false);
-                                        }
-                                    }
-                                }
-                            });
-                        }
-                        writer.flush();
-                    } finally {
-                        writer.close();
-                    }
-                }
-            });
+                                            if (schema.get() == null) {
+                                                // this is the first file - set up the writer, and store the
+                                                // Schema & metadata we'll use.
+                                                schema.set(reader.getSchema());
+                                                if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
+                                                    for (String key : reader.getMetaKeys()) {
+                                                        if (!DataFileWriter.isReservedMeta(key)) {
+                                                            byte[] metadatum = reader.getMeta(key);
+                                                            metadata.put(key, metadatum);
+                                                            writer.setMeta(key, metadatum);
+                                                        }
+                                                    }
+                                                }
+                                                inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
+                                                if (inputCodec.get() == null) {
+                                                    inputCodec.set(DataFileConstants.NULL_CODEC);
+                                                }
+                                                writer.setCodec(CodecFactory.fromString(inputCodec.get()));
+                                                writer.create(schema.get(), out);
+                                            } else {
+                                                // check that we're appending to the same schema
+                                                if (!schema.get().equals(reader.getSchema())) {
+                                                    getLogger().debug("Input file {} has different schema - {}, not merging",
+                                                        new Object[] {flowFile.getId(), reader.getSchema().getName()});
+                                                    canMerge = false;
+                                                    unmerged.add(flowFile);
+                                                }
+
+                                                if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
+                                                    || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) {
+                                                    // check that we're appending to the same metadata
+                                                    for (String key : reader.getMetaKeys()) {
+                                                        if (!DataFileWriter.isReservedMeta(key)) {
+                                                            byte[] metadatum = reader.getMeta(key);
+                                                            byte[] writersMetadatum = metadata.get(key);
+                                                            if (!Arrays.equals(metadatum, writersMetadatum)) {
+                                                                // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge
+                                                                if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) {
+                                                                    getLogger().debug("Input file {} has different non-reserved metadata, not merging",
+                                                                        new Object[] {flowFile.getId()});
+                                                                    canMerge = false;
+                                                                    unmerged.add(flowFile);
+                                                                }
+                                                            }
+                                                        }
+                                                    }
+                                                } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above
+
+                                                // check that we're appending to the same codec
+                                                String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
+                                                if (thisCodec == null) {
+                                                    thisCodec = DataFileConstants.NULL_CODEC;
+                                                }
+                                                if (!inputCodec.get().equals(thisCodec)) {
+                                                    getLogger().debug("Input file {} has different codec, not merging",
+                                                        new Object[] {flowFile.getId()});
+                                                    canMerge = false;
+                                                    unmerged.add(flowFile);
+                                                }
+                                            }
+
+                                            // write the Avro content from the current FlowFile to the merged OutputStream
+                                            if (canMerge) {
+                                                writer.appendAllFrom(reader, false);
+                                            }
+                                        }
+                                    }
+                                });
+                            }
+                            writer.flush();
+                        } finally {
+                            writer.close();
+                        }
+                    }
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             final Collection<FlowFile> parents;
             if (unmerged.isEmpty()) {
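Reviewer note: every merge strategy touched above (binary concatenation, tar, FlowFile stream v2/v3, zip, and Avro) now applies the same cleanup pattern: the bundle FlowFile is created first, the merged content is written inside a try block, and on any failure the partially written bundle is removed from the session before the exception is rethrown. A minimal standalone sketch of that pattern follows; the class and method names here are hypothetical, and only the ProcessSession and OutputStreamCallback calls are the actual NiFi API:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.io.OutputStreamCallback;

    // Hypothetical sketch of the cleanup pattern this change applies in each merge strategy.
    class BundleCleanupSketch {

        FlowFile writeBundle(final ProcessSession session, final byte[] mergedContent) {
            // The bundle FlowFile is created by the processor itself, so if writing fails
            // it must be cleaned up here; it never came from an incoming connection.
            FlowFile bundle = session.create();
            try {
                bundle = session.write(bundle, new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream out) throws IOException {
                        out.write(mergedContent); // stand-in for the real merge logic
                    }
                });
            } catch (final Exception e) {
                // Remove the partially written bundle so it is not left dangling in the
                // session, then rethrow so the failure is handled exactly as before.
                session.remove(bundle);
                throw e;
            }
            return bundle;
        }
    }

Without the session.remove(bundle) call, a failure during session.write would leave an orphaned, partially written FlowFile in the session; removing it before rethrowing keeps the session consistent while preserving the original error-handling behavior.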