mirror of https://github.com/apache/nifi.git
NIFI-4828: Fix MergeContent to process all ready bins
Before this fix, MergeContent only processed the first bin even if there were multiple bins. There were two unit tests marked with Ignore those had been failing because of this. This closes #2444. Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
d7da04209a
commit
e9af6c6ad8
|
@ -217,17 +217,11 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int processBins(final ProcessContext context) {
|
private int processBins(final ProcessContext context) {
|
||||||
final Bin bin = readyBins.poll();
|
|
||||||
if (bin == null) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
final List<Bin> bins = new ArrayList<>();
|
|
||||||
bins.add(bin);
|
|
||||||
|
|
||||||
final ComponentLog logger = getLogger();
|
final ComponentLog logger = getLogger();
|
||||||
|
int processedBins = 0;
|
||||||
boolean binAlreadyCommitted = false;
|
Bin bin;
|
||||||
|
while ((bin = readyBins.poll()) != null) {
|
||||||
|
boolean binAlreadyCommitted;
|
||||||
try {
|
try {
|
||||||
binAlreadyCommitted = this.processBin(bin, context);
|
binAlreadyCommitted = this.processBin(bin, context);
|
||||||
} catch (final ProcessException e) {
|
} catch (final ProcessException e) {
|
||||||
|
@ -238,12 +232,12 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||||
binSession.transfer(flowFile, REL_FAILURE);
|
binSession.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
binSession.commit();
|
binSession.commit();
|
||||||
return 1;
|
continue;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e});
|
logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e});
|
||||||
|
|
||||||
bin.getSession().rollback();
|
bin.getSession().rollback();
|
||||||
return 1;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this bin's session has been committed, move on.
|
// If this bin's session has been committed, move on.
|
||||||
|
@ -253,12 +247,15 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||||
binSession.commit();
|
binSession.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
processedBins++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return processedBins;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
|
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
|
||||||
int flowFilesBinned = 0;
|
int flowFilesBinned = 0;
|
||||||
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
|
while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger()) {
|
||||||
if (!isScheduled()) {
|
if (!isScheduled()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -290,6 +287,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
|
||||||
bin.offer(flowFile, session);
|
bin.offer(flowFile, session);
|
||||||
this.readyBins.add(bin);
|
this.readyBins.add(bin);
|
||||||
}
|
}
|
||||||
|
flowFilesBinned += entry.getValue().size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -175,21 +175,7 @@ public class BinManager {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
|
final List<Bin> currentBins = groupBinMap.computeIfAbsent(groupIdentifier, k -> new ArrayList<>());
|
||||||
if (currentBins == null) { // this is a new group we need to register
|
|
||||||
final List<Bin> bins = new ArrayList<>();
|
|
||||||
final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
|
|
||||||
maxEntries.get(), fileCountAttribute.get());
|
|
||||||
bins.add(bin);
|
|
||||||
groupBinMap.put(groupIdentifier, bins);
|
|
||||||
binCount++;
|
|
||||||
|
|
||||||
final boolean added = bin.offer(flowFile, session);
|
|
||||||
if (!added) {
|
|
||||||
unbinned.add(flowFile);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
for (final Bin bin : currentBins) {
|
for (final Bin bin : currentBins) {
|
||||||
final boolean accepted = bin.offer(flowFile, session);
|
final boolean accepted = bin.offer(flowFile, session);
|
||||||
if (accepted) {
|
if (accepted) {
|
||||||
|
@ -197,7 +183,8 @@ public class BinManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
|
// if we've reached this point then the groupIdentifier was a brand new one,
|
||||||
|
// or we couldn't fit it into any existing bins - gotta make a new one
|
||||||
final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
|
final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
|
||||||
maxEntries.get(), fileCountAttribute.get());
|
maxEntries.get(), fileCountAttribute.get());
|
||||||
currentBins.add(bin);
|
currentBins.add(bin);
|
||||||
|
@ -207,8 +194,6 @@ public class BinManager {
|
||||||
unbinned.add(flowFile);
|
unbinned.add(flowFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
wLock.unlock();
|
wLock.unlock();
|
||||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestMergeContent {
|
public class TestMergeContent {
|
||||||
|
@ -911,7 +910,6 @@ public class TestMergeContent {
|
||||||
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
|
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("this test appears to be faulty")
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefragmentMultipleMingledSegments() throws IOException {
|
public void testDefragmentMultipleMingledSegments() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
|
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
|
||||||
|
@ -941,7 +939,7 @@ public class TestMergeContent {
|
||||||
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
|
attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
|
||||||
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
|
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
|
||||||
|
|
||||||
runner.run(2);
|
runner.run(1);
|
||||||
|
|
||||||
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
|
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
|
||||||
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
|
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
|
||||||
|
@ -1007,7 +1005,6 @@ public class TestMergeContent {
|
||||||
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
|
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("This test appears to be a fail...is retuning 1 instead of 2...needs work")
|
|
||||||
@Test
|
@Test
|
||||||
public void testMergeBasedOnCorrelation() throws IOException, InterruptedException {
|
public void testMergeBasedOnCorrelation() throws IOException, InterruptedException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
|
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
|
||||||
|
@ -1028,7 +1025,7 @@ public class TestMergeContent {
|
||||||
attributes.put("attr", "b");
|
attributes.put("attr", "b");
|
||||||
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
|
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
|
||||||
|
|
||||||
runner.run(2);
|
runner.run(1);
|
||||||
|
|
||||||
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
|
runner.assertTransferCount(MergeContent.REL_MERGED, 2);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue