Merge branch 'NIFI-378' into develop

This commit is contained in:
Mark Payne 2015-06-23 11:17:30 -04:00
commit e767f5ce02
1 changed files with 31 additions and 26 deletions

View File

@ -82,31 +82,34 @@ import org.apache.nifi.util.ObjectHolder;
+ "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
@ReadsAttributes({
@ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "All FlowFiles with the same value for this attribute will be bundled together"),
@ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer "
+ "between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"),
+ "All FlowFiles with the same value for this attribute will be bundled together."),
@ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "This attribute indicates the order in which the fragments should be assembled. This "
+ "attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all "
+ "FlowFiles that have the same value for the \"fragment.identifier\" attribute) integer "
+ "between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "
+ "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the behavior of this Processor is undefined."),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+ "in the given bundle"),
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+ "in the given bundle."),
@ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged "
+ "FlowFile"),
+ "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged "
+ "FlowFile."),
@ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the <Merge Format> property is set to TAR. The value of this "
+ "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should "
+ "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used")})
+ "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should "
+ "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used") })
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename "
+ "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching "
+ "system time. Then a filename extension may be applied:"
+ "if Merge Format is TAR, then the filename will be appended with .tar, "
+ "if Merge Format is ZIP, then the filename will be appended with .zip, "
+ "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"),
+ "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching "
+ "system time. Then a filename extension may be applied:"
+ "if Merge Format is TAR, then the filename will be appended with .tar, "
+ "if Merge Format is ZIP, then the filename will be appended with .zip, "
+ "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"),
@WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output")})
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
@SeeAlso(SegmentContent.class)
public class MergeContent extends BinFiles {
@ -131,7 +134,9 @@ public class MergeContent extends BinFiles {
"Defragment",
"Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+ "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility "
+ "purposes) <segment.identifier>, <segment.count>, and <segment.index>");
+ "purposes) <segment.identifier>, <segment.count>, and <segment.index>. All FlowFiles with the same value for \"fragment.identifier\" "
+ "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles "
+ "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute.");
public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue(
"Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file");
@ -307,7 +312,7 @@ public class MergeContent extends BinFiles {
@Override
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
Collection<ValidationResult> results = new ArrayList<>();
final Collection<ValidationResult> results = new ArrayList<>();
final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue();
if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) {
@ -353,7 +358,7 @@ public class MergeContent extends BinFiles {
@Override
protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
@ -442,7 +447,7 @@ public class MergeContent extends BinFiles {
bundle = session.putAllAttributes(bundle, bundleAttributes);
final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
final String inputDescription = binCopy.size() < 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
session.transfer(bundle, REL_MERGED);
@ -640,18 +645,18 @@ public class MergeContent extends BinFiles {
}
if (".".equals(path.getName(0).toString())) {
path = (path.getNameCount() == 1) ? null : path.subpath(1, path.getNameCount());
path = path.getNameCount() == 1 ? null : path.subpath(1, path.getNameCount());
}
return (path == null) ? "" : path.toString() + "/";
return path == null ? "" : path.toString() + "/";
}
private String createFilename(final List<FlowFileSessionWrapper> wrappers) {
if (wrappers.size() == 1) {
return wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key());
} else {
FlowFile ff = wrappers.get(0).getFlowFile();
String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME);
final FlowFile ff = wrappers.get(0).getFlowFile();
final String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME);
if (origFilename != null) {
return origFilename;
} else {