NIFI-63: If MergeContent is set to defragment and we don't have enough fragments, consider this an error and route all FlowFiles for that bin to failure.

This commit is contained in:
Mark Payne 2014-12-30 11:02:52 -05:00
parent 7589ad356e
commit 3997f9b2b0
2 changed files with 75 additions and 4 deletions

View File

@ -320,6 +320,8 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
if (!isScheduled()) {
return;
}
@ -402,6 +404,8 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
final ProcessorLog logger = getLogger();
final ProcessSession session = sessionFactory.createSession();
final Set<Bin> committedBins = new HashSet<>();
for (final Bin unmodifiableBin : bins) {
final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(unmodifiableBin.getContents());
@ -410,6 +414,12 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
if (error != null) {
final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
logger.error(error + "; routing {} to failure", new Object[]{binDescription});
for ( final FlowFileSessionWrapper wrapper : binCopy ) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
committedBins.add(unmodifiableBin);
}
continue;
}
@ -453,6 +463,11 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
// across multiple sessions, we cannot guarantee atomicity across the sessions
session.commit();
for (final Bin unmodifiableBin : bins) {
// If this bin's session has been committed, move on.
if ( committedBins.contains(unmodifiableBin) ) {
continue;
}
for (final FlowFileSessionWrapper wrapper : unmodifiableBin.getContents()) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
wrapper.getSession().commit();
@ -516,6 +531,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
// If we are defragmenting, all fragments must have the appropriate attributes.
String decidedFragmentCount = null;
String fragmentIdentifier = null;
for (final FlowFileSessionWrapper flowFileWrapper : bin) {
final FlowFile flowFile = flowFileWrapper.getFlowFile();
@ -523,6 +539,8 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
if (!isNumber(fragmentIndex)) {
return "Cannot Defragment " + flowFile + " because it does not have an integer value for the " + FRAGMENT_INDEX_ATTRIBUTE + " attribute";
}
fragmentIdentifier = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
if (!isNumber(fragmentCount)) {
@ -533,6 +551,21 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount;
}
}
final int numericFragmentCount;
try {
numericFragmentCount = Integer.parseInt(decidedFragmentCount);
} catch (final NumberFormatException nfe) {
return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the " + FRAGMENT_COUNT_ATTRIBUTE + " has a non-integer value of " + decidedFragmentCount;
}
if ( bin.size() < numericFragmentCount ) {
return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found only " + bin.size() + " fragments";
}
if ( bin.size() > numericFragmentCount ) {
return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found " + bin.size() + " fragments for this identifier";
}
return null;
}

View File

@ -30,21 +30,26 @@ import java.util.Map;
import java.util.Set;
import java.util.zip.ZipInputStream;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
public class TestMergeContent {
/**
 * One-time class fixture: sets the system property
 * {@code org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard} to {@code DEBUG}
 * before any test in this class runs.
 * NOTE(review): presumably this is picked up by SLF4J's SimpleLogger to raise the log level
 * of the standard processors - confirm that SimpleLogger is the binding used by the tests.
 */
@BeforeClass
public static void setup() {
    final String loggerLevelProperty = "org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard";
    final String loggerLevel = "DEBUG";
    System.setProperty(loggerLevelProperty, loggerLevel);
}
@Test
public void testSimpleBinaryConcat() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
@ -263,6 +268,39 @@ public class TestMergeContent {
final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
}
/**
 * Enqueues four FlowFiles that share fragment identifier "1" and all declare a fragment
 * count of 5, runs the processor, waits three seconds (longer than the configured
 * Max Bin Age of "2 secs"), runs it again, and asserts that exactly four FlowFiles were
 * transferred to {@link MergeContent#REL_FAILURE}.
 *
 * @throws IOException declared for the content-encoding calls used to build the test data
 */
@Test
public void testDefragmentWithTooFewFragments() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "2 secs");

    // Every fragment claims to be one of five, but only indices 1 through 4 are enqueued.
    final Map<String, String> attributes = new HashMap<>();
    attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
    attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "5");
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
    runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
    runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
    runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
    attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
    runner.enqueue("Panama".getBytes("UTF-8"), attributes);

    // NOTE(review): the 'false' argument presumably keeps the processor from being stopped
    // after this first run so the bin survives until the second run - confirm against TestRunner.
    runner.run(1, false);

    // Wait out the 3-second window against a fixed deadline. An interrupt must neither
    // restart the whole wait nor be silently discarded, so it is remembered here and the
    // thread's interrupt status is restored once the test has finished.
    boolean interrupted = false;
    try {
        final long deadlineNanos = System.nanoTime() + 3000L * 1000000L;
        long remainingNanos = deadlineNanos - System.nanoTime();
        while (remainingNanos > 0L) {
            try {
                // Round up so the loop never sleeps for 0 ms while time still remains.
                Thread.sleep(remainingNanos / 1000000L + 1L);
            } catch (final InterruptedException ie) {
                interrupted = true;
            }
            remainingNanos = deadlineNanos - System.nanoTime();
        }

        runner.run(1);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 4);
    } finally {
        // Restored only after the second run so a pending interrupt cannot disturb the runner.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
@Test
public void testDefragmentOutOfOrder() throws IOException {