mirror of https://github.com/apache/nifi.git
* Add "Remove All Content" property, related functionality, and tests to ModifyBytes processor to allow deletion of all flow file content.
* Removed @Ignore annotation on class and unnecessary EOL translation of test data. Because ModifyBytes treat input as binary data, not text, line endings don't matter as long as they byte offsets are calculated correctly. * Replace validator with .allowableValues. This closes #890.
This commit is contained in:
parent
3378426f35
commit
02071103d0
|
@ -51,7 +51,7 @@ import org.apache.nifi.util.StopWatch;
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
@Tags({"binary", "discard", "keep"})
|
@Tags({"binary", "discard", "keep"})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@CapabilityDescription("Keep or discard bytes range from a binary file.")
|
@CapabilityDescription("Discard byte range at the start and end or all content of a binary file.")
|
||||||
public class ModifyBytes extends AbstractProcessor {
|
public class ModifyBytes extends AbstractProcessor {
|
||||||
|
|
||||||
// Relationships
|
// Relationships
|
||||||
|
@ -74,6 +74,13 @@ public class ModifyBytes extends AbstractProcessor {
|
||||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||||
.defaultValue("0 B")
|
.defaultValue("0 B")
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor REMOVE_ALL = new PropertyDescriptor.Builder()
|
||||||
|
.name("Remove All Content")
|
||||||
|
.description("Remove all content from the FlowFile superseding Start Offset and End Offset properties.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
private final List<PropertyDescriptor> propDescriptors;
|
private final List<PropertyDescriptor> propDescriptors;
|
||||||
|
|
||||||
public ModifyBytes() {
|
public ModifyBytes() {
|
||||||
|
@ -84,6 +91,7 @@ public class ModifyBytes extends AbstractProcessor {
|
||||||
ArrayList<PropertyDescriptor> pds = new ArrayList<>();
|
ArrayList<PropertyDescriptor> pds = new ArrayList<>();
|
||||||
pds.add(START_OFFSET);
|
pds.add(START_OFFSET);
|
||||||
pds.add(END_OFFSET);
|
pds.add(END_OFFSET);
|
||||||
|
pds.add(REMOVE_ALL);
|
||||||
propDescriptors = Collections.unmodifiableList(pds);
|
propDescriptors = Collections.unmodifiableList(pds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +116,8 @@ public class ModifyBytes extends AbstractProcessor {
|
||||||
|
|
||||||
final int startOffset = context.getProperty(START_OFFSET).asDataSize(DataUnit.B).intValue();
|
final int startOffset = context.getProperty(START_OFFSET).asDataSize(DataUnit.B).intValue();
|
||||||
final int endOffset = context.getProperty(END_OFFSET).asDataSize(DataUnit.B).intValue();
|
final int endOffset = context.getProperty(END_OFFSET).asDataSize(DataUnit.B).intValue();
|
||||||
final int newFileSize = (int) ff.getSize() - startOffset - endOffset;
|
final boolean removeAll = context.getProperty(REMOVE_ALL).asBoolean();
|
||||||
|
final int newFileSize = removeAll ? 0 : (int) ff.getSize() - startOffset - endOffset;
|
||||||
|
|
||||||
final StopWatch stopWatch = new StopWatch(true);
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
if (newFileSize <= 0) {
|
if (newFileSize <= 0) {
|
||||||
|
|
|
@ -19,28 +19,44 @@ package org.apache.nifi.processors.standard;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@Ignore
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class TestModifyBytes {
|
public class TestModifyBytes {
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ModifyBytes treats FlowFiles as binary content, not line oriented text, so the tests use byte offsets
|
||||||
|
* and are not line oriented. Any changes to the test data files needs to be considered based on the
|
||||||
|
* byte offset impacts of any end-of-line changing edits.
|
||||||
|
*
|
||||||
|
* The test data files are assumed to be in Unix end-of-line format (i.e. LF).
|
||||||
|
*/
|
||||||
|
|
||||||
|
private final Path testFilePath = Paths.get("src/test/resources/TestModifyBytes/testFile.txt");
|
||||||
|
private final Path noFooterPath = Paths.get("src/test/resources/TestModifyBytes/noFooter.txt");
|
||||||
|
private final Path noHeaderPath = Paths.get("src/test/resources/TestModifyBytes/noHeader.txt");
|
||||||
|
private final Path noFooterNoHeaderPath = Paths.get("src/test/resources/TestModifyBytes/noFooter_noHeader.txt");
|
||||||
|
|
||||||
|
private final File testFile = testFilePath.toFile();
|
||||||
|
private final File noFooterFile = noFooterPath.toFile();
|
||||||
|
private final File noHeaderFile = noHeaderPath.toFile();
|
||||||
|
private final File noFooterNoHeaderFile = noFooterNoHeaderPath.toFile();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReturnEmptyFile() throws IOException {
|
public void testReturnEmptyFile() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new ModifyBytes());
|
final TestRunner runner = TestRunners.newTestRunner(new ModifyBytes());
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "1 MB");
|
runner.setProperty(ModifyBytes.START_OFFSET, "1 MB");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "1 MB");
|
runner.setProperty(ModifyBytes.END_OFFSET, "1 MB");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
@ -54,12 +70,12 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "0 MB");
|
runner.setProperty(ModifyBytes.START_OFFSET, "0 MB");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "0 MB");
|
runner.setProperty(ModifyBytes.END_OFFSET, "0 MB");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
out.assertContentEquals(translateNewLines(new File("src/test/resources/TestModifyBytes/testFile.txt")));
|
out.assertContentEquals(testFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -68,14 +84,14 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "12 B"); //REMOVE - '<<<HEADER>>>'
|
runner.setProperty(ModifyBytes.START_OFFSET, "12 B"); //REMOVE - '<<<HEADER>>>'
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "0 MB");
|
runner.setProperty(ModifyBytes.END_OFFSET, "0 MB");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
final String outContent = new String(out.toByteArray(), StandardCharsets.UTF_8);
|
final String outContent = new String(out.toByteArray(), StandardCharsets.UTF_8);
|
||||||
System.out.println(outContent);
|
System.out.println(outContent);
|
||||||
out.assertContentEquals(translateNewLines(new File("src/test/resources/TestModifyBytes/noHeader.txt")));
|
out.assertContentEquals(noHeaderFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -84,7 +100,7 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "181 B");
|
runner.setProperty(ModifyBytes.START_OFFSET, "181 B");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "0 B");
|
runner.setProperty(ModifyBytes.END_OFFSET, "0 B");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
@ -100,7 +116,7 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "0 B");
|
runner.setProperty(ModifyBytes.START_OFFSET, "0 B");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "181 B");
|
runner.setProperty(ModifyBytes.END_OFFSET, "181 B");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
@ -114,14 +130,14 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "0 B");
|
runner.setProperty(ModifyBytes.START_OFFSET, "0 B");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "12 B");
|
runner.setProperty(ModifyBytes.END_OFFSET, "12 B");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
final String outContent = new String(out.toByteArray(), StandardCharsets.UTF_8);
|
final String outContent = new String(out.toByteArray(), StandardCharsets.UTF_8);
|
||||||
System.out.println(outContent);
|
System.out.println(outContent);
|
||||||
out.assertContentEquals(translateNewLines(new File("src/test/resources/TestModifyBytes/noFooter.txt")));
|
out.assertContentEquals(noFooterFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -130,14 +146,14 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "12 B");
|
runner.setProperty(ModifyBytes.START_OFFSET, "12 B");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "12 B");
|
runner.setProperty(ModifyBytes.END_OFFSET, "12 B");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
final String outContent = new String(out.toByteArray(), StandardCharsets.UTF_8);
|
final String outContent = new String(out.toByteArray(), StandardCharsets.UTF_8);
|
||||||
System.out.println(outContent);
|
System.out.println(outContent);
|
||||||
out.assertContentEquals(translateNewLines(new File("src/test/resources/TestModifyBytes/noFooter_noHeader.txt")));
|
out.assertContentEquals(noFooterNoHeaderFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -146,7 +162,7 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "97 B");
|
runner.setProperty(ModifyBytes.START_OFFSET, "97 B");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "97 B");
|
runner.setProperty(ModifyBytes.END_OFFSET, "97 B");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
@ -160,7 +176,7 @@ public class TestModifyBytes {
|
||||||
runner.setProperty(ModifyBytes.START_OFFSET, "94 B");
|
runner.setProperty(ModifyBytes.START_OFFSET, "94 B");
|
||||||
runner.setProperty(ModifyBytes.END_OFFSET, "96 B");
|
runner.setProperty(ModifyBytes.END_OFFSET, "96 B");
|
||||||
|
|
||||||
runner.enqueue(Paths.get("src/test/resources/TestModifyBytes/testFile.txt"));
|
runner.enqueue(testFilePath);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
@ -170,20 +186,61 @@ public class TestModifyBytes {
|
||||||
out.assertContentEquals("Dew".getBytes("UTF-8"));
|
out.assertContentEquals("Dew".getBytes("UTF-8"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] translateNewLines(final File file) throws IOException {
|
@Test
|
||||||
return translateNewLines(file.toPath());
|
public void testRemoveAllContent() throws IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ModifyBytes());
|
||||||
|
runner.setProperty(ModifyBytes.START_OFFSET, "0 B");
|
||||||
|
runner.setProperty(ModifyBytes.END_OFFSET, "0 B");
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "true");
|
||||||
|
|
||||||
|
runner.enqueue(testFilePath);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
|
assertEquals(0L, out.getSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] translateNewLines(final Path path) throws IOException {
|
@Test
|
||||||
final byte[] data = Files.readAllBytes(path);
|
public void testRemoveAllOverridesWhenSet() throws IOException {
|
||||||
final String text = new String(data, StandardCharsets.UTF_8);
|
final TestRunner runner = TestRunners.newTestRunner(new ModifyBytes());
|
||||||
return translateNewLines(text).getBytes(StandardCharsets.UTF_8);
|
runner.setProperty(ModifyBytes.START_OFFSET, "10 B");
|
||||||
|
runner.setProperty(ModifyBytes.END_OFFSET, "10 B");
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "true");
|
||||||
|
|
||||||
|
runner.enqueue(testFilePath);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
|
assertEquals(0L, out.getSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
private String translateNewLines(final String text) {
|
@Test
|
||||||
final String lineSeparator = System.getProperty("line.separator");
|
public void testRemoveAllNoOverridesWhenFalse() throws IOException {
|
||||||
final Pattern pattern = Pattern.compile("\n", Pattern.MULTILINE);
|
final TestRunner runner = TestRunners.newTestRunner(new ModifyBytes());
|
||||||
final String translated = pattern.matcher(text).replaceAll(lineSeparator);
|
runner.setProperty(ModifyBytes.START_OFFSET, "10 B");
|
||||||
return translated;
|
runner.setProperty(ModifyBytes.END_OFFSET, "10 B");
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "false");
|
||||||
|
|
||||||
|
runner.enqueue(testFilePath);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ModifyBytes.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ModifyBytes.REL_SUCCESS).get(0);
|
||||||
|
assertEquals(testFile.length() - 20, out.getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAllowableValues() throws IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ModifyBytes());
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "maybe");
|
||||||
|
runner.assertNotValid();
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "true");
|
||||||
|
runner.assertValid();
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "false");
|
||||||
|
runner.assertValid();
|
||||||
|
runner.setProperty(ModifyBytes.REMOVE_ALL, "certainly");
|
||||||
|
runner.assertNotValid();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue