From 3cf65261aa141ef116b762d821cc5dbe26bb2bdd Mon Sep 17 00:00:00 2001 From: bbende Date: Tue, 19 May 2015 21:11:50 -0400 Subject: [PATCH] NIFI-446 Adding a Delimiter Strategy to MergeContent --- .../processors/standard/MergeContent.java | 92 ++++++++++++++++-- .../processors/standard/TestMergeContent.java | 97 +++++++++++++++++++ 2 files changed, 179 insertions(+), 10 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index b11dee3090..2883a758e2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -23,6 +23,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -49,6 +50,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -130,6 +133,11 @@ public class MergeContent extends BinFiles { + "have the attributes , , and or alternatively (for backward compatibility " + "purposes) , , and "); + public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue( + "Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"); + public static final AllowableValue DELIMITER_STRATEGY_TEXT = new AllowableValue( + "Text", "Text", "The values of Header, Footer, and Demarcator will be specified as property values"); + public static final String MERGE_FORMAT_TAR_VALUE = "TAR"; public static final String MERGE_FORMAT_ZIP_VALUE = "ZIP"; public static final String MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3"; @@ -210,26 +218,40 @@ public class MergeContent extends BinFiles { .defaultValue(null) .build(); + public static final PropertyDescriptor DELIMITER_STRATEGY = new PropertyDescriptor.Builder() + .required(true) + .name("Delimiter Strategy") + .description("Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if " + + "the values of the properties should be used as the content.") + .allowableValues(DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT) + .defaultValue(DELIMITER_STRATEGY_FILENAME.getValue()) + .build(); public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder() .name("Header File") + .displayName("Header") .description("Filename specifying the header to use. If not specified, no header is supplied. This property is valid only when using the " + "binary-concatenation merge strategy; otherwise, it is ignored.") .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor FOOTER = new PropertyDescriptor.Builder() .name("Footer File") + .displayName("Footer") .description("Filename specifying the footer to use. If not specified, no footer is supplied. This property is valid only when using the " + "binary-concatenation merge strategy; otherwise, it is ignored.") .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder() .name("Demarcator File") + .displayName("Demarcator") .description("Filename specifying the demarcator to use. If not specified, no demarcator is supplied. This property is valid only when " + "using the binary-concatenation merge strategy; otherwise, it is ignored.") .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder() .name("Compression Level") @@ -274,6 +296,7 @@ public class MergeContent extends BinFiles { descriptors.add(MAX_SIZE); descriptors.add(MAX_BIN_AGE); descriptors.add(MAX_BIN_COUNT); + descriptors.add(DELIMITER_STRATEGY); descriptors.add(HEADER); descriptors.add(FOOTER); descriptors.add(DEMARCATOR); @@ -282,6 +305,30 @@ public class MergeContent extends BinFiles { return descriptors; } + @Override + protected Collection additionalCustomValidation(ValidationContext context) { + Collection results = new ArrayList<>(); + + final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue(); + if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) { + final String headerValue = context.getProperty(HEADER).getValue(); + if (headerValue != null) { + results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(HEADER.getName(), headerValue, context)); + } + + final String footerValue = context.getProperty(FOOTER).getValue(); + if (footerValue != null) { + results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(FOOTER.getName(), footerValue, context)); + } + + final String demarcatorValue = context.getProperty(DEMARCATOR).getValue(); + if (demarcatorValue != null) { + results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(DEMARCATOR.getName(), demarcatorValue, context)); + } + } + return results; + } + private byte[] readContent(final String filename) throws IOException { return Files.readAllBytes(Paths.get(filename)); } @@ -479,7 +526,7 @@ public class MergeContent extends BinFiles { bundle = session.write(bundle, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { - final byte[] header = getDescriptorFileContent(context, wrappers, HEADER); + final byte[] header = getDelimiterContent(context, wrappers, HEADER); if (header != null) { out.write(header); } @@ -496,7 +543,7 @@ public class MergeContent extends BinFiles { }); if (itr.hasNext()) { - final byte[] demarcator = getDescriptorFileContent(context, wrappers, DEMARCATOR); + final byte[] demarcator = getDelimiterContent(context, wrappers, DEMARCATOR); if (demarcator != null) { out.write(demarcator); } @@ -513,7 +560,7 @@ public class MergeContent extends BinFiles { } } - final byte[] footer = getDescriptorFileContent(context, wrappers, FOOTER); + final byte[] footer = getDelimiterContent(context, wrappers, FOOTER); if (footer != null) { out.write(footer); } @@ -529,12 +576,22 @@ public class MergeContent extends BinFiles { return bundle; } - private byte[] getDescriptorFileContent(final ProcessContext context, final List wrappers, final PropertyDescriptor descriptor) + private byte[] getDelimiterContent(final ProcessContext context, final List wrappers, final PropertyDescriptor descriptor) + throws IOException { + final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue(); + if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) { + return getDelimiterFileContent(context, wrappers, descriptor); + } else { + return getDelimiterTextContent(context, wrappers, descriptor); + } + } + + private byte[] getDelimiterFileContent(final ProcessContext context, final List wrappers, final PropertyDescriptor descriptor) throws IOException { byte[] property = null; - final String descriptorFile = context.getProperty(descriptor).getValue(); - if (descriptorFile != null && wrappers != null && wrappers.size() > 0) { - final String content = new String(readContent(descriptorFile)); + final String descriptorValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue(); + if (descriptorValue != null && wrappers != null && wrappers.size() > 0) { + final String content = new String(readContent(descriptorValue)); final FlowFileSessionWrapper wrapper = wrappers.get(0); if (wrapper != null && content != null) { final FlowFile flowFile = wrapper.getFlowFile(); @@ -547,6 +604,21 @@ public class MergeContent extends BinFiles { return property; } + private byte[] getDelimiterTextContent(final ProcessContext context, final List wrappers, final PropertyDescriptor descriptor) + throws IOException { + byte[] property = null; + if (wrappers != null && wrappers.size() > 0) { + final FlowFileSessionWrapper wrapper = wrappers.get(0); + if (wrapper != null) { + final FlowFile flowFile = wrapper.getFlowFile(); + if (flowFile != null) { + property = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue().getBytes(); + } + } + } + return property; + } + @Override public String getMergedContentType() { return mimeType; diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index a657453c54..d2952d2d68 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -33,9 +34,12 @@ import java.util.zip.ZipInputStream; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -69,6 +73,99 @@ public class TestMergeContent { bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); } + @Test + public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT); + runner.setProperty(MergeContent.HEADER, "@"); + runner.setProperty(MergeContent.DEMARCATOR, "#"); + runner.setProperty(MergeContent.FOOTER, "$"); + + createFlowFiles(runner); + runner.run(); + + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertContentEquals("@Hello#, #World!$".getBytes("UTF-8")); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + } + + @Test + public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME); + runner.setProperty(MergeContent.HEADER, getClass().getResource("/TestMergeContent/head").getPath()); + runner.setProperty(MergeContent.DEMARCATOR, getClass().getResource("/TestMergeContent/demarcate").getPath()); + runner.setProperty(MergeContent.FOOTER, getClass().getResource("/TestMergeContent/foot").getPath()); + + createFlowFiles(runner); + runner.run(); + + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertContentEquals("(|)Hello***, ***World!___".getBytes("UTF-8")); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + } + + @Test + public void testTextDelimitersValidation() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT); + runner.setProperty(MergeContent.HEADER, ""); + runner.setProperty(MergeContent.DEMARCATOR, ""); + runner.setProperty(MergeContent.FOOTER, ""); + + Collection results = new HashSet<>(); + ProcessContext context = runner.getProcessContext(); + if (context instanceof MockProcessContext) { + MockProcessContext mockContext = (MockProcessContext)context; + results = mockContext.validate(); + } + + Assert.assertEquals(3, results.size()); + for (ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains("cannot be empty")); + } + } + + @Test + public void testFileDelimitersValidation() throws IOException, InterruptedException { + final String doesNotExistFile = "src/test/resources/TestMergeContent/does_not_exist"; + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME); + runner.setProperty(MergeContent.HEADER, doesNotExistFile); + runner.setProperty(MergeContent.DEMARCATOR, doesNotExistFile); + runner.setProperty(MergeContent.FOOTER, doesNotExistFile); + + Collection results = new HashSet<>(); + ProcessContext context = runner.getProcessContext(); + if (context instanceof MockProcessContext) { + MockProcessContext mockContext = (MockProcessContext)context; + results = mockContext.validate(); + } + + Assert.assertEquals(3, results.size()); + for (ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains("is invalid because File " + doesNotExistFile + " does not exist")); + } + } + @Test public void testMimeTypeIsOctetStreamIfConflictingWithBinaryConcat() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent());