NIFI-4658 set Maximum Number of Entries to required and allow FlowFiles having fragment.count greater than Max Entries property

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #2559
This commit is contained in:
Mark Bean 2018-03-16 19:43:29 -04:00 committed by Mike Moser
parent b7e1f48133
commit c59b6fdf66
4 changed files with 43 additions and 9 deletions

View File

@ -48,10 +48,10 @@ public class Bin {
private volatile int maximumEntries = Integer.MAX_VALUE;
private final String fileCountAttribute;
final List<FlowFile> binContents = new ArrayList<>();
private final List<FlowFile> binContents = new ArrayList<>();
private final Set<String> binIndexSet = new HashSet<>();
long size;
int successiveFailedOfferings = 0;
private long size;
private int successiveFailedOfferings = 0;
/**
* Constructs a new bin
@ -141,11 +141,11 @@ public class Bin {
if (fileCountAttribute != null) {
final String countValue = flowFile.getAttribute(fileCountAttribute);
final Integer count = toInteger(countValue);
if (count != null) {
int currentMaxEntries = this.maximumEntries;
this.maximumEntries = Math.min(count, currentMaxEntries);
this.minimumEntries = currentMaxEntries;
if (count == null) {
return false;
}
this.maximumEntries = count;
this.minimumEntries = count;
final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
if (index == null || index.isEmpty() || !binIndexSet.add(index)) {

View File

@ -72,9 +72,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
.build();
public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Number of Entries")
.description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
.description("The maximum number of files to include in a bundle")
.defaultValue("1000")
.required(false)
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();

View File

@ -76,6 +76,10 @@ public class BinManager {
this.fileCountAttribute.set(fileCountAttribute);
}
public String getFileCountAttribute() {
return fileCountAttribute.get();
}
public void setMinimumEntries(final int minimumEntries) {
this.minEntries.set(minimumEntries);
}

View File

@ -87,6 +87,9 @@ public class TestMergeContent {
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assertEquals(1024 * 6, bundle.getSize());
// Queue should not be empty because the first FlowFile will be transferred back to the input queue
// when we run out @OnStopped logic, since it won't be transferred to any bin.
runner.assertQueueNotEmpty();
@ -886,6 +889,33 @@ public class TestMergeContent {
runner.assertTransferCount(MergeContent.REL_MERGED, 0);
}
@Test
public void testDefragmentWithTooManyFragements() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
final Map<String, String> attributes = new HashMap<>();
attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
runner.run();
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
}
@Test
public void testDefragmentWithTooFewFragments() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());