Merge branch 'NIFI-446' of https://github.com/bbende/incubator-nifi into NIFI-446-merge-content-delimiter-strategy

This commit is contained in:
Mark Payne 2015-05-20 14:47:53 -04:00
commit f247e515dc
2 changed files with 179 additions and 10 deletions

View File

@ -23,6 +23,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -49,6 +50,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
@ -130,6 +133,11 @@ public class MergeContent extends BinFiles {
+ "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility "
+ "purposes) <segment.identifier>, <segment.count>, and <segment.index>");
public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue(
"Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file");
public static final AllowableValue DELIMITER_STRATEGY_TEXT = new AllowableValue(
"Text", "Text", "The values of Header, Footer, and Demarcator will be specified as property values");
public static final String MERGE_FORMAT_TAR_VALUE = "TAR";
public static final String MERGE_FORMAT_ZIP_VALUE = "ZIP";
public static final String MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3";
@ -210,26 +218,40 @@ public class MergeContent extends BinFiles {
.defaultValue(null)
.build();
public static final PropertyDescriptor DELIMITER_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
.name("Delimiter Strategy")
.description("Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if "
+ "the values of the properties should be used as the content.")
.allowableValues(DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.defaultValue(DELIMITER_STRATEGY_FILENAME.getValue())
.build();
public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
.name("Header File")
.displayName("Header")
.description("Filename specifying the header to use. If not specified, no header is supplied. This property is valid only when using the "
+ "binary-concatenation merge strategy; otherwise, it is ignored.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor FOOTER = new PropertyDescriptor.Builder()
.name("Footer File")
.displayName("Footer")
.description("Filename specifying the footer to use. If not specified, no footer is supplied. This property is valid only when using the "
+ "binary-concatenation merge strategy; otherwise, it is ignored.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder()
.name("Demarcator File")
.displayName("Demarcator")
.description("Filename specifying the demarcator to use. If not specified, no demarcator is supplied. This property is valid only when "
+ "using the binary-concatenation merge strategy; otherwise, it is ignored.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("Compression Level")
@ -274,6 +296,7 @@ public class MergeContent extends BinFiles {
descriptors.add(MAX_SIZE);
descriptors.add(MAX_BIN_AGE);
descriptors.add(MAX_BIN_COUNT);
descriptors.add(DELIMITER_STRATEGY);
descriptors.add(HEADER);
descriptors.add(FOOTER);
descriptors.add(DEMARCATOR);
@ -282,6 +305,30 @@ public class MergeContent extends BinFiles {
return descriptors;
}
@Override
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
Collection<ValidationResult> results = new ArrayList<>();
final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue();
if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) {
final String headerValue = context.getProperty(HEADER).getValue();
if (headerValue != null) {
results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(HEADER.getName(), headerValue, context));
}
final String footerValue = context.getProperty(FOOTER).getValue();
if (footerValue != null) {
results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(FOOTER.getName(), footerValue, context));
}
final String demarcatorValue = context.getProperty(DEMARCATOR).getValue();
if (demarcatorValue != null) {
results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(DEMARCATOR.getName(), demarcatorValue, context));
}
}
return results;
}
private byte[] readContent(final String filename) throws IOException {
return Files.readAllBytes(Paths.get(filename));
}
@ -479,7 +526,7 @@ public class MergeContent extends BinFiles {
bundle = session.write(bundle, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
final byte[] header = getDescriptorFileContent(context, wrappers, HEADER);
final byte[] header = getDelimiterContent(context, wrappers, HEADER);
if (header != null) {
out.write(header);
}
@ -496,7 +543,7 @@ public class MergeContent extends BinFiles {
});
if (itr.hasNext()) {
final byte[] demarcator = getDescriptorFileContent(context, wrappers, DEMARCATOR);
final byte[] demarcator = getDelimiterContent(context, wrappers, DEMARCATOR);
if (demarcator != null) {
out.write(demarcator);
}
@ -513,7 +560,7 @@ public class MergeContent extends BinFiles {
}
}
final byte[] footer = getDescriptorFileContent(context, wrappers, FOOTER);
final byte[] footer = getDelimiterContent(context, wrappers, FOOTER);
if (footer != null) {
out.write(footer);
}
@ -529,12 +576,22 @@ public class MergeContent extends BinFiles {
return bundle;
}
private byte[] getDescriptorFileContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
private byte[] getDelimiterContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
throws IOException {
final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue();
if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) {
return getDelimiterFileContent(context, wrappers, descriptor);
} else {
return getDelimiterTextContent(context, wrappers, descriptor);
}
}
private byte[] getDelimiterFileContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
throws IOException {
byte[] property = null;
final String descriptorFile = context.getProperty(descriptor).getValue();
if (descriptorFile != null && wrappers != null && wrappers.size() > 0) {
final String content = new String(readContent(descriptorFile));
final String descriptorValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
if (descriptorValue != null && wrappers != null && wrappers.size() > 0) {
final String content = new String(readContent(descriptorValue));
final FlowFileSessionWrapper wrapper = wrappers.get(0);
if (wrapper != null && content != null) {
final FlowFile flowFile = wrapper.getFlowFile();
@ -547,6 +604,21 @@ public class MergeContent extends BinFiles {
return property;
}
private byte[] getDelimiterTextContent(final ProcessContext context, final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
throws IOException {
byte[] property = null;
if (wrappers != null && wrappers.size() > 0) {
final FlowFileSessionWrapper wrapper = wrappers.get(0);
if (wrapper != null) {
final FlowFile flowFile = wrapper.getFlowFile();
if (flowFile != null) {
property = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue().getBytes();
}
}
}
return property;
}
@Override
public String getMergedContentType() {
return mimeType;

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -33,9 +34,12 @@ import java.util.zip.ZipInputStream;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -69,6 +73,99 @@ public class TestMergeContent {
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
runner.setProperty(MergeContent.HEADER, "@");
runner.setProperty(MergeContent.DEMARCATOR, "#");
runner.setProperty(MergeContent.FOOTER, "$");
createFlowFiles(runner);
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertContentEquals("@Hello#, #World!$".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME);
runner.setProperty(MergeContent.HEADER, getClass().getResource("/TestMergeContent/head").getPath());
runner.setProperty(MergeContent.DEMARCATOR, getClass().getResource("/TestMergeContent/demarcate").getPath());
runner.setProperty(MergeContent.FOOTER, getClass().getResource("/TestMergeContent/foot").getPath());
createFlowFiles(runner);
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertContentEquals("(|)Hello***, ***World!___".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test
public void testTextDelimitersValidation() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
runner.setProperty(MergeContent.HEADER, "");
runner.setProperty(MergeContent.DEMARCATOR, "");
runner.setProperty(MergeContent.FOOTER, "");
Collection<ValidationResult> results = new HashSet<>();
ProcessContext context = runner.getProcessContext();
if (context instanceof MockProcessContext) {
MockProcessContext mockContext = (MockProcessContext)context;
results = mockContext.validate();
}
Assert.assertEquals(3, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("cannot be empty"));
}
}
@Test
public void testFileDelimitersValidation() throws IOException, InterruptedException {
final String doesNotExistFile = "src/test/resources/TestMergeContent/does_not_exist";
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME);
runner.setProperty(MergeContent.HEADER, doesNotExistFile);
runner.setProperty(MergeContent.DEMARCATOR, doesNotExistFile);
runner.setProperty(MergeContent.FOOTER, doesNotExistFile);
Collection<ValidationResult> results = new HashSet<>();
ProcessContext context = runner.getProcessContext();
if (context instanceof MockProcessContext) {
MockProcessContext mockContext = (MockProcessContext)context;
results = mockContext.validate();
}
Assert.assertEquals(3, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because File " + doesNotExistFile + " does not exist"));
}
}
@Test
public void testMimeTypeIsOctetStreamIfConflictingWithBinaryConcat() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());