NIFI-4562: This closes #2331. If an Exception is thrown when merging FlowFiles, ensure that we remove the 'bundle' flowfile before exiting the method

Signed-off-by: joewitt <joewitt@apache.org>
Authored by Mark Payne on 2017-12-08 13:09:00 -05:00; committed by joewitt
parent 3b74d2ddad
commit 1fc1d38fd8
1 changed file with 215 additions and 190 deletions

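Every hunk below applies the same fix: the freshly created "bundle" FlowFile is only valid if the write completes, so an exception thrown while merging must remove the bundle from the session before it propagates, rather than leaking an incomplete FlowFile. The following is a minimal, self-contained sketch of that pattern; Session, FlowFile, and OutputStreamCallback here are hypothetical stand-ins for NiFi's ProcessSession, FlowFile, and OutputStreamCallback, not the real framework API.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of the cleanup pattern this commit applies in MergeContent.
public class RemoveBundleOnFailureSketch {

    interface FlowFile {
    }

    interface OutputStreamCallback {
        void process(OutputStream out) throws IOException;
    }

    interface Session {
        FlowFile create();

        FlowFile write(FlowFile flowFile, OutputStreamCallback callback) throws IOException;

        void remove(FlowFile flowFile); // discard a FlowFile that will never be transferred
    }

    static FlowFile merge(final Session session, final OutputStreamCallback writeLogic) throws IOException {
        FlowFile bundle = session.create();
        try {
            // A failure here would leave 'bundle' half-written and still owned by the session...
            bundle = session.write(bundle, writeLogic);
        } catch (final Exception e) {
            // ...so remove it before the exception leaves the method, as the commit does.
            session.remove(bundle);
            throw e;
        }
        return bundle;
    }

    public static void main(final String[] args) {
        final Session session = new Session() {
            @Override
            public FlowFile create() {
                return new FlowFile() {
                };
            }

            @Override
            public FlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) throws IOException {
                callback.process(new ByteArrayOutputStream());
                return flowFile;
            }

            @Override
            public void remove(final FlowFile flowFile) {
                System.out.println("bundle removed after failed merge");
            }
        };

        try {
            merge(session, out -> {
                throw new IOException("simulated merge failure");
            });
        } catch (final IOException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}

Running the sketch prints the removal message followed by the propagated exception, which mirrors what each of the merge methods in this commit now does on failure.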

@@ -578,49 +578,54 @@ public class MergeContent extends BinFiles {
 final ProcessSession session = bin.getSession();
 FlowFile bundle = session.create(bin.getContents());
 final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>(null);
+try {
     bundle = session.write(bundle, new OutputStreamCallback() {
         @Override
         public void process(final OutputStream out) throws IOException {
             final byte[] header = getDelimiterContent(context, contents, HEADER);
             if (header != null) {
                 out.write(header);
             }

             boolean isFirst = true;
             final Iterator<FlowFile> itr = contents.iterator();
             while (itr.hasNext()) {
                 final FlowFile flowFile = itr.next();
                 bin.getSession().read(flowFile, false, new InputStreamCallback() {
                     @Override
                     public void process(final InputStream in) throws IOException {
                         StreamUtils.copy(in, out);
                     }
                 });

                 if (itr.hasNext()) {
                     final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR);
                     if (demarcator != null) {
                         out.write(demarcator);
                     }
                 }

                 final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
                 if (isFirst) {
                     bundleMimeTypeRef.set(flowFileMimeType);
                     isFirst = false;
                 } else {
                     if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType)) {
                         bundleMimeTypeRef.set(null);
                     }
                 }
             }

             final byte[] footer = getDelimiterContent(context, contents, FOOTER);
             if (footer != null) {
                 out.write(footer);
             }
         }
     });
+} catch (final Exception e) {
+    session.remove(bundle);
+    throw e;
+}

 session.getProvenanceReporter().join(contents, bundle);
 bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents));
@@ -719,48 +724,53 @@ public class MergeContent extends BinFiles {
 final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
 FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
+try {
     bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar");
     bundle = session.write(bundle, new OutputStreamCallback() {
         @Override
         public void process(final OutputStream rawOut) throws IOException {
             try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                 final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
                 out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
                 for (final FlowFile flowFile : contents) {
                     final String path = keepPath ? getPath(flowFile) : "";
                     final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                     final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
                     tarEntry.setSize(flowFile.getSize());
                     final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
                     if (permissionsVal != null) {
                         try {
                             tarEntry.setMode(Integer.parseInt(permissionsVal));
                         } catch (final Exception e) {
                             getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring",
                                 new Object[] {TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal});
                         }
                     }

                     final String modTime = context.getProperty(TAR_MODIFIED_TIME)
                         .evaluateAttributeExpressions(flowFile).getValue();
                     if (StringUtils.isNotBlank(modTime)) {
                         try {
                             tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
                         } catch (final Exception e) {
                             getLogger().debug("Attribute {} of {} is set to {}; expected ISO8601 format, so ignoring",
                                 new Object[] {TAR_MODIFIED_TIME, flowFile, modTime});
                         }
                     }

                     out.putArchiveEntry(tarEntry);
                     bin.getSession().exportTo(flowFile, out);
                     out.closeArchiveEntry();
                 }
             }
         }
     });
+} catch (final Exception e) {
+    session.remove(bundle);
+    throw e;
+}

 bin.getSession().getProvenanceReporter().join(contents, bundle);
 return bundle;
@@ -794,35 +804,40 @@ public class MergeContent extends BinFiles {
 FlowFile bundle = session.create(contents);
+try {
     bundle = session.write(bundle, new OutputStreamCallback() {
         @Override
         public void process(final OutputStream rawOut) throws IOException {
             try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut)) {
                 // we don't want the packager closing the stream. V1 creates a TAR Output Stream, which then gets
                 // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that.
                 final OutputStream out = new NonCloseableOutputStream(bufferedOut);

                 for (final FlowFile flowFile : contents) {
                     bin.getSession().read(flowFile, false, new InputStreamCallback() {
                         @Override
                         public void process(final InputStream rawIn) throws IOException {
                             try (final InputStream in = new BufferedInputStream(rawIn)) {
                                 final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

                                 // for backward compatibility purposes, we add the "legacy" NiFi attributes
                                 attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key()));
                                 attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key()));
                                 if (attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
                                     attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key()));
                                 }

                                 packager.packageFlowFile(in, out, attributes, flowFile.getSize());
                             }
                         }
                     });
                 }
             }
         }
     });
+} catch (final Exception e) {
+    session.remove(bundle);
+    throw e;
+}

 bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".pkg");
 session.getProvenanceReporter().join(contents, bundle);
@@ -860,34 +875,39 @@ public class MergeContent extends BinFiles {
 FlowFile bundle = session.create(contents);
+try {
     bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip");
     bundle = session.write(bundle, new OutputStreamCallback() {
         @Override
         public void process(final OutputStream rawOut) throws IOException {
             try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                 final ZipOutputStream out = new ZipOutputStream(bufferedOut)) {
                 out.setLevel(compressionLevel);
                 for (final FlowFile flowFile : contents) {
                     final String path = keepPath ? getPath(flowFile) : "";
                     final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                     final ZipEntry zipEntry = new ZipEntry(entryName);
                     zipEntry.setSize(flowFile.getSize());
                     try {
                         out.putNextEntry(zipEntry);
                         bin.getSession().exportTo(flowFile, out);
                         out.closeEntry();
                         unmerged.remove(flowFile);
                     } catch (ZipException e) {
                         getLogger().error("Encountered exception merging {}", new Object[] {flowFile}, e);
                     }
                 }

                 out.finish();
                 out.flush();
             }
         }
     });
+} catch (final Exception e) {
+    session.remove(bundle);
+    throw e;
+}

 session.getProvenanceReporter().join(contents, bundle);
 return bundle;
@@ -921,92 +941,97 @@ public class MergeContent extends BinFiles {
 // we don't pass the parents to the #create method because the parents belong to different sessions
 FlowFile bundle = session.create(contents);
+try {
     bundle = session.write(bundle, new OutputStreamCallback() {
         @Override
         public void process(final OutputStream rawOut) throws IOException {
             try (final OutputStream out = new BufferedOutputStream(rawOut)) {
                 for (final FlowFile flowFile : contents) {
                     bin.getSession().read(flowFile, false, new InputStreamCallback() {
                         @Override
                         public void process(InputStream in) throws IOException {
                             boolean canMerge = true;
                             try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
                                 new GenericDatumReader<GenericRecord>())) {
                                 if (schema.get() == null) {
                                     // this is the first file - set up the writer, and store the
                                     // Schema & metadata we'll use.
                                     schema.set(reader.getSchema());
                                     if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
                                         for (String key : reader.getMetaKeys()) {
                                             if (!DataFileWriter.isReservedMeta(key)) {
                                                 byte[] metadatum = reader.getMeta(key);
                                                 metadata.put(key, metadatum);
                                                 writer.setMeta(key, metadatum);
                                             }
                                         }
                                     }
                                     inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
                                     if (inputCodec.get() == null) {
                                         inputCodec.set(DataFileConstants.NULL_CODEC);
                                     }
                                     writer.setCodec(CodecFactory.fromString(inputCodec.get()));
                                     writer.create(schema.get(), out);
                                 } else {
                                     // check that we're appending to the same schema
                                     if (!schema.get().equals(reader.getSchema())) {
                                         getLogger().debug("Input file {} has different schema - {}, not merging",
                                             new Object[] {flowFile.getId(), reader.getSchema().getName()});
                                         canMerge = false;
                                         unmerged.add(flowFile);
                                     }

                                     if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
                                         || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) {
                                         // check that we're appending to the same metadata
                                         for (String key : reader.getMetaKeys()) {
                                             if (!DataFileWriter.isReservedMeta(key)) {
                                                 byte[] metadatum = reader.getMeta(key);
                                                 byte[] writersMetadatum = metadata.get(key);
                                                 if (!Arrays.equals(metadatum, writersMetadatum)) {
                                                     // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge
                                                     if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) {
                                                         getLogger().debug("Input file {} has different non-reserved metadata, not merging",
                                                             new Object[] {flowFile.getId()});
                                                         canMerge = false;
                                                         unmerged.add(flowFile);
                                                     }
                                                 }
                                             }
                                         }
                                     } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above

                                     // check that we're appending to the same codec
                                     String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
                                     if (thisCodec == null) {
                                         thisCodec = DataFileConstants.NULL_CODEC;
                                     }
                                     if (!inputCodec.get().equals(thisCodec)) {
                                         getLogger().debug("Input file {} has different codec, not merging",
                                             new Object[] {flowFile.getId()});
                                         canMerge = false;
                                         unmerged.add(flowFile);
                                     }
                                 }

                                 // write the Avro content from the current FlowFile to the merged OutputStream
                                 if (canMerge) {
                                     writer.appendAllFrom(reader, false);
                                 }
                             }
                         }
                     });
                 }
                 writer.flush();
             } finally {
                 writer.close();
             }
         }
     });
+} catch (final Exception e) {
+    session.remove(bundle);
+    throw e;
+}

 final Collection<FlowFile> parents;
 if (unmerged.isEmpty()) {