NIFI-12837 Added DFS support in SMB processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8527.
This commit is contained in:
Peter Turcsanyi 2024-03-18 11:04:32 +01:00 committed by Pierre Villard
parent 06b0f29cad
commit 928206d52f
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
9 changed files with 370 additions and 62 deletions

View File

@ -80,6 +80,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@ -257,6 +258,7 @@ public class GetSmbFile extends AbstractProcessor {
descriptors.add(IGNORE_HIDDEN_FILES);
descriptors.add(SMB_DIALECT);
descriptors.add(USE_ENCRYPTION);
descriptors.add(ENABLE_DFS);
descriptors.add(TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors);

View File

@ -64,6 +64,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@ -194,6 +195,7 @@ public class PutSmbFile extends AbstractProcessor {
descriptors.add(RENAME_SUFFIX);
descriptors.add(SMB_DIALECT);
descriptors.add(USE_ENCRYPTION);
descriptors.add(ENABLE_DFS);
descriptors.add(TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors);

View File

@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.smb.common.SmbProperties;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.NO_TRACKING;
import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
class SmbDfsIT {
private final static Logger LOGGER = LoggerFactory.getLogger(SmbDfsIT.class);
private static final int DEFAULT_SMB_PORT = 445;
// DFS works only on the default SMB port (445). Not sure if it is a generic DFS vs Samba DFS constraint, or an issue in the smbj client library.
private final GenericContainer<?> sambaContainer = new FixedHostPortGenericContainer<>("dperson/samba")
.withFixedExposedPort(DEFAULT_SMB_PORT, DEFAULT_SMB_PORT)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.withCommand("-u", "myuser;mypass",
"-s", "share;/share-dir;;no;no;myuser;;;",
"-s", "dfs-share;/dfs-share-dir;;no;no;myuser;;;",
"-p",
"-g", "host msdfs = yes",
"-G", "dfs-share;msdfs root = yes");
@BeforeEach
void beforeEach() throws Exception {
sambaContainer.start();
sambaContainer.execInContainer("ln", "-s", "msdfs:" + sambaContainer.getHost() + "\\share", "/dfs-share-dir/dfs-link");
Thread.sleep(100);
}
@AfterEach
void afterEach() {
sambaContainer.stop();
}
@Test
void testFetchSmb() throws Exception {
writeFile("fetch_file", "fetch_content");
TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file");
SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner);
testRunner.enqueue("");
testRunner.run();
testRunner.assertTransferCount(FetchSmb.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchSmb.REL_SUCCESS).get(0);
assertEquals("fetch_content", flowFile.getContent());
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
void testFetchFileFailsWhenDfsIsDisabled() throws Exception {
writeFile("fetch_file", "fetch_content");
TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file");
SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, false);
testRunner.enqueue("");
testRunner.run();
testRunner.assertTransferCount(FetchSmb.REL_FAILURE, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchSmb.REL_FAILURE).get(0);
assertEquals(0, flowFile.getSize());
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
void testListSmbWithDfsLink() throws Exception {
testListSmb("dfs-link");
}
@Test
@Disabled("Listing folders recursively from the DFS root or a parent directory of the DFS link does not work on Samba due to https://github.com/hierynomus/smbj/issues/717#")
void testListSmbWithDfsRoot() throws Exception {
testListSmb(null);
}
private void testListSmb(String directory) throws Exception {
writeFile("list_file", "list_content");
TestRunner testRunner = newTestRunner(ListSmb.class);
if (directory != null) {
testRunner.setProperty(ListSmb.DIRECTORY, directory);
}
testRunner.setProperty(ListSmb.LISTING_STRATEGY, NO_TRACKING);
testRunner.setProperty(ListSmb.MINIMUM_AGE, "0 ms");
SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner);
testRunner.run();
testRunner.assertTransferCount(ListSmb.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(ListSmb.REL_SUCCESS).get(0);
assertEquals(0, flowFile.getSize());
assertEquals("dfs-link", flowFile.getAttribute(CoreAttributes.PATH.key()));
assertEquals("list_file", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
void testPutSmbFile() {
TestRunner testRunner = newTestRunner(PutSmbFile.class);
testRunner.setProperty(PutSmbFile.HOSTNAME, sambaContainer.getHost());
testRunner.setProperty(PutSmbFile.SHARE, "dfs-share");
testRunner.setProperty(PutSmbFile.DIRECTORY, "dfs-link");
testRunner.setProperty(PutSmbFile.USERNAME, "myuser");
testRunner.setProperty(PutSmbFile.PASSWORD, "mypass");
testRunner.setProperty(SmbProperties.ENABLE_DFS, "true");
testRunner.enqueue("put_content", Map.of(CoreAttributes.FILENAME.key(), "put_file"));
testRunner.run();
testRunner.assertTransferCount(PutSmbFile.REL_SUCCESS, 1);
String fileContent = readFile("put_file");
assertEquals("put_content", fileContent);
}
@Test
void testGetSmbFile() {
writeFile("get_file", "get_content");
TestRunner testRunner = newTestRunner(GetSmbFile.class);
testRunner.setProperty(GetSmbFile.HOSTNAME, sambaContainer.getHost());
testRunner.setProperty(GetSmbFile.SHARE, "dfs-share");
testRunner.setProperty(GetSmbFile.DIRECTORY, "dfs-link");
testRunner.setProperty(GetSmbFile.USERNAME, "myuser");
testRunner.setProperty(GetSmbFile.PASSWORD, "mypass");
testRunner.setProperty(SmbProperties.ENABLE_DFS, "true");
testRunner.run();
testRunner.assertTransferCount(GetSmbFile.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetSmbFile.REL_SUCCESS).get(0);
assertEquals("get_content", flowFile.getContent());
assertEquals("dfs-link", flowFile.getAttribute(CoreAttributes.PATH.key()));
assertEquals("get_file", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
}
private SmbjClientProviderService configureSmbClient(TestRunner testRunner) throws InitializationException {
return configureSmbClient(testRunner, true);
}
private SmbjClientProviderService configureSmbClient(TestRunner testRunner, boolean enableDfs) throws InitializationException {
SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
testRunner.addControllerService("client-provider", smbjClientProviderService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
testRunner.setProperty(smbjClientProviderService, PORT, Integer.toString(DEFAULT_SMB_PORT));
testRunner.setProperty(smbjClientProviderService, USERNAME, "myuser");
testRunner.setProperty(smbjClientProviderService, PASSWORD, "mypass");
testRunner.setProperty(smbjClientProviderService, SHARE, "dfs-share");
testRunner.setProperty(smbjClientProviderService, ENABLE_DFS, Boolean.toString(enableDfs));
testRunner.enableControllerService(smbjClientProviderService);
return smbjClientProviderService;
}
private void writeFile(String filename, String content) {
String containerPath = "/share-dir/" + filename;
sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
}
private String readFile(String filename) {
String containerPath = "/share-dir/" + filename;
return sambaContainer.copyFileFromContainer(containerPath, is -> IOUtils.toString(is, StandardCharsets.UTF_8));
}
}

View File

@ -39,6 +39,7 @@ import static java.util.Arrays.asList;
import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR;
import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@ -112,6 +113,7 @@ public class SmbjClientProviderService extends AbstractControllerService impleme
DOMAIN,
SMB_DIALECT,
USE_ENCRYPTION,
ENABLE_DFS,
TIMEOUT
));
@ -122,24 +124,37 @@ public class SmbjClientProviderService extends AbstractControllerService impleme
private String shareName;
@Override
public SmbClientService getClient() throws IOException {
Connection connection = null;
try {
connection = smbClient.connect(hostname, port);
return connectToShare(connection);
} catch (IOException e) {
getLogger().debug("Closing stale connection and trying to create a new one for share " + getServiceLocation());
closeConnection(connection);
unregisterHost();
connection = smbClient.connect(hostname, port);
return connectToShare(connection);
}
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
private SmbjClientService connectToShare(final Connection connection) throws IOException {
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.hostname = context.getProperty(HOSTNAME).getValue();
this.port = context.getProperty(PORT).asInteger();
this.shareName = context.getProperty(SHARE).getValue();
this.smbClient = buildSmbClient(context);
createAuthenticationContext(context);
}
@OnDisabled
public void onDisabled() {
smbClient.close();
smbClient = null;
hostname = null;
port = 0;
shareName = null;
}
@Override
public URI getServiceLocation() {
return URI.create(String.format("smb://%s:%d/%s", hostname, port, shareName));
}
@Override
public SmbClientService getClient() throws IOException {
final Connection connection = smbClient.connect(hostname, port);
final Session session;
final Share share;
@ -164,20 +179,6 @@ public class SmbjClientProviderService extends AbstractControllerService impleme
return new SmbjClientService(session, (DiskShare) share, getServiceLocation());
}
private void unregisterHost() {
smbClient.getServerList().unregister(hostname);
}
private void closeConnection(final Connection connection) {
try {
if (connection != null) {
connection.close(true);
}
} catch (Exception e) {
getLogger().error("Could not close connection to {}", getServiceLocation(), e);
}
}
private void closeSession(final Session session) {
try {
if (session != null) {
@ -188,34 +189,6 @@ public class SmbjClientProviderService extends AbstractControllerService impleme
}
}
@Override
public URI getServiceLocation() {
return URI.create(String.format("smb://%s:%d/%s", hostname, port, shareName));
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.hostname = context.getProperty(HOSTNAME).getValue();
this.port = context.getProperty(PORT).asInteger();
this.shareName = context.getProperty(SHARE).getValue();
this.smbClient = buildSmbClient(context);
createAuthenticationContext(context);
}
@OnDisabled
public void onDisabled() {
smbClient.close();
smbClient = null;
hostname = null;
port = 0;
shareName = null;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
private void createAuthenticationContext(final ConfigurationContext context) {
if (context.getProperty(USERNAME).isSet()) {
final String userName = context.getProperty(USERNAME).getValue();

View File

@ -54,7 +54,7 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
public class NiFiSmbjClientIT {
public class SmbjClientServiceIT {
private final static Logger sambaContainerLogger = LoggerFactory.getLogger("sambaContainer");
private final static Logger toxyProxyLogger = LoggerFactory.getLogger("toxiProxy");
@ -62,7 +62,7 @@ public class NiFiSmbjClientIT {
private final Network network = Network.newNetwork();
private final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
.withExposedPorts(139, 445)
.withExposedPorts(445)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger))
.withNetwork(network)

View File

@ -29,7 +29,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class NiFiSmbjClientTest {
class SmbjClientServiceTest {
@Mock
Session session;

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.smb.common;
import com.hierynomus.mssmb2.messages.SMB2Echo;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.SmbConfig;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.event.ConnectionClosed;
import com.hierynomus.smbj.event.SMBEventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Extends {@link com.hierynomus.smbj.SMBClient} with connection health check.
* <br/>
* Workaround to https://github.com/hierynomus/smbj/issues/796.
* <br/><br/>
* Health check method:
* <ul>
* <li>get connection from the parent class</li>
* <li>if it is a newly created connection, then return it</li>
* <li>if it is an old connection, send ECHO message to the server
* <ul>
* <li>if ECHO succeeds, return the connection</li>
* <li>if ECHO fails, unregister the connection, get connection again (which creates a new one) and return it</li>
* </ul>
* </li>
* </ul>
*/
class SmbClient extends SMBClient {
private static final Logger LOGGER = LoggerFactory.getLogger(SmbClient.class);
private SMBEventBus bus;
private SmbClient(final SmbConfig config, final SMBEventBus bus) {
super(config, bus);
}
static SmbClient create(final SmbConfig config) {
final SMBEventBus bus = new SMBEventBus();
final SmbClient client = new SmbClient(config, bus);
client.bus = bus;
return client;
}
public Connection connect(final String hostname) throws IOException {
return connect(hostname, DEFAULT_PORT);
}
public synchronized Connection connect(final String hostname, final int port) throws IOException {
final Connection connection = super.connect(hostname, port);
try {
// SMB2 ECHO message can only be sent if this is not a new connection (and health check is only needed in this case)
if (!connection.release()) {
connection.send(new SMB2Echo(connection.getNegotiatedProtocol().getDialect())).get(10, TimeUnit.SECONDS);
}
// set lease counter back
connection.lease();
return connection;
} catch (Exception e) {
LOGGER.info("Stale connection found, unregistering it and creating a new one");
bus.publish(new ConnectionClosed(hostname, port));
}
return super.connect(hostname, port);
}
}

View File

@ -44,6 +44,15 @@ public class SmbProperties {
.defaultValue("false")
.build();
public static final PropertyDescriptor ENABLE_DFS = new PropertyDescriptor.Builder()
.name("enable-dfs")
.displayName("Enable DFS")
.description("Enables accessing Distributed File System (DFS) and following DFS links during SMB operations.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Timeout")
.name("timeout")

View File

@ -21,6 +21,7 @@ import com.hierynomus.smbj.SmbConfig;
import org.apache.nifi.context.PropertyContext;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@ -32,7 +33,7 @@ public final class SmbUtils {
}
public static SMBClient buildSmbClient(final PropertyContext context) {
return new SMBClient(buildSmbConfig(context));
return SmbClient.create(buildSmbConfig(context));
}
static SmbConfig buildSmbConfig(final PropertyContext context) {
@ -50,6 +51,10 @@ public final class SmbUtils {
configBuilder.withEncryptData(context.getProperty(USE_ENCRYPTION).asBoolean());
}
if (context.getProperty(ENABLE_DFS).isSet()) {
configBuilder.withDfsEnabled(context.getProperty(ENABLE_DFS).asBoolean());
}
if (context.getProperty(TIMEOUT).isSet()) {
configBuilder.withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS), MILLISECONDS);
}