From ddad70ba009a875e088020ae1ea8f29caeb529f9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Jun 2015 12:42:22 -0400 Subject: [PATCH] NIFI-378: updated documentation to explain contract of MergeContent's Defragment strategy a bit more clearly --- .../processors/standard/MergeContent.java | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 2883a758e2..65f4124d55 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -82,31 +82,34 @@ import org.apache.nifi.util.ObjectHolder; + "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.") @ReadsAttributes({ @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the property is set to Defragment. " - + "All FlowFiles with the same value for this attribute will be bundled together"), - @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the property is set to Defragment. This " - + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer " - + "between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"), + + "All FlowFiles with the same value for this attribute will be bundled together."), + @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the property is set to Defragment. " + + "This attribute indicates the order in which the fragments should be assembled. This " + + "attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all " + + "FlowFiles that have the same value for the \"fragment.identifier\" attribute) integer " + + "between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the " + + "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the behavior of this Processor is undefined."), @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the property is set to Defragment. This " - + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " - + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " - + "in the given bundle"), + + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " + + "in the given bundle."), @ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the property is set to Defragment. " - + "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " - + "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged " - + "FlowFile"), + + "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + + "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged " + + "FlowFile."), @ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the property is set to TAR. The value of this " - + "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should " - + "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used")}) + + "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should " + + "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used") }) @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename " - + "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching " - + "system time. Then a filename extension may be applied:" - + "if Merge Format is TAR, then the filename will be appended with .tar, " - + "if Merge Format is ZIP, then the filename will be appended with .zip, " - + "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"), + + "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching " + + "system time. Then a filename extension may be applied:" + + "if Merge Format is TAR, then the filename will be appended with .tar, " + + "if Merge Format is ZIP, then the filename will be appended with .zip, " + + "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"), @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " - + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output")}) + + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") }) @SeeAlso(SegmentContent.class) public class MergeContent extends BinFiles { @@ -131,7 +134,9 @@ public class MergeContent extends BinFiles { "Defragment", "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " + "have the attributes , , and or alternatively (for backward compatibility " - + "purposes) , , and "); + + "purposes) , , and . All FlowFiles with the same value for \"fragment.identifier\" " + + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles " + + "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute."); public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue( "Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"); @@ -307,7 +312,7 @@ public class MergeContent extends BinFiles { @Override protected Collection additionalCustomValidation(ValidationContext context) { - Collection results = new ArrayList<>(); + final Collection results = new ArrayList<>(); final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue(); if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) { @@ -353,7 +358,7 @@ public class MergeContent extends BinFiles { @Override protected String getGroupId(final ProcessContext context, final FlowFile flowFile) { final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue(); - String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName); + String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName); // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { @@ -442,7 +447,7 @@ public class MergeContent extends BinFiles { bundle = session.putAllAttributes(bundle, bundleAttributes); - final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles"; + final String inputDescription = binCopy.size() < 10 ? binCopy.toString() : binCopy.size() + " FlowFiles"; getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle}); session.transfer(bundle, REL_MERGED); @@ -640,18 +645,18 @@ public class MergeContent extends BinFiles { } if (".".equals(path.getName(0).toString())) { - path = (path.getNameCount() == 1) ? null : path.subpath(1, path.getNameCount()); + path = path.getNameCount() == 1 ? null : path.subpath(1, path.getNameCount()); } - return (path == null) ? "" : path.toString() + "/"; + return path == null ? "" : path.toString() + "/"; } private String createFilename(final List wrappers) { if (wrappers.size() == 1) { return wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key()); } else { - FlowFile ff = wrappers.get(0).getFlowFile(); - String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME); + final FlowFile ff = wrappers.get(0).getFlowFile(); + final String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME); if (origFilename != null) { return origFilename; } else {