NIFI-11369: Fixed Defragment strategy with optional fragment.count attribute in MergeContent

This commit is contained in:
Peter Turcsanyi 2023-05-16 22:11:39 +02:00 committed by Tamas Palfy
parent f6b0bd30df
commit 3bd4b49abe
4 changed files with 140 additions and 30 deletions

View File

@ -44,8 +44,8 @@ public class Bin {
private final long minimumSizeBytes;
private final long maximumSizeBytes;
private volatile int minimumEntries = 0;
private volatile int maximumEntries = Integer.MAX_VALUE;
private volatile int minimumEntries;
private volatile int maximumEntries;
private final String fileCountAttribute;
private volatile EvictionReason evictionReason = EvictionReason.UNSET;
@ -67,12 +67,22 @@ public class Bin {
*/
public Bin(final ProcessSession session, final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
this.session = session;
this.minimumSizeBytes = minSizeBytes;
this.maximumSizeBytes = maxSizeBytes;
this.minimumEntries = minEntries;
this.maximumEntries = maxEntries;
this.fileCountAttribute = fileCountAttribute;
if (this.fileCountAttribute != null ) {
// Merge Strategy = Defragment
// FlowFiles will be merged based on fragment.* attributes
this.minimumSizeBytes = 0;
this.maximumSizeBytes = Long.MAX_VALUE;
this.minimumEntries = Integer.MAX_VALUE;
this.maximumEntries = Integer.MAX_VALUE;
} else {
this.minimumSizeBytes = minSizeBytes;
this.maximumSizeBytes = maxSizeBytes;
this.minimumEntries = minEntries;
this.maximumEntries = maxEntries;
}
this.creationMomentEpochNs = System.nanoTime();
if (minSizeBytes > maxSizeBytes) {
throw new IllegalArgumentException();
@ -164,16 +174,16 @@ public class Bin {
if (fileCountAttribute != null) {
final String countValue = flowFile.getAttribute(fileCountAttribute);
final Integer count = toInteger(countValue);
if (count == null) {
return false;
if (count != null) {
// set the limits for the bin as an exact count when the count attribute arrives
this.maximumEntries = count;
this.minimumEntries = count;
}
this.maximumEntries = count;
this.minimumEntries = count;
final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
// Do not accept flowfile with duplicate fragment index value
logger.warn("Duplicate or missing value for '" + FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in Bin", new Object[] { flowFile });
logger.warn("Duplicate or missing value for '" + FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in Bin", flowFile);
successiveFailedOfferings++;
return false;
}

View File

@ -120,9 +120,8 @@ import java.util.zip.ZipOutputStream;
+ "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the first FlowFile processed will be "
+ "accepted and subsequent FlowFiles will not be accepted into the Bin."),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+ "in the given bundle."),
+ "attribute indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile must have this attribute in "
+ "the bundle. If multiple FlowFiles contain the \"fragment.count\" attribute in a given bundle, all must have the same value."),
@ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged "
@ -572,16 +571,23 @@ public class MergeContent extends BinFiles {
fragmentIdentifier = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
if (!isNumber(fragmentCount)) {
return "Cannot Defragment " + flowFile + " because it does not have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
} else if (decidedFragmentCount == null) {
decidedFragmentCount = fragmentCount;
} else if (!decidedFragmentCount.equals(fragmentCount)) {
return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the "
+ FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount;
if (fragmentCount != null) {
if (!isNumber(fragmentCount)) {
return "Cannot Defragment " + flowFile + " because it does not have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
} else if (decidedFragmentCount == null) {
decidedFragmentCount = fragmentCount;
} else if (!decidedFragmentCount.equals(fragmentCount)) {
return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the "
+ FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount;
}
}
}
if (decidedFragmentCount == null) {
return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because no FlowFile arrived with the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute "
+ "and the expected number of fragments is unknown";
}
final int numericFragmentCount;
try {
numericFragmentCount = Integer.parseInt(decidedFragmentCount);

View File

@ -58,9 +58,11 @@
The "Defragment" Merge Strategy can be used when FlowFiles need to be explicitly assigned to the same bin. For example, if data is split apart using
the UnpackContent Processor, each unpacked FlowFile can be processed independently and later merged back together using this Processor with the
Merge Strategy set to Defragment. In order for FlowFiles to be added to the same bin when using this configuration, the FlowFiles must have the same
value for the "fragment.identifier" attribute. Each FlowFile with the same identifier must also have the same value for the "fragment.count" attribute
(which indicates how many FlowFiles belong in the bin) and a unique value for the "fragment.index" attribute so that the FlowFiles can be ordered
correctly. <b>NOTE:</b> while there are valid use cases for breaking apart FlowFiles and later re-merging them, it is an anti-pattern to take a larger FlowFile,
value for the "fragment.identifier" attribute. Each FlowFile with the same identifier must also have a unique value for the "fragment.index" attribute
so that the FlowFiles can be ordered correctly. For a given "fragment.identifier", at least one FlowFile must have the "fragment.count" attribute
(which indicates how many FlowFiles belong in the bin). Other FlowFiles with the same identifier must have the same value for the "fragment.count" attribute,
or they can omit this attribute.
<b>NOTE:</b> while there are valid use cases for breaking apart FlowFiles and later re-merging them, it is an anti-pattern to take a larger FlowFile,
break it into a million tiny FlowFiles, and then re-merge them. Doing so can result in using huge amounts of Java heap and can result in Out Of Memory Errors.
Additionally, it adds large amounts of load to the NiFi framework. This can result in increased CPU and disk utilization and often times can be an order of magnitude
lower throughput and an order of magnitude higher latency. As an alternative, whenever possible, dataflows should be built to make use of Record-oriented processors,
@ -70,7 +72,7 @@
<p>
In order to be added to the same bin, two FlowFiles must be 'like FlowFiles.' In order for two FlowFiles to be like FlowFiles, they must have the same
schema, and if the &lt;Correlation Attribute Name&gt; property is set, they must have the same value for the specified attribute. For example, if the
&lt;Correlation Attribute Name&gt; is set to "filename" then two FlowFiles must have the same value for the "filename" attribute in order to be binned
&lt;Correlation Attribute Name&gt; is set to "filename", then two FlowFiles must have the same value for the "filename" attribute in order to be binned
together. If more than one attribute is needed in order to correlate two FlowFiles, it is recommended to use an UpdateAttribute processor before the
MergeContent processor and combine the attributes. For example, if the goal is to bin together two FlowFiles only if they have the same value for the
"abc" attribute and the "xyz" attribute, then we could accomplish this by using UpdateAttribute and adding a property with name "correlation.attribute"
@ -87,7 +89,7 @@
</p>
<p>
If the &lt;Merge Strategy&gt; property is set to "Bin Packing Algorithm" then then the following rules will be evaluated.
If the &lt;Merge Strategy&gt; property is set to "Bin Packing Algorithm", then the following rules will be evaluated.
</p>
<p>
@ -107,7 +109,7 @@
</p>
<p>
If the &lt;Merge Strategy&gt; property is set to "Defragment" then a bin is full only when the number of FlowFiles in the bin is equal to the number specified
If the &lt;Merge Strategy&gt; property is set to "Defragment", then a bin is full only when the number of FlowFiles in the bin is equal to the number specified
by the "fragment.count" attribute of one of the FlowFiles in the bin. All FlowFiles that have this attribute must have the same value for this attribute,
or else they will be routed to the "failure" relationship. It is not necessary that all FlowFiles have this value, but at least one FlowFile in the bin must have
this value or the bin will never be complete. If all of the necessary FlowFiles are not binned together by the point at which the bin times amount
@ -152,7 +154,7 @@
</tr>
<tr>
<td>BIN_MANAGER_FULL</td>
<td>If an incoming FlowFile does not fit into any of the existing Bins (either due to the Maximum thresholds set, or due to the Correlation Attribute being used, etc.) then a new Bin
<td>If an incoming FlowFile does not fit into any of the existing Bins (either due to the Maximum thresholds set, or due to the Correlation Attribute being used, etc.), then a new Bin
must be created for the incoming FlowFiles. If the number of active Bins is already equal to the &lt;Maximum number of Bins&gt; property, the oldest Bin will be merged in order to
make room for the new Bin. In that case, the Bin Manager is said to be full, and this value will be used.</td>
</tr>

View File

@ -869,7 +869,99 @@ public class TestMergeContent {
}
@Test
public void testDefragmentDuplicateFragement() throws IOException, InterruptedException {
public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final Map<String, String> attributes = new HashMap<>();
attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
runner.run();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
}
@Test
public void testDefragmentWithFragmentCountOnMiddleFragment() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final String fragmentId = "Fragment Id";
runner.enqueue("Fragment 1 without count ".getBytes("UTF-8"), new HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
}});
runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
}});
runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
}});
runner.run();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("Fragment 1 without count Fragment 2 with count Fragment 3 without count".getBytes("UTF-8"));
}
@Test
public void testDefragmentWithDifferentFragmentCounts() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final String fragmentId = "Fragment Id";
runner.enqueue("Fragment 1 with count ".getBytes("UTF-8"), new HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2");
}});
runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
}});
runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
}});
runner.run();
runner.assertTransferCount(MergeContent.REL_MERGED, 0);
runner.assertTransferCount(MergeContent.REL_FAILURE, 3);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
}
@Test
public void testDefragmentDuplicateFragment() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
@ -904,7 +996,7 @@ public class TestMergeContent {
}
@Test
public void testDefragmentWithTooManyFragements() throws IOException {
public void testDefragmentWithTooManyFragments() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_ENTRIES, "3");