1
0
mirror of https://github.com/apache/nifi.git synced 2025-02-14 14:05:26 +00:00

NIFI-9348 NIFI-7863 This closes . Added temporary suffix and fixed [NIFI-7863] creation of the directories

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Gabriel Barbu 2021-10-29 16:03:42 +03:00 committed by Joe Witt
parent 64495e99e9
commit 6caffca811
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 204 additions and 35 deletions
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src
main/java/org/apache/nifi/processors/smb
test/java/org/apache/nifi/processors/smb

@ -51,6 +51,7 @@ import java.net.URI;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.share.DiskEntry;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.session.Session;
import com.hierynomus.msfscc.FileAttributes;
@ -148,6 +149,12 @@ public class PutSmbFile extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.build();
public static final PropertyDescriptor RENAME_SUFFIX = new PropertyDescriptor.Builder()
.name("Temporary Suffix")
.description("A temporary suffix which will be apended to the filename while it's transfering. After the transfer is complete, the suffix will be removed.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to the output network path are transferred to this relationship")
@ -178,6 +185,7 @@ public class PutSmbFile extends AbstractProcessor {
descriptors.add(SHARE_ACCESS);
descriptors.add(CONFLICT_RESOLUTION);
descriptors.add(BATCH_SIZE);
descriptors.add(RENAME_SUFFIX);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
@ -236,6 +244,29 @@ public class PutSmbFile extends AbstractProcessor {
this.smbClient = smbClient;
}
private void createMissingDirectoriesRecursevly(ComponentLog logger, DiskShare share, String pathToCreate) {
List<String> paths = new ArrayList<>();
java.io.File file = new java.io.File(pathToCreate);
paths.add(file.getPath());
while (file.getParent() != null) {
String parent = file.getParent();
paths.add(parent);
file = new java.io.File(parent);
}
Collections.reverse(paths);
for (String path : paths) {
if (!share.folderExists(path)) {
logger.debug("Creating folder {}", new Object[]{path});
share.mkdir(path);
} else {
logger.debug("Folder already exists {}. Moving on", new Object[]{path});
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@ -268,33 +299,40 @@ public class PutSmbFile extends AbstractProcessor {
DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
for (FlowFile flowFile : flowFiles) {
String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final long sendStart = System.nanoTime();
String fullPath;
final long processingStartTime = System.nanoTime();
if (directory == null) {
directory = "";
fullPath = filename;
final String destinationDirectory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String destinationFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
String destinationFullPath;
// build destination path for the flowfile
if (destinationDirectory == null || destinationDirectory.trim().isEmpty()) {
destinationFullPath = destinationFilename;
} else {
fullPath = directory + "\\" + filename;
// missing directory handling
if (context.getProperty(CREATE_DIRS).asBoolean() && !share.folderExists(directory)) {
logger.debug("Creating folder {}", new Object[]{directory});
share.mkdir(directory);
}
destinationFullPath = new java.io.File(destinationDirectory, destinationFilename).getPath();
}
final URI uri = new URI("smb", hostname, "/" + fullPath.replace('\\', '/'), null);
// handle missing directory
final String destinationFileParentDirectory = new java.io.File(destinationFullPath).getParent();
final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean();
if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) {
flowFile = session.penalize(flowFile);
logger.warn(
"Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist",
new Object[]{ flowFile, destinationFileParentDirectory });
session.transfer(flowFile, REL_FAILURE);
continue;
} else if (!share.folderExists(destinationFileParentDirectory)) {
createMissingDirectoriesRecursevly(logger, share, destinationFileParentDirectory);
}
// replace strategy handling
SMB2CreateDisposition createDisposition = SMB2CreateDisposition.FILE_OVERWRITE_IF;
// handle conflict resolution
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
if (!conflictResolution.equals(REPLACE_RESOLUTION) && share.fileExists(fullPath)) {
if (share.fileExists(destinationFullPath)) {
if (conflictResolution.equals(IGNORE_RESOLUTION)) {
session.transfer(flowFile, REL_SUCCESS);
logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile});
logger.info("Transferring {} to success as configured because file with same name already exists", new Object[]{flowFile});
continue;
} else if (conflictResolution.equals(FAIL_RESOLUTION)) {
flowFile = session.penalize(flowFile);
@ -304,27 +342,61 @@ public class PutSmbFile extends AbstractProcessor {
}
}
// handle temporary suffix
final String renameSuffixValue = context.getProperty(RENAME_SUFFIX).getValue();
final Boolean renameSuffix = renameSuffixValue != null && !renameSuffixValue.trim().isEmpty();
String finalDestinationFullPath = destinationFullPath;
if (renameSuffix) {
finalDestinationFullPath += renameSuffixValue;
}
try (File f = share.openFile(
fullPath,
// handle the transfer
try (
File shareDestinationFile = share.openFile(
finalDestinationFullPath,
EnumSet.of(AccessMask.GENERIC_WRITE),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
sharedAccess,
createDisposition,
SMB2CreateDisposition.FILE_OVERWRITE_IF,
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
OutputStream os = f.getOutputStream()) {
session.exportTo(flowFile, os);
final long sendNanos = System.nanoTime() - sendStart;
final long sendMillis = TimeUnit.MILLISECONDS.convert(sendNanos, TimeUnit.NANOSECONDS);
session.getProvenanceReporter().send(flowFile, uri.toString(), sendMillis);
session.transfer(flowFile, REL_SUCCESS);
OutputStream shareDestinationFileOutputStream = shareDestinationFile.getOutputStream()) {
session.exportTo(flowFile, shareDestinationFileOutputStream);
} catch (Exception e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
logger.error("Cannot transfer the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
continue;
}
// handle the rename
if (renameSuffix) {
try(DiskEntry fileDiskEntry = share.open(
finalDestinationFullPath,
EnumSet.of(AccessMask.DELETE, AccessMask.GENERIC_WRITE),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
sharedAccess,
SMB2CreateDisposition.FILE_OPEN,
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
// normalize path slashes for the network share
destinationFullPath = destinationFullPath.replace("/", "\\");
// rename the file on the share and replace it in case it exists
fileDiskEntry.rename(destinationFullPath, true);
} catch (Exception e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Cannot rename the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
continue;
}
}
// handle the success
final URI provenanceUri = new URI("smb", hostname, "/" + destinationFullPath.replace('\\', '/'), null);
final long processingTimeInNano = System.nanoTime() - processingStartTime;
final long processingTimeInMilli = TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
session.getProvenanceReporter().send(flowFile, provenanceUri.toString(), processingTimeInMilli);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (Exception e) {
session.transfer(flowFiles, REL_FAILURE);

@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
@ -60,7 +59,6 @@ public class GetSmbFileTest {
private Connection connection;
private Session session;
private DiskShare diskShare;
private ByteArrayOutputStream baOutputStream;
private final static String HOSTNAME = "host";
private final static String SHARE = "share";
@ -75,8 +73,6 @@ public class GetSmbFileTest {
session = mock(Session.class);
diskShare = mock(DiskShare.class);
baOutputStream = new ByteArrayOutputStream();
when(smbClient.connect(any(String.class))).thenReturn(connection);
when(connection.authenticate(any(AuthenticationContext.class))).thenReturn(session);
when(session.connectShare(SHARE)).thenReturn(diskShare);

@ -22,6 +22,7 @@ import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.session.Session;
import com.hierynomus.smbj.share.DiskEntry;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.File;
import org.apache.nifi.util.TestRunner;
@ -42,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -55,6 +57,7 @@ public class PutSmbFileTest {
private Connection connection;
private Session session;
private DiskShare diskShare;
private DiskEntry diskEntry;
private File smbfile;
private ByteArrayOutputStream baOutputStream;
@ -75,6 +78,7 @@ public class PutSmbFileTest {
connection = mock(Connection.class);
session = mock(Session.class);
diskShare = mock(DiskShare.class);
diskEntry = mock(DiskEntry.class);
smbfile = mock(File.class);
baOutputStream = new ByteArrayOutputStream();
@ -89,6 +93,14 @@ public class PutSmbFileTest {
any(SMB2CreateDisposition.class),
anySet()
)).thenReturn(smbfile);
when(diskShare.open(
any(String.class),
anySet(),
anySet(),
anySet(),
any(SMB2CreateDisposition.class),
anySet()
)).thenReturn(diskEntry);
when(smbfile.getOutputStream()).thenReturn(baOutputStream);
testRunner.setProperty(PutSmbFile.HOSTNAME, HOSTNAME);
@ -171,9 +183,26 @@ public class PutSmbFileTest {
testDirectoryCreation("true", 1);
}
@Test
public void testDirectoriesCreatedWhenDontExists() throws IOException {
final String directory = new java.io.File("a\\b\\c\\b\\e").getPath();
final int count = directory.split(java.util.regex.Pattern.quote(java.io.File.separator)).length;
when(diskShare.folderExists(DIRECTORY)).thenReturn(false);
testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
testRunner.setProperty(PutSmbFile.DIRECTORY, directory);
testRunner.enqueue("data");
testRunner.run();
verify(diskShare, times(count)).mkdir(
any(String.class)
);
}
@Test
public void testFileShareNone() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_NONE);
testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
assertTrue(shareAccessSet.isEmpty());
}
@ -181,6 +210,7 @@ public class PutSmbFileTest {
@Test
public void testFileShareRead() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READ);
testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ));
}
@ -188,6 +218,7 @@ public class PutSmbFileTest {
@Test
public void testFileShareReadWriteDelete() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READWRITEDELETE);
testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ));
assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_WRITE));
@ -208,6 +239,76 @@ public class PutSmbFileTest {
testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_SUCCESS);
}
@Test
public void testTemporarySuffixIsUnset() throws IOException {
testRunner.enqueue("data");
testRunner.run();
verify(diskShare, never()).open(
any(String.class),
anySet(),
anySet(),
anySet(),
any(SMB2CreateDisposition.class),
anySet()
);
}
@Test
public void testTemporarySuffixIsSet() throws IOException {
final String suffix = ".test";
testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix);
testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
testRunner.enqueue("data");
testRunner.run();
ArgumentCaptor<String> filename = ArgumentCaptor.forClass(String.class);
verify(diskShare, times(1)).open(
filename.capture(),
anySet(),
anySet(),
anySet(),
any(SMB2CreateDisposition.class),
anySet()
);
assertTrue(filename.getValue().endsWith(suffix), "Suffix is not present");
}
@Test
public void testTemporarySuffixIsSetRenameIsCalled() throws IOException {
final String suffix = ".test";
testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix);
testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
testRunner.enqueue("data");
testRunner.run();
ArgumentCaptor<String> initialFilename = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> finalFilename = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Boolean> replace = ArgumentCaptor.forClass(Boolean.class);
verify(diskShare, times(1)).open(
initialFilename.capture(),
anySet(),
anySet(),
anySet(),
any(SMB2CreateDisposition.class),
anySet()
);
verify(diskEntry, times(1)).rename(
finalFilename.capture(),
replace.capture()
);
assertTrue(initialFilename.getValue().endsWith(suffix), "Suffix is not present and it should be");
assertTrue(!finalFilename.getValue().endsWith(suffix), "Suffix is present and it shouldn't be");
assertTrue(replace.getValue(), "Replace flag shold be true");
}
@Test
public void testConnectionError() throws IOException {
String emsg = "mock connection exception";