NIFI-10925 Refactored TestPutSFTP with discrete test methods

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6746.
This commit is contained in:
exceptionfactory 2022-12-01 14:44:32 -06:00 committed by Pierre Villard
parent beb0f920d5
commit c51411d360
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed files with 116 additions and 115 deletions

View File

@ -21,7 +21,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
@ -33,14 +33,33 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
class TestPutSFTP {
private static final String FLOW_FILE_CONTENTS = TestPutSFTP.class.getSimpleName();
private static final String LOCALHOST = "localhost";
private static final String LOCALHOST_ADDRESS = "127.0.0.1";
private static final String REMOTE_DIRECTORY = "nifi_test/";
private static final String FIRST_FILENAME = "1.txt";
private static final String TRANSFER_HOST_ATTRIBUTE = "transfer-host";
private static final int BATCH_SIZE = 2;
private static final byte[] ZERO_BYTES = new byte[]{};
private static final String TRANSIT_URI_FORMAT = "sftp://%s";
private SSHTestServer sshTestServer;
private TestRunner runner;
@ -51,16 +70,16 @@ class TestPutSFTP {
sshTestServer.startServer();
runner = TestRunners.newTestRunner(PutSFTP.class);
runner.setProperty(SFTPTransfer.HOSTNAME, "localhost");
runner.setProperty(SFTPTransfer.HOSTNAME, LOCALHOST);
runner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort()));
runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername());
runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false");
runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
runner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/");
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, Boolean.FALSE.toString());
runner.setProperty(SFTPTransfer.BATCH_SIZE, Integer.toString(BATCH_SIZE));
runner.setProperty(SFTPTransfer.REMOTE_PATH, REMOTE_DIRECTORY);
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.TRUE.toString());
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true");
runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString());
runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec");
runner.setValidateExpressionUsage(false);
}
@ -74,176 +93,158 @@ class TestPutSFTP {
@Test
void testRunNewDirectory() {
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
//verify directory exists
Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/");
Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/1.txt");
Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY);
Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY + FIRST_FILENAME);
assertTrue(newDirectory.toAbsolutePath().toFile().exists(), "New Directory not created");
assertTrue(newFile.toAbsolutePath().toFile().exists(), "New File not created");
runner.clearTransferState();
}
@Test
void testRunZeroByteFile() {
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
void testRunZeroByteFileRejected() {
runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
//Two files in batch, should have only 1 transferred to sucess, 1 to failure
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
runner.clearTransferState();
runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
runner.run();
//One files in batch, should have 0 transferred to output since it's zero byte
runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
runner.clearTransferState();
//allow zero byte files
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
runner.run();
//should have 1 transferred to sucess
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
//revert settings
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
runner.clearTransferState();
}
@Test
void testRunConflictResolution() throws IOException {
final String directoryName = "nifi_test";
final String filename = "1";
void testRunZeroByteFileAllowed() {
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString());
runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + directoryName );
final Path subDirectory = Paths.get(directory.toString(), filename);
Files.createDirectory(directory);
Files.createFile(subDirectory);
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
}
final Map<String, String> flowFileAttributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), filename);
@Test
void testRunConflictResolutionReplaceStrategy() throws IOException {
createRemoteFile();
// REPLACE Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
runner.clearTransferState();
}
@Test
void testRunConflictResolutionRejectStrategy() throws IOException {
createRemoteFile();
// REJECT Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
runner.clearTransferState();
}
@Test
void testRunConflictResolutionIgnoreStrategy() throws IOException {
createRemoteFile();
// IGNORE Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE);
runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
}
runner.clearTransferState();
@Test
void testRunConflictResolutionFailStrategy() throws IOException {
createRemoteFile();
// FAIL Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL);
runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 1);
runner.clearTransferState();
}
@Test
void testRunBatching() {
runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "3.txt"));
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "4.txt"));
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "5.txt"));
final int files = 4;
for (int fileNumber = 1; fileNumber <= files; fileNumber++) {
final String filename = Integer.toString(fileNumber);
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), filename));
}
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE);
runner.clearTransferState();
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
runner.clearTransferState();
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
runner.clearTransferState();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE);
runner.assertQueueEmpty();
}
@Test
void testRunTransitUri() {
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
Map<String,String> attributes = new HashMap<>();
attributes.put("filename", "testfile.txt");
attributes.put("transfer-host","localhost");
runner.enqueue(FLOW_FILE_CONTENTS, attributes);
runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
attributes = new HashMap<>();
attributes.put("filename", "testfile1.txt");
attributes.put("transfer-host","127.0.0.1");
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
final List<ProvenanceEventRecord> records = runner.getProvenanceEvents();
assertFalse(records.isEmpty());
final ProvenanceEventRecord record = records.iterator().next();
final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST);
assertTrue(record.getTransitUri().startsWith(firstTransitUri), "Transit URI not found");
}
@Test
void testRunTransitUriDifferentHosts() {
runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString());
runner.setProperty(SFTPTransfer.HOSTNAME, "${transfer-host}");
final Map<String, String> firstAttributes = new LinkedHashMap<>();
firstAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME);
firstAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST);
runner.enqueue(FLOW_FILE_CONTENTS, firstAttributes);
final Map<String, String> secondAttributes = new LinkedHashMap<>();
secondAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME);
secondAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST_ADDRESS);
runner.enqueue(FLOW_FILE_CONTENTS, secondAttributes);
runner.enqueue(FLOW_FILE_CONTENTS, attributes);
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
runner.getProvenanceEvents().forEach(k -> assertTrue(k.getTransitUri().contains("sftp://localhost")));
//Two files in batch, should have 2 transferred to success, 0 to failure
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
runner.clearProvenanceEvents();
runner.clearTransferState();
final List<ProvenanceEventRecord> records = runner.getProvenanceEvents();
//Test different destinations on flow file attributes
runner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname
final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST);
final Optional<ProvenanceEventRecord> firstRecord = records.stream()
.filter(record -> record.getTransitUri().startsWith(firstTransitUri))
.findFirst();
assertTrue(firstRecord.isPresent(), "First Transit URI not found");
runner.setThreadCount(1);
runner.enqueue(flowFile1);
runner.enqueue(flowFile2);
runner.run();
final String secondTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST_ADDRESS);
final Optional<ProvenanceEventRecord> secondRecord = records.stream()
.filter(record -> record.getTransitUri().startsWith(secondTransitUri))
.findFirst();
assertTrue(secondRecord.isPresent(), "Second Transit URI not found");
}
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
assertTrue(runner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost"));
assertTrue(runner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1"));
runner.clearProvenanceEvents();
runner.clearTransferState();
private void createRemoteFile() throws IOException {
final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY);
final Path subDirectory = Paths.get(directory.toString(), FIRST_FILENAME);
Files.createDirectory(directory);
Files.createFile(subDirectory);
}
}