mirror of https://github.com/apache/nifi.git
NIFI-12231 FetchSmb supports Move and Delete Completion Strategies
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #8617.
This commit is contained in:
parent
bc75ef108c
commit
9f69ff233c
|
@ -25,9 +25,13 @@ import java.util.stream.Stream;
|
|||
*/
|
||||
public interface SmbClientService extends AutoCloseable {
|
||||
|
||||
Stream<SmbListableEntity> listRemoteFiles(String path);
|
||||
Stream<SmbListableEntity> listFiles(String directoryPath);
|
||||
|
||||
void createDirectory(String path);
|
||||
void ensureDirectory(String directoryPath);
|
||||
|
||||
void readFile(String fileName, OutputStream outputStream) throws IOException;
|
||||
void readFile(String filePath, OutputStream outputStream) throws IOException;
|
||||
|
||||
void moveFile(String filePath, String directoryPath);
|
||||
|
||||
void deleteFile(String filePath);
|
||||
}
|
||||
|
|
|
@ -16,16 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.smb;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.unmodifiableSet;
|
||||
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
|
||||
import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
|
@ -33,17 +23,29 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyDescriptor.Builder;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.smb.util.CompletionStrategy;
|
||||
import org.apache.nifi.services.smb.SmbClientProviderService;
|
||||
import org.apache.nifi.services.smb.SmbClientService;
|
||||
import org.apache.nifi.services.smb.SmbException;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.unmodifiableSet;
|
||||
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
|
||||
import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({"samba", "smb", "cifs", "files", "fetch"})
|
||||
@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
|
||||
|
@ -57,8 +59,8 @@ public class FetchSmb extends AbstractProcessor {
|
|||
public static final String ERROR_CODE_ATTRIBUTE = "error.code";
|
||||
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
|
||||
|
||||
public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
|
||||
.Builder().name("remote-file")
|
||||
public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor.Builder()
|
||||
.name("remote-file")
|
||||
.displayName("Remote File")
|
||||
.description("The full path of the file to be retrieved from the remote server. Expression language is supported.")
|
||||
.required(true)
|
||||
|
@ -67,91 +69,139 @@ public class FetchSmb extends AbstractProcessor {
|
|||
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
|
||||
public static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("Completion Strategy")
|
||||
.description("Specifies what to do with the original file on the server once it has been processed. If the Completion Strategy fails, a warning will be "
|
||||
+ "logged but the data will still be transferred.")
|
||||
.allowableValues(CompletionStrategy.class)
|
||||
.defaultValue(CompletionStrategy.NONE)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
|
||||
.name("Destination Directory")
|
||||
.description("The directory on the remote server to move the original file to once it has been processed.")
|
||||
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CREATE_DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
|
||||
.name("Create Destination Directory")
|
||||
.description("Specifies whether or not the remote directory should be created if it does not exist.")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("smb-client-provider-service")
|
||||
.displayName("SMB Client Provider Service")
|
||||
.description("Specifies the SMB client provider to use for creating SMB connections.")
|
||||
.required(true)
|
||||
.identifiesControllerService(SmbClientProviderService.class)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS =
|
||||
new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("A flowfile will be routed here for each successfully fetched file.")
|
||||
.description("A FlowFile will be routed here for each successfully fetched file.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE =
|
||||
new Relationship.Builder().name("failure")
|
||||
.description(
|
||||
"A flowfile will be routed here when failed to fetch its content.")
|
||||
new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("A FlowFile will be routed here when failed to fetch its content.")
|
||||
.build();
|
||||
|
||||
public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList(
|
||||
REL_SUCCESS,
|
||||
REL_FAILURE
|
||||
)));
|
||||
public static final String UNCATEGORIZED_ERROR = "-2";
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTIES = asList(
|
||||
SMB_CLIENT_PROVIDER_SERVICE,
|
||||
REMOTE_FILE
|
||||
REMOTE_FILE,
|
||||
COMPLETION_STRATEGY,
|
||||
DESTINATION_DIRECTORY,
|
||||
CREATE_DESTINATION_DIRECTORY
|
||||
);
|
||||
|
||||
public static final String UNCATEGORIZED_ERROR = "-2";
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final SmbClientProviderService clientProviderService =
|
||||
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
|
||||
final Map<String, String> attributes = flowFile.getAttributes();
|
||||
final String filePath = context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
|
||||
|
||||
final SmbClientProviderService clientProviderService = context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
|
||||
|
||||
try (SmbClientService client = clientProviderService.getClient()) {
|
||||
fetchAndTransfer(session, context, client, flowFile);
|
||||
flowFile = session.write(flowFile, outputStream -> client.readFile(filePath, outputStream));
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Couldn't connect to SMB.", e);
|
||||
getLogger().error("Could not fetch file {}.", filePath, e);
|
||||
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
|
||||
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
session.commitAsync(() -> performCompletionStrategy(context, attributes));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client,
|
||||
FlowFile flowFile) {
|
||||
final Map<String, String> attributes = flowFile.getAttributes();
|
||||
final String filename = context.getProperty(REMOTE_FILE)
|
||||
.evaluateAttributeExpressions(attributes).getValue();
|
||||
try {
|
||||
flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Couldn't fetch file {}.", filename, e);
|
||||
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
|
||||
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, getErrorMessage(e));
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
private String getErrorCode(Exception exception) {
|
||||
private String getErrorCode(final Exception exception) {
|
||||
return Optional.ofNullable(exception instanceof SmbException ? (SmbException) exception : null)
|
||||
.map(SmbException::getErrorCode)
|
||||
.map(String::valueOf)
|
||||
.orElse(UNCATEGORIZED_ERROR);
|
||||
}
|
||||
|
||||
private String getErrorMessage(Exception exception) {
|
||||
return Optional.ofNullable(exception.getMessage())
|
||||
.orElse(exception.getClass().getSimpleName());
|
||||
private void performCompletionStrategy(final ProcessContext context, final Map<String, String> attributes) {
|
||||
final CompletionStrategy completionStrategy = context.getProperty(COMPLETION_STRATEGY).asAllowableValue(CompletionStrategy.class);
|
||||
|
||||
if (completionStrategy == CompletionStrategy.NONE) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String filePath = context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
|
||||
|
||||
final SmbClientProviderService clientProviderService = context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
|
||||
|
||||
try (SmbClientService client = clientProviderService.getClient()) {
|
||||
if (completionStrategy == CompletionStrategy.MOVE) {
|
||||
final String destinationDirectory = context.getProperty(DESTINATION_DIRECTORY).evaluateAttributeExpressions(attributes).getValue();
|
||||
final boolean createDestinationDirectory = context.getProperty(CREATE_DESTINATION_DIRECTORY).asBoolean();
|
||||
|
||||
if (createDestinationDirectory) {
|
||||
client.ensureDirectory(destinationDirectory);
|
||||
}
|
||||
|
||||
client.moveFile(filePath, destinationDirectory);
|
||||
} else if (completionStrategy == CompletionStrategy.DELETE) {
|
||||
client.deleteFile(filePath);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().warn("Could not perform completion strategy {} for file {}", completionStrategy, filePath, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -345,7 +345,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
|
|||
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
|
||||
final String directory = getDirectory(context);
|
||||
final SmbClientService clientService = clientProviderService.getClient();
|
||||
return clientService.listRemoteFiles(directory).onClose(() -> {
|
||||
return clientService.listFiles(directory).onClose(() -> {
|
||||
try {
|
||||
clientService.close();
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.smb.util;
|
||||
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
public enum CompletionStrategy implements DescribedValue {
|
||||
|
||||
NONE("None", "Leaves the file as-is."),
|
||||
MOVE("Move File", "Moves the file to the specified directory on the remote system. This option cannot be used when DFS is enabled on 'SMB Client Provider Service'."),
|
||||
DELETE("Delete File", "Deletes the file from the remote system.");
|
||||
|
||||
private final String displayName;
|
||||
private final String description;
|
||||
|
||||
CompletionStrategy(String displayName, String description) {
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDisplayName() {
|
||||
return displayName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
}
|
|
@ -16,52 +16,226 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.smb;
|
||||
|
||||
import static org.apache.nifi.processors.smb.FetchSmb.COMPLETION_STRATEGY;
|
||||
import static org.apache.nifi.processors.smb.FetchSmb.CREATE_DESTINATION_DIRECTORY;
|
||||
import static org.apache.nifi.processors.smb.FetchSmb.DESTINATION_DIRECTORY;
|
||||
import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
|
||||
import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
|
||||
import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE;
|
||||
import static org.apache.nifi.util.TestRunners.newTestRunner;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.processors.smb.util.CompletionStrategy;
|
||||
import org.apache.nifi.services.smb.SmbjClientProviderService;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class FetchSmbIT extends SambaTestContainers {
|
||||
class FetchSmbIT extends SambaTestContainers {
|
||||
|
||||
@Test
|
||||
public void fetchFilesUsingEL() throws Exception {
|
||||
writeFile("/test_file", "test_content");
|
||||
TestRunner testRunner = newTestRunner(FetchSmb.class);
|
||||
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
|
||||
final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
|
||||
private static final String TEST_CONTENT = "test_content";
|
||||
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("attribute_to_find_using_EL", "test_file");
|
||||
private TestRunner testRunner;
|
||||
|
||||
testRunner.enqueue("ignored", attributes);
|
||||
testRunner.run();
|
||||
testRunner.assertTransferCount(REL_SUCCESS, 1);
|
||||
assertEquals("test_content", testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
|
||||
testRunner.assertValid();
|
||||
private SmbjClientProviderService smbjClientProviderService;
|
||||
|
||||
@BeforeEach
|
||||
void setUpComponents() throws Exception {
|
||||
testRunner = newTestRunner(FetchSmb.class);
|
||||
|
||||
smbjClientProviderService = configureSmbClient(testRunner, true);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDownComponents() {
|
||||
testRunner.disableControllerService(smbjClientProviderService);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void tryToFetchNonExistingFileEmitsFailure() throws Exception {
|
||||
TestRunner testRunner = newTestRunner(FetchSmb.class);
|
||||
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
|
||||
final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
|
||||
void fetchFilesUsingEL() {
|
||||
writeFile("test_file", TEST_CONTENT);
|
||||
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("attribute_to_find_using_EL", "test_file");
|
||||
|
||||
runProcessor(attributes);
|
||||
|
||||
assertSuccessFlowFile();
|
||||
}
|
||||
|
||||
@Test
|
||||
void tryToFetchNonExistingFileEmitsFailure() throws Exception {
|
||||
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("attribute_to_find_using_EL", "non_existing_file");
|
||||
|
||||
runProcessor(attributes);
|
||||
|
||||
testRunner.assertTransferCount(REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCompletionStrategyNone() {
|
||||
final String baseDir = "dir_none";
|
||||
final String filename = "test_file";
|
||||
final String filePath = baseDir + "/" + filename;
|
||||
|
||||
createDirectory(baseDir, AccessMode.READ_ONLY);
|
||||
writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
|
||||
|
||||
testRunner.setProperty(REMOTE_FILE, filePath);
|
||||
testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.NONE);
|
||||
|
||||
runProcessor();
|
||||
|
||||
assertSuccessFlowFile();
|
||||
assertNoWarning();
|
||||
|
||||
assertTrue(fileExists(filePath));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCompletionStrategyDelete() {
|
||||
final String baseDir = "dir_delete";
|
||||
final String filename = "test_file";
|
||||
final String filePath = baseDir + "/" + filename;
|
||||
|
||||
createDirectory(baseDir, AccessMode.READ_WRITE);
|
||||
writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
|
||||
|
||||
testRunner.setProperty(REMOTE_FILE, filePath);
|
||||
testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
|
||||
|
||||
runProcessor();
|
||||
|
||||
assertSuccessFlowFile();
|
||||
assertNoWarning();
|
||||
|
||||
assertFalse(fileExists(filePath));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCompletionStrategyMoveWithExistingDirectory() {
|
||||
final String baseDir = "dir_move_existing";
|
||||
final String filename = "test_file";
|
||||
final String filePath = baseDir + "/" + filename;
|
||||
final String processedDir = "processed";
|
||||
|
||||
createDirectory(baseDir, AccessMode.READ_WRITE);
|
||||
writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
|
||||
createDirectory(processedDir, AccessMode.READ_WRITE);
|
||||
|
||||
testRunner.setProperty(REMOTE_FILE, filePath);
|
||||
testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
|
||||
testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
|
||||
|
||||
runProcessor();
|
||||
|
||||
assertSuccessFlowFile();
|
||||
assertNoWarning();
|
||||
|
||||
assertFalse(fileExists(filePath));
|
||||
assertTrue(fileExists(processedDir + "/" + filename));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCompletionStrategyMoveWithCreatingDirectory() {
|
||||
final String baseDir = "dir_move_creating";
|
||||
final String filename = "test_file";
|
||||
final String filePath = baseDir + "/" + filename;
|
||||
final String processedDir = "processed";
|
||||
|
||||
createDirectory(baseDir, AccessMode.READ_WRITE);
|
||||
writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
|
||||
|
||||
testRunner.setProperty(REMOTE_FILE, filePath);
|
||||
testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
|
||||
testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
|
||||
testRunner.setProperty(CREATE_DESTINATION_DIRECTORY, "true");
|
||||
|
||||
runProcessor();
|
||||
|
||||
assertSuccessFlowFile();
|
||||
assertNoWarning();
|
||||
|
||||
assertFalse(fileExists(filePath));
|
||||
assertTrue(fileExists(processedDir + "/" + filename));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCompletionStrategyDeleteFailsWhenNoPermission() {
|
||||
final String baseDir = "dir_delete_noperm";
|
||||
final String filename = "test_file";
|
||||
final String filePath = baseDir + "/" + filename;
|
||||
|
||||
createDirectory(baseDir, AccessMode.READ_ONLY);
|
||||
writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
|
||||
|
||||
testRunner.setProperty(REMOTE_FILE, filePath);
|
||||
testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
|
||||
|
||||
runProcessor();
|
||||
|
||||
assertSuccessFlowFile();
|
||||
assertWarning();
|
||||
|
||||
assertTrue(fileExists(filePath));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCompletionStrategyMoveFailsWhenNoPermission() {
|
||||
final String baseDir = "dir_move_noperm";
|
||||
final String filename = "test_file";
|
||||
final String filePath = baseDir + "/" + filename;
|
||||
final String processedDir = "processed";
|
||||
|
||||
createDirectory(baseDir, AccessMode.READ_ONLY);
|
||||
writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
|
||||
createDirectory(processedDir, AccessMode.READ_ONLY);
|
||||
|
||||
testRunner.setProperty(REMOTE_FILE, filePath);
|
||||
testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
|
||||
testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
|
||||
|
||||
runProcessor();
|
||||
|
||||
assertSuccessFlowFile();
|
||||
assertWarning();
|
||||
|
||||
assertTrue(fileExists(filePath));
|
||||
assertFalse(fileExists(processedDir + "/" + filename));
|
||||
}
|
||||
|
||||
private void runProcessor() {
|
||||
runProcessor(Collections.emptyMap());
|
||||
}
|
||||
|
||||
private void runProcessor(final Map<String, String> attributes) {
|
||||
testRunner.enqueue("ignored", attributes);
|
||||
testRunner.run();
|
||||
testRunner.assertTransferCount(REL_FAILURE, 1);
|
||||
testRunner.assertValid();
|
||||
testRunner.disableControllerService(smbjClientProviderService);
|
||||
}
|
||||
|
||||
private void assertSuccessFlowFile() {
|
||||
testRunner.assertTransferCount(REL_SUCCESS, 1);
|
||||
assertEquals(TEST_CONTENT, testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
|
||||
}
|
||||
|
||||
private void assertWarning() {
|
||||
assertFalse(testRunner.getLogger().getWarnMessages().isEmpty());
|
||||
}
|
||||
|
||||
private void assertNoWarning() {
|
||||
assertTrue(testRunner.getLogger().getWarnMessages().isEmpty());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -232,7 +232,7 @@ class ListSmbTest {
|
|||
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
|
||||
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
|
||||
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
|
||||
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception"));
|
||||
when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new RuntimeException("test exception"));
|
||||
testRunner.run();
|
||||
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
|
||||
testRunner.assertValid();
|
||||
|
@ -282,7 +282,7 @@ class ListSmbTest {
|
|||
}
|
||||
|
||||
private void mockSmbFolders(SmbClientService mockNifiSmbClientService, SmbListableEntity... entities) {
|
||||
doAnswer(ignore -> stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString());
|
||||
doAnswer(ignore -> stream(entities)).when(mockNifiSmbClientService).listFiles(anyString());
|
||||
}
|
||||
|
||||
private SmbListableEntity listableEntity(String name, long timeStamp) {
|
||||
|
|
|
@ -39,12 +39,19 @@ import org.testcontainers.utility.DockerImageName;
|
|||
|
||||
public class SambaTestContainers {
|
||||
|
||||
protected final static Logger LOGGER = LoggerFactory.getLogger(SambaTestContainers.class);
|
||||
|
||||
protected final static Integer DEFAULT_SAMBA_PORT = 445;
|
||||
protected final static Logger logger = LoggerFactory.getLogger(SambaTestContainers.class);
|
||||
|
||||
protected enum AccessMode {
|
||||
READ_ONLY,READ_WRITE;
|
||||
}
|
||||
|
||||
protected final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
|
||||
.withCreateContainerCmdModifier(cmd -> cmd.withName("samba-test"))
|
||||
.withExposedPorts(DEFAULT_SAMBA_PORT, 139)
|
||||
.waitingFor(Wait.forListeningPort())
|
||||
.withLogConsumer(new Slf4jLogConsumer(logger))
|
||||
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
|
||||
.withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
|
||||
|
||||
@BeforeEach
|
||||
|
@ -57,33 +64,63 @@ public class SambaTestContainers {
|
|||
sambaContainer.stop();
|
||||
}
|
||||
|
||||
protected SmbjClientProviderService configureSmbClient(TestRunner testRunner, boolean shouldEnableSmbClient)
|
||||
throws Exception {
|
||||
protected SmbjClientProviderService configureSmbClient(final TestRunner testRunner, final boolean shouldEnableSmbClient) throws Exception {
|
||||
final SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
|
||||
testRunner.addControllerService("client-provider", smbjClientProviderService);
|
||||
|
||||
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
|
||||
testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
|
||||
testRunner.setProperty(smbjClientProviderService, PORT,
|
||||
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
|
||||
testRunner.setProperty(smbjClientProviderService, PORT, String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
|
||||
testRunner.setProperty(smbjClientProviderService, USERNAME, "username");
|
||||
testRunner.setProperty(smbjClientProviderService, PASSWORD, "password");
|
||||
testRunner.setProperty(smbjClientProviderService, SHARE, "share");
|
||||
testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
|
||||
|
||||
if (shouldEnableSmbClient) {
|
||||
testRunner.enableControllerService(smbjClientProviderService);
|
||||
}
|
||||
|
||||
return smbjClientProviderService;
|
||||
}
|
||||
|
||||
protected String generateContentWithSize(int sizeInBytes) {
|
||||
byte[] bytes = new byte[sizeInBytes];
|
||||
protected String generateContentWithSize(final int sizeInBytes) {
|
||||
final byte[] bytes = new byte[sizeInBytes];
|
||||
fill(bytes, (byte) 1);
|
||||
return new String(bytes);
|
||||
}
|
||||
|
||||
protected void writeFile(String path, String content) {
|
||||
String containerPath = "/folder/" + path;
|
||||
sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
|
||||
protected void createDirectory(final String path) {
|
||||
createDirectory(path, AccessMode.READ_ONLY);
|
||||
}
|
||||
|
||||
protected void createDirectory(final String path, final AccessMode accessMode) {
|
||||
final String dirMode = accessMode == AccessMode.READ_ONLY ? "755" : "777";
|
||||
try {
|
||||
sambaContainer.execInContainer("bash", "-c", "mkdir -m " + dirMode + " -p " + getContainerPath(path));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to create directory", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void writeFile(final String path, final String content) {
|
||||
writeFile(path, content, AccessMode.READ_ONLY);
|
||||
}
|
||||
|
||||
protected void writeFile(final String path, final String content, final AccessMode accessMode) {
|
||||
final int fileMode = accessMode == AccessMode.READ_ONLY ? 0100644: 0100666;
|
||||
sambaContainer.copyFileToContainer(Transferable.of(content, fileMode), getContainerPath(path));
|
||||
}
|
||||
|
||||
protected boolean fileExists(final String path) {
|
||||
try {
|
||||
return sambaContainer.execInContainer("bash", "-c", "cat " + getContainerPath(path) + " > /dev/null").getExitCode() == 0;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to check file", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getContainerPath(final String path) {
|
||||
return "/folder/" + path;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
|
|||
import static java.util.stream.StreamSupport.stream;
|
||||
|
||||
import com.hierynomus.msdtyp.AccessMask;
|
||||
import com.hierynomus.mserref.NtStatus;
|
||||
import com.hierynomus.msfscc.FileAttributes;
|
||||
import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
|
||||
import com.hierynomus.mssmb2.SMB2CreateDisposition;
|
||||
|
@ -45,13 +46,13 @@ class SmbjClientService implements SmbClientService {
|
|||
private final static Logger LOGGER = LoggerFactory.getLogger(SmbjClientService.class);
|
||||
|
||||
private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
|
||||
private static final long UNCATEGORISED_ERROR = -1L;
|
||||
private static final long UNCATEGORIZED_ERROR = -1L;
|
||||
|
||||
private final Session session;
|
||||
private final DiskShare share;
|
||||
private final URI serviceLocation;
|
||||
|
||||
SmbjClientService(Session session, DiskShare share, URI serviceLocation) {
|
||||
SmbjClientService(final Session session, final DiskShare share, final URI serviceLocation) {
|
||||
this.session = session;
|
||||
this.share = share;
|
||||
this.serviceLocation = serviceLocation;
|
||||
|
@ -69,50 +70,89 @@ class SmbjClientService implements SmbClientService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Stream<SmbListableEntity> listRemoteFiles(String filePath) {
|
||||
return Stream.of(filePath).flatMap(path -> {
|
||||
public Stream<SmbListableEntity> listFiles(final String directoryPath) {
|
||||
return Stream.of(directoryPath).flatMap(path -> {
|
||||
final Directory directory = openDirectory(path);
|
||||
return stream(directory::spliterator, 0, false)
|
||||
.map(entity -> buildSmbListableEntity(entity, path, serviceLocation))
|
||||
.filter(entity -> !specialDirectory(entity))
|
||||
.flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName())
|
||||
.flatMap(listable -> listable.isDirectory() ? listFiles(listable.getPathWithName())
|
||||
: Stream.of(listable))
|
||||
.onClose(directory::close);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDirectory(String path) {
|
||||
final int lastDirectorySeparatorPosition = path.lastIndexOf("/");
|
||||
if (lastDirectorySeparatorPosition > 0) {
|
||||
createDirectory(path.substring(0, lastDirectorySeparatorPosition));
|
||||
}
|
||||
if (!share.folderExists(path)) {
|
||||
share.mkdir(path);
|
||||
public void ensureDirectory(final String directoryPath) {
|
||||
try {
|
||||
final int lastDirectorySeparatorPosition = directoryPath.lastIndexOf("/");
|
||||
if (lastDirectorySeparatorPosition > 0) {
|
||||
ensureDirectory(directoryPath.substring(0, lastDirectorySeparatorPosition));
|
||||
}
|
||||
|
||||
if (!share.folderExists(directoryPath)) {
|
||||
try {
|
||||
share.mkdir(directoryPath);
|
||||
} catch (SMBApiException e) {
|
||||
if (e.getStatus() == NtStatus.STATUS_OBJECT_NAME_COLLISION) {
|
||||
if (!share.folderExists(directoryPath)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw wrapException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFile(String fileName, OutputStream outputStream) throws IOException {
|
||||
try (File f = share.openFile(
|
||||
fileName,
|
||||
public void readFile(final String filePath, final OutputStream outputStream) throws IOException {
|
||||
try (File file = share.openFile(
|
||||
filePath,
|
||||
EnumSet.of(AccessMask.GENERIC_READ),
|
||||
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
|
||||
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
|
||||
SMB2CreateDisposition.FILE_OPEN,
|
||||
EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
|
||||
) {
|
||||
f.read(outputStream);
|
||||
} catch (SMBApiException a) {
|
||||
throw new SmbException(a.getMessage(), a.getStatusCode(), a);
|
||||
file.read(outputStream);
|
||||
} catch (Exception e) {
|
||||
throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e);
|
||||
throw wrapException(e);
|
||||
} finally {
|
||||
outputStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI serviceLocation) {
|
||||
@Override
|
||||
public void moveFile(final String filePath, final String directoryPath) {
|
||||
try (File file = share.openFile(
|
||||
filePath,
|
||||
EnumSet.of(AccessMask.GENERIC_WRITE, AccessMask.DELETE),
|
||||
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
|
||||
EnumSet.noneOf(SMB2ShareAccess.class),
|
||||
SMB2CreateDisposition.FILE_OPEN,
|
||||
EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
|
||||
) {
|
||||
final String[] parts = filePath.split("/");
|
||||
// rename operation on Windows requires \ (backslash) path separator
|
||||
final String newFilePath = directoryPath.replace('/', '\\') + "\\" + parts[parts.length - 1];
|
||||
file.rename(newFilePath);
|
||||
} catch (Exception e) {
|
||||
throw wrapException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFile(final String filePath) {
|
||||
try {
|
||||
share.rm(filePath);
|
||||
} catch (Exception e) {
|
||||
throw wrapException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SmbListableEntity buildSmbListableEntity(final FileIdBothDirectoryInformation info, final String path, final URI serviceLocation) {
|
||||
return SmbListableEntity.builder()
|
||||
.setName(info.getFileName())
|
||||
.setShortName(info.getShortName())
|
||||
|
@ -128,25 +168,30 @@ class SmbjClientService implements SmbClientService {
|
|||
.build();
|
||||
}
|
||||
|
||||
private Directory openDirectory(String path) {
|
||||
try {
|
||||
return share.openDirectory(
|
||||
path,
|
||||
EnumSet.of(AccessMask.GENERIC_READ),
|
||||
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
|
||||
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
|
||||
SMB2CreateDisposition.FILE_OPEN,
|
||||
EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
|
||||
);
|
||||
} catch (SMBApiException s) {
|
||||
throw new RuntimeException("Could not open directory " + path + " due to " + s.getMessage(), s);
|
||||
}
|
||||
private Directory openDirectory(final String path) {
|
||||
return share.openDirectory(
|
||||
path,
|
||||
EnumSet.of(AccessMask.GENERIC_READ),
|
||||
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
|
||||
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
|
||||
SMB2CreateDisposition.FILE_OPEN,
|
||||
EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
|
||||
);
|
||||
}
|
||||
|
||||
private boolean specialDirectory(SmbListableEntity entity) {
|
||||
private boolean specialDirectory(final SmbListableEntity entity) {
|
||||
return SPECIAL_DIRECTORIES.contains(entity.getName());
|
||||
}
|
||||
|
||||
private SmbException wrapException(final Exception e) {
|
||||
if (e instanceof SmbException) {
|
||||
return (SmbException) e;
|
||||
} else {
|
||||
final long errorCode = e instanceof SMBApiException ? ((SMBApiException) e).getStatusCode() : UNCATEGORIZED_ERROR;
|
||||
return new SmbException(e.getMessage(), errorCode, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -132,7 +132,7 @@ public class SmbjClientServiceIT {
|
|||
sambaProxy.setConnectionCut(true);
|
||||
}
|
||||
|
||||
final Set<String> actual = s.listRemoteFiles("testDirectory")
|
||||
final Set<String> actual = s.listFiles("testDirectory")
|
||||
.map(SmbListableEntity::getIdentifier)
|
||||
.collect(toSet());
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ class SmbjClientServiceTest {
|
|||
when(share.fileExists("to")).thenReturn(false);
|
||||
when(share.fileExists("create")).thenReturn(false);
|
||||
|
||||
underTest.createDirectory("directory/path/to/create");
|
||||
underTest.ensureDirectory("directory/path/to/create");
|
||||
|
||||
verify(share).mkdir("directory/path");
|
||||
verify(share).mkdir("directory/path/to");
|
||||
|
|
Loading…
Reference in New Issue