NIFI-10212 added ListSmb processor and SmbConnectionPoolService

This closes #6192.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Gabor Kulik 2022-06-20 15:43:33 +02:00 committed by Tamas Palfy
parent 5cc2790891
commit 768f58da21
24 changed files with 2201 additions and 7 deletions

View File

@ -664,6 +664,18 @@ language governing permissions and limitations under the License. -->
<version>1.18.0-SNAPSHOT</version> <version>1.18.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-client-api-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-smbj-client-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-windows-event-log-nar</artifactId> <artifactId>nifi-windows-event-log-nar</artifactId>

View File

@ -182,7 +182,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override @Override
public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
run(iterations, stopOnFinish, initialize, 5000); run(iterations, stopOnFinish, initialize, 5000 + iterations * runSchedule);
} }
@Override @Override

View File

@ -543,6 +543,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
protected long getCurrentNanoTime() {
return System.nanoTime();
}
public void listByNoTracking(final ProcessContext context, final ProcessSession session) { public void listByNoTracking(final ProcessContext context, final ProcessSession session) {
final List<T> entityList; final List<T> entityList;
@ -655,6 +659,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
.computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>()) .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
.add(entity) .add(entity)
); );
if (getLogger().isTraceEnabled()) { if (getLogger().isTraceEnabled()) {
getLogger().trace("orderedEntries: " + getLogger().trace("orderedEntries: " +
orderedEntries.values().stream() orderedEntries.values().stream()
@ -744,8 +749,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
} }
final List<T> entityList; final List<T> entityList;
final long currentRunTimeNanos = System.nanoTime(); final long currentRunTimeNanos = getCurrentNanoTime();
final long currentRunTimeMillis = System.currentTimeMillis(); final long currentRunTimeMillis = getCurrentTime();
try { try {
// track of when this last executed for consideration of the lag nanos // track of when this last executed for consideration of the lag nanos
entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION); entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
@ -1100,7 +1105,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
} }
protected ListedEntityTracker<T> createListedEntityTracker() { protected ListedEntityTracker<T> createListedEntityTracker() {
return new ListedEntityTracker<>(getIdentifier(), getLogger(), getRecordSchema()); return new ListedEntityTracker<>(getIdentifier(), getLogger(), this::getCurrentTime, getRecordSchema());
} }
private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException { private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-bundle</artifactId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-smb-client-api-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-client-api</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,40 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-bundle</artifactId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-smb-client-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-listed-entity</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import java.io.IOException;
import java.net.URI;
import org.apache.nifi.controller.ControllerService;
public interface SmbClientProviderService extends ControllerService {
/**
* Returns the identifier of the service location.
*
* @return the remote location
*/
URI getServiceLocation();
/**
* Returns the smb client to use.
*
* @return the client.
*/
SmbClientService getClient() throws IOException;
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import java.util.stream.Stream;
/**
* Service abstraction for Server Message Block protocol operations.
*/
public interface SmbClientService extends AutoCloseable {
Stream<SmbListableEntity> listRemoteFiles(String path);
void createDirectory(String path);
}

View File

@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
public class SmbListableEntity implements ListableEntity {
private final String name;
private final String shortName;
private final String path;
private final long timestamp;
private final long creationTime;
private final long lastAccessTime;
private final long changeTime;
private final boolean directory;
private final long size;
private final long allocationSize;
private SmbListableEntity(String name, String shortName, String path, long timestamp, long creationTime,
long lastAccessTime, long changeTime, boolean directory,
long size, long allocationSize) {
this.name = name;
this.shortName = shortName;
this.path = path;
this.timestamp = timestamp;
this.creationTime = creationTime;
this.lastAccessTime = lastAccessTime;
this.changeTime = changeTime;
this.directory = directory;
this.size = size;
this.allocationSize = allocationSize;
}
public static SimpleRecordSchema getRecordSchema() {
List<RecordField> fields = Arrays.asList(
new RecordField("filename", RecordFieldType.STRING.getDataType(), false),
new RecordField("shortName", RecordFieldType.STRING.getDataType(), false),
new RecordField("path", RecordFieldType.STRING.getDataType(), false),
new RecordField("identifier", RecordFieldType.STRING.getDataType(), false),
new RecordField("timestamp", RecordFieldType.LONG.getDataType(), false),
new RecordField("creationTime", RecordFieldType.LONG.getDataType(), false),
new RecordField("lastAccessTime", RecordFieldType.LONG.getDataType(), false),
new RecordField("changeTime", RecordFieldType.LONG.getDataType(), false),
new RecordField("size", RecordFieldType.LONG.getDataType(), false),
new RecordField("allocationSize", RecordFieldType.LONG.getDataType(), false)
);
return new SimpleRecordSchema(fields);
}
public static SmbListableEntityBuilder builder() {
return new SmbListableEntityBuilder();
}
public String getShortName() {
return shortName;
}
public long getCreationTime() {
return creationTime;
}
public long getLastAccessTime() {
return lastAccessTime;
}
public long getChangeTime() {
return changeTime;
}
public long getAllocationSize() {
return allocationSize;
}
@Override
public String getName() {
return name;
}
public String getPath() {
return path;
}
public String getPathWithName() {
return path.isEmpty() ? name : path + "/" + name;
}
@Override
public String getIdentifier() {
return getPathWithName();
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public long getSize() {
return size;
}
public boolean isDirectory() {
return directory;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SmbListableEntity that = (SmbListableEntity) o;
return getPathWithName().equals(that.getPathWithName());
}
@Override
public int hashCode() {
return getPathWithName().hashCode();
}
@Override
public String toString() {
return getPathWithName() + " (last write: " + timestamp + " size: " + size + ")";
}
@Override
public Record toRecord() {
final Map<String, Object> record = new TreeMap<>();
record.put("filename", getName());
record.put("shortName", getShortName());
record.put("path", path);
record.put("identifier", getPathWithName());
record.put("timestamp", getTimestamp());
record.put("creationTime", getCreationTime());
record.put("lastAccessTime", getLastAccessTime());
record.put("size", getSize());
record.put("allocationSize", getAllocationSize());
return new MapRecord(getRecordSchema(), record);
}
public static class SmbListableEntityBuilder {
private String name;
private String shortName;
private String path = "";
private long timestamp;
private long creationTime;
private long lastAccessTime;
private long changeTime;
private boolean directory = false;
private long size = 0;
private long allocationSize = 0;
public SmbListableEntityBuilder setName(String name) {
this.name = name;
return this;
}
public SmbListableEntityBuilder setShortName(String shortName) {
this.shortName = shortName;
return this;
}
public SmbListableEntityBuilder setPath(String path) {
this.path = path;
return this;
}
public SmbListableEntityBuilder setTimestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}
public SmbListableEntityBuilder setCreationTime(long creationTime) {
this.creationTime = creationTime;
return this;
}
public SmbListableEntityBuilder setLastAccessTime(long lastAccessTime) {
this.lastAccessTime = lastAccessTime;
return this;
}
public SmbListableEntityBuilder setChangeTime(long changeTime) {
this.changeTime = changeTime;
return this;
}
public SmbListableEntityBuilder setDirectory(boolean directory) {
this.directory = directory;
return this;
}
public SmbListableEntityBuilder setSize(long size) {
this.size = size;
return this;
}
public SmbListableEntityBuilder setAllocationSize(long allocationSize) {
this.allocationSize = allocationSize;
return this;
}
public SmbListableEntity build() {
return new SmbListableEntity(name, shortName, path, timestamp, creationTime, lastAccessTime, changeTime,
directory, size, allocationSize);
}
}
}

View File

@ -35,5 +35,11 @@
<artifactId>nifi-smb-processors</artifactId> <artifactId>nifi-smb-processors</artifactId>
<version>1.18.0-SNAPSHOT</version> <version>1.18.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-client-api-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -26,10 +26,24 @@
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-client-api</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId> <artifactId>nifi-utils</artifactId>
@ -38,7 +52,10 @@
<dependency> <dependency>
<groupId>com.hierynomus</groupId> <groupId>com.hierynomus</groupId>
<artifactId>smbj</artifactId> <artifactId>smbj</artifactId>
<version>0.10.0</version> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -46,5 +63,26 @@
<version>1.18.0-SNAPSHOT</version> <version>1.18.0-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-smbj-client</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,369 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static org.apache.nifi.components.state.Scope.CLUSTER;
import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
import java.io.IOException;
import java.net.URI;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbListableEntity;
@PrimaryNodeOnly
@TriggerSerially
@Tags({"samba, smb, cifs, files", "list"})
@SeeAlso({PutSmbFile.class, GetSmbFile.class})
@CapabilityDescription("Lists concrete files shared via SMB protocol. " +
"Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " +
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. "
+
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the "
+
"previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
@WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
@WritesAttribute(attribute = "path", description =
"The path is set to the relative path of the file's directory "
+ "on filesystem compared to the Share and Input Directory properties and the configured host "
+ "and port inherited from the configured connection pool controller service. For example, for "
+ "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
+ "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),
@WritesAttribute(attribute = "absolute.path", description =
"The absolute.path is set to the absolute path of the file's directory on the remote location. For example, "
+ "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "
+ "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to "
+ "SHARE/DIRECTORY/sub/folder/file."),
@WritesAttribute(attribute = "identifier", description =
"The identifier of the file. This equals to the path attribute so two files with the same relative path "
+ "coming from different file shares considered to be identical."),
@WritesAttribute(attribute = "timestamp", description =
"The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "createTime", description =
"The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "lastAccessTime", description =
"The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "changeTime", description =
"The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "size", description = "The number of bytes in the source file"),
@WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"),
})
@Stateful(scopes = {Scope.CLUSTER}, description =
"After performing a listing of files, the state of the previous listing can be stored in order to list files "
+ "continuously without duplication."
)
public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.displayName("Input Directory")
.name("directory")
.description("The network folder from which to list files. This is the remaining relative path " +
"after the share: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]/sub/directories. It is also possible "
+ "to add subdirectories. The given path on the remote file share must exist. "
+ "This can be checked using verification. You may mix Windows and Linux-style "
+ "directory separators.")
.required(false)
.addValidator(NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder()
.displayName("Minimum File Age")
.name("min-file-age")
.description("The minimum age that a file must be in order to be listed; any file younger than this "
+ "amount of time will be ignored.")
.required(true)
.addValidator(TIME_PERIOD_VALIDATOR)
.defaultValue("5 secs")
.build();
public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder()
.displayName("Maximum File Age")
.name("max-file-age")
.description("Any file older than the given value will be omitted. ")
.required(false)
.addValidator(TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder()
.displayName("Minimum File Size")
.name("min-file-size")
.description("Any file smaller than the given value will be omitted.")
.required(false)
.addValidator(DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder()
.displayName("Maximum File Size")
.name("max-file-size")
.description("Any file larger than the given value will be omitted.")
.required(false)
.addValidator(DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(LISTING_STRATEGY)
.allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS)
.build();
public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
.name("smb-client-provider-service")
.displayName("SMB Client Provider Service")
.description("Specifies the SMB client provider to use for creating SMB connections.")
.required(true)
.identifiesControllerService(SmbClientProviderService.class)
.build();
public static final PropertyDescriptor FILE_NAME_SUFFIX_FILTER = new Builder()
.name("file-name-suffix-filter")
.displayName("File Name Suffix Filter")
.description("Files ending with the given suffix will be omitted. Can be used to make sure that files "
+ "that are still uploading are not listed multiple times, by having those files have a suffix "
+ "and remove the suffix once the upload finishes. This is highly recommended when using "
+ "'Tracking Entities' or 'Tracking Timestamps' listing strategies.")
.required(false)
.addValidator(NON_EMPTY_VALIDATOR)
.addValidator(new MustNotContainDirectorySeparatorsValidator())
.build();
private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList(
SMB_LISTING_STRATEGY,
SMB_CLIENT_PROVIDER_SERVICE,
DIRECTORY,
AbstractListProcessor.RECORD_WRITER,
FILE_NAME_SUFFIX_FILTER,
MINIMUM_AGE,
MAXIMUM_AGE,
MINIMUM_SIZE,
MAXIMUM_SIZE,
AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION,
ListedEntityTracker.TRACKING_STATE_CACHE,
ListedEntityTracker.TRACKING_TIME_WINDOW,
ListedEntityTracker.INITIAL_LISTING_TARGET
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) {
final Map<String, String> attributes = new TreeMap<>();
attributes.put("filename", entity.getName());
attributes.put("shortname", entity.getShortName());
attributes.put("path", entity.getPath());
attributes.put("absolute.path", entity.getPathWithName());
attributes.put("identifier", entity.getIdentifier());
attributes.put("timestamp", formatTimeStamp(entity.getTimestamp()));
attributes.put("creationTime", formatTimeStamp(entity.getCreationTime()));
attributes.put("lastAccessTime", formatTimeStamp(entity.getLastAccessTime()));
attributes.put("changeTime", formatTimeStamp(entity.getChangeTime()));
attributes.put("size", String.valueOf(entity.getSize()));
attributes.put("allocationSize", String.valueOf(entity.getAllocationSize()));
return unmodifiableMap(attributes);
}
@Override
protected String getPath(ProcessContext context) {
final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
final URI serviceLocation = clientProviderService.getServiceLocation();
final String directory = getDirectory(context);
return String.format("%s/%s", serviceLocation.toString(), directory.isEmpty() ? "" : directory + "/");
}
@Override
protected List<SmbListableEntity> performListing(ProcessContext context, Long minimumTimestampOrNull,
ListingMode listingMode) throws IOException {
final Predicate<SmbListableEntity> fileFilter =
createFileFilter(context, minimumTimestampOrNull);
try (Stream<SmbListableEntity> listing = performListing(context)) {
final Iterator<SmbListableEntity> iterator = listing.iterator();
final List<SmbListableEntity> result = new LinkedList<>();
while (iterator.hasNext()) {
if (isExecutionStopped(listingMode)) {
return emptyList();
}
final SmbListableEntity entity = iterator.next();
if (fileFilter.test(entity)) {
result.add(entity);
}
}
return result;
} catch (Exception e) {
throw new IOException("Could not perform listing", e);
}
}
@Override
protected boolean isListingResetNecessary(PropertyDescriptor property) {
return asList(SMB_CLIENT_PROVIDER_SERVICE, DIRECTORY, FILE_NAME_SUFFIX_FILTER).contains(property);
}
@Override
protected Scope getStateScope(PropertyContext context) {
return CLUSTER;
}
@Override
protected RecordSchema getRecordSchema() {
return SmbListableEntity.getRecordSchema();
}
@Override
protected Integer countUnfilteredListing(ProcessContext context) throws IOException {
try (Stream<SmbListableEntity> listing = performListing(context)) {
return Long.valueOf(listing.count()).intValue();
} catch (Exception e) {
throw new IOException("Could not count files", e);
}
}
@Override
protected String getListingContainerName(ProcessContext context) {
return String.format("Remote Directory [%s]", getPath(context));
}
private String formatTimeStamp(long timestamp) {
return ISO_DATE_TIME.format(
LocalDateTime.ofEpochSecond(TimeUnit.MILLISECONDS.toSeconds(timestamp), 0, ZoneOffset.UTC));
}
private boolean isExecutionStopped(ListingMode listingMode) {
return ListingMode.EXECUTION.equals(listingMode) && !isScheduled();
}
private Predicate<SmbListableEntity> createFileFilter(ProcessContext context, Long minTimestampOrNull) {
final Long minimumAge = context.getProperty(MINIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maximumAgeOrNull = context.getProperty(MAXIMUM_AGE).isSet() ? context.getProperty(MAXIMUM_AGE)
.asTimePeriod(TimeUnit.MILLISECONDS) : null;
final Double minimumSizeOrNull =
context.getProperty(MINIMUM_SIZE).isSet() ? context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B)
: null;
final Double maximumSizeOrNull =
context.getProperty(MAXIMUM_SIZE).isSet() ? context.getProperty(MAXIMUM_SIZE).asDataSize(DataUnit.B)
: null;
final String suffixOrNull = context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue();
final long now = getCurrentTime();
Predicate<SmbListableEntity> filter = entity -> now - entity.getTimestamp() >= minimumAge;
if (maximumAgeOrNull != null) {
filter = filter.and(entity -> now - entity.getTimestamp() <= maximumAgeOrNull);
}
if (minTimestampOrNull != null) {
filter = filter.and(entity -> entity.getTimestamp() >= minTimestampOrNull);
}
if (minimumSizeOrNull != null) {
filter = filter.and(entity -> entity.getSize() >= minimumSizeOrNull);
}
if (maximumSizeOrNull != null) {
filter = filter.and(entity -> entity.getSize() <= maximumSizeOrNull);
}
if (suffixOrNull != null) {
filter = filter.and(entity -> !entity.getName().endsWith(suffixOrNull));
}
return filter;
}
private Stream<SmbListableEntity> performListing(ProcessContext context) throws IOException {
final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
final String directory = getDirectory(context);
final SmbClientService clientService = clientProviderService.getClient();
return clientService.listRemoteFiles(directory).onClose(() -> {
try {
clientService.close();
} catch (Exception e) {
throw new RuntimeException("Could not close samba client", e);
}
});
}
private String getDirectory(ProcessContext context) {
final PropertyValue property = context.getProperty(DIRECTORY);
final String directory = property.isSet() ? property.getValue().replace('\\', '/') : "";
return directory.equals("/") ? "" : directory;
}
private static class MustNotContainDirectorySeparatorsValidator implements Validator {
@Override
public ValidationResult validate(String subject, String value, ValidationContext context) {
return new ValidationResult.Builder()
.subject(subject)
.input(value)
.valid(!value.contains("/"))
.explanation(subject + " must not contain any folder separator character.")
.build();
}
}
}

View File

@ -12,5 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.smb.PutSmbFile
org.apache.nifi.processors.smb.GetSmbFile org.apache.nifi.processors.smb.GetSmbFile
org.apache.nifi.processors.smb.ListSmb
org.apache.nifi.processors.smb.PutSmbFile

View File

@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static java.util.Arrays.asList;
import static java.util.Arrays.fill;
import static java.util.stream.Collectors.toSet;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbListableEntity;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
public class ListSmbIT {
private final static Integer DEFAULT_SAMBA_PORT = 445;
private final static Logger logger = LoggerFactory.getLogger(ListSmbTest.class);
private final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
.withExposedPorts(DEFAULT_SAMBA_PORT, 139)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(logger))
.withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
@BeforeEach
public void beforeEach() {
sambaContainer.start();
}
@AfterEach
public void afterEach() {
sambaContainer.stop();
}
@ParameterizedTest
@ValueSource(ints = {4, 50, 45000})
public void shouldFillSizeAttributeProperly(int size) throws Exception {
writeFile("1.txt", generateContentWithSize(size));
final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.getFlowFilesForRelationship(REL_SUCCESS)
.forEach(flowFile -> assertEquals(size, Integer.valueOf(flowFile.getAttribute("size"))));
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void shouldShowBulletinOnMissingDirectory() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(DIRECTORY, "folderDoesNotExists");
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void shouldShowBulletinWhenShareIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(smbjClientProviderService, SHARE, "invalid_share");
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(smbClientProviderService, PORT, "1");
testRunner.enableControllerService(smbClientProviderService);
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
testRunner.disableControllerService(smbClientProviderService);
}
@Test
public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(smbClientProviderService, HOSTNAME, "this.host.should.not.exists");
testRunner.enableControllerService(smbClientProviderService);
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.disableControllerService(smbClientProviderService);
}
@Test
public void shouldUseRecordWriterProperly() throws Exception {
final Set<String> testFiles = new HashSet<>(asList(
"1.txt",
"directory/2.txt",
"directory/subdirectory/3.txt",
"directory/subdirectory2/4.txt",
"directory/subdirectory3/5.txt"
));
testFiles.forEach(file -> writeFile(file, generateContentWithSize(4)));
final TestRunner testRunner = newTestRunner(ListSmb.class);
final MockRecordWriter writer = new MockRecordWriter(null, false);
final SimpleRecordSchema simpleRecordSchema = SmbListableEntity.getRecordSchema();
testRunner.addControllerService("writer", writer);
testRunner.enableControllerService(writer);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(RECORD_WRITER, "writer");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
final String result = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent();
final int identifierColumnIndex = simpleRecordSchema.getFieldNames().indexOf("identifier");
final Set<String> actual = Arrays.stream(result.split("\n"))
.map(row -> row.split(",")[identifierColumnIndex])
.collect(toSet());
assertEquals(testFiles, actual);
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void shouldWriteFlowFileAttributesProperly() throws Exception {
final Set<String> testFiles = new HashSet<>(asList(
"file_name", "directory/file_name", "directory/subdirectory/file_name"
));
testFiles.forEach(file -> writeFile(file, generateContentWithSize(4)));
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbjClientProviderService smbjClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 sec");
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run(1);
testRunner.assertTransferCount(REL_SUCCESS, 3);
final Set<Map<String, String>> allAttributes = testRunner.getFlowFilesForRelationship(REL_SUCCESS)
.stream()
.map(MockFlowFile::getAttributes)
.collect(toSet());
final Set<String> identifiers = allAttributes.stream()
.map(attributes -> attributes.get("identifier"))
.collect(toSet());
assertEquals(testFiles, identifiers);
allAttributes.forEach(attribute -> assertEquals(
Stream.of(attribute.get("path"), attribute.get("filename")).filter(s -> !s.isEmpty()).collect(
Collectors.joining("/")),
attribute.get("absolute.path")));
final Set<String> fileNames = allAttributes.stream()
.map(attributes -> attributes.get("filename"))
.collect(toSet());
assertEquals(new HashSet<>(Arrays.asList("file_name")), fileNames);
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void shouldFilterFilesBySizeCriteria() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbClientProviderService);
testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(LISTING_STRATEGY, "none");
writeFile("1.txt", generateContentWithSize(1));
writeFile("10.txt", generateContentWithSize(10));
writeFile("100.txt", generateContentWithSize(100));
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 3);
testRunner.clearTransferState();
testRunner.setProperty(MINIMUM_SIZE, "10 B");
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 2);
testRunner.clearTransferState();
testRunner.setProperty(MINIMUM_SIZE, "50 B");
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.disableControllerService(smbClientProviderService);
}
@Test
public void shouldFilterByGivenSuffix() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbClientProviderService);
testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix");
testRunner.setProperty(LISTING_STRATEGY, "none");
writeFile("should_list_this", generateContentWithSize(1));
writeFile("should_skip_this.suffix", generateContentWithSize(1));
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.disableControllerService(smbClientProviderService);
}
private SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner)
throws Exception {
SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
testRunner.addControllerService("connection-pool", smbjClientProviderService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "connection-pool");
testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
testRunner.setProperty(smbjClientProviderService, PORT,
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
testRunner.setProperty(smbjClientProviderService, USERNAME, "username");
testRunner.setProperty(smbjClientProviderService, PASSWORD, "password");
testRunner.setProperty(smbjClientProviderService, SHARE, "share");
testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
return smbjClientProviderService;
}
private String generateContentWithSize(int sizeInBytes) {
byte[] bytes = new byte[sizeInBytes];
fill(bytes, (byte) 1);
return new String(bytes);
}
private void writeFile(String path, String content) {
String containerPath = "/folder/" + path;
sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
}
}

View File

@ -0,0 +1,322 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static java.util.Arrays.stream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
import static org.apache.nifi.processors.smb.ListSmb.MAXIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.util.list.ListedEntity;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbListableEntity;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
class ListSmbTest {
private final static AtomicLong currentMillis = new AtomicLong();
private final static AtomicLong currentNanos = new AtomicLong();
public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id";
private static long currentMillis() {
return currentMillis.get();
}
private static long currentNanos() {
return currentNanos.get();
}
private static void setTime(Long timeInMillis) {
currentMillis.set(timeInMillis);
currentNanos.set(NANOSECONDS.convert(timeInMillis, MILLISECONDS));
}
private static void timePassed(Long timeInMillis) {
currentMillis.addAndGet(timeInMillis);
currentNanos.addAndGet(NANOSECONDS.convert(timeInMillis, MILLISECONDS));
}
@ParameterizedTest
@ValueSource(strings = {"timestamps", "entities"})
public void shouldResetStateWhenPropertiesChanged(String listingStrategy) throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, listingStrategy);
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
final DistributedMapCacheClient cacheService = mockDistributedMap();
testRunner.addControllerService("cacheService", cacheService);
testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
testRunner.enableControllerService(cacheService);
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
long now = System.currentTimeMillis();
mockSmbFolders(mockNifiSmbClientService,
listableEntity("should_list_this_again_after_property_change", now - 100));
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.clearTransferState();
testRunner.setProperty(DIRECTORY, "testDirectoryChanged");
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.clearTransferState();
testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, "suffix_changed");
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.clearTransferState();
final SmbClientProviderService clientProviderService = mock(SmbClientProviderService.class);
when(clientProviderService.getIdentifier()).thenReturn("different-client-provider");
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "different-client-provider");
testRunner.addControllerService("different-client-provider", clientProviderService);
testRunner.enableControllerService(clientProviderService);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertValid();
}
@ParameterizedTest
@ValueSource(longs = {1L, 50L, 150L, 3000L})
public void testShouldUseTimestampBasedStrategyProperly(Long minimumAge) throws Exception {
final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService);
testRunner.run();
verify(mockNifiSmbClientService).close();
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 900),
listableEntity("second", 1000)
);
testRunner.run();
timePassed(100L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 900),
listableEntity("second", 1000),
listableEntity("third", 1100)
);
testRunner.run();
timePassed(1L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 900),
listableEntity("second", 1000),
listableEntity("third", 1100),
listableEntity("appeared_during_the_previous_iteration_and_it_was_missed", 1099)
);
timePassed(100L);
testRunner.run();
timePassed(minimumAge);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 4);
testRunner.assertValid();
}
@ParameterizedTest
@ValueSource(longs = {0L, 50L, 150L, 3000L})
public void testShouldUseEntityTrackingBasedStrategyProperly(Long minimumAge) throws Exception {
final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "entities");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
final DistributedMapCacheClient cacheService = mockDistributedMap();
testRunner.addControllerService("cacheService", cacheService);
testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
testRunner.enableControllerService(cacheService);
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 1000)
);
testRunner.run();
verify(mockNifiSmbClientService).close();
timePassed(100L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 1000),
listableEntity("second", 1100)
);
testRunner.run();
timePassed(minimumAge);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 2);
testRunner.assertValid();
}
@ParameterizedTest
@ValueSource(longs = {0L, 50L, 150L, 3000L})
public void testShouldUseNoTrackingBasedStrategyProperly(Long minimumAge) throws Exception {
final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 1000)
);
timePassed(minimumAge);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 2);
testRunner.assertValid();
}
@Test
public void testShouldFilterByFileAgeCriteria() throws Exception {
final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, "10 ms");
testRunner.setProperty(MAXIMUM_AGE, "30 ms");
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("too_young", 1000),
listableEntity("too_old", 1000 - 31),
listableEntity("should_list_this", 1000 - 11)
);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertValid();
}
@Test
public void shouldTurnSmbClientExceptionsToBulletins() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner);
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception"));
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
}
@Test
public void shouldFormatRemotePathProperly() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService clientProviderService = mockSmbConnectionPoolService();
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID);
testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, clientProviderService);
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://hostname:445/share"));
final ListSmb underTest = (ListSmb) testRunner.getProcessor();
assertEquals("smb://hostname:445/share/", underTest.getPath(testRunner.getProcessContext()));
testRunner.setProperty(DIRECTORY, "/");
assertEquals("smb://hostname:445/share/", underTest.getPath(testRunner.getProcessContext()));
testRunner.setProperty(DIRECTORY, "root");
assertEquals("smb://hostname:445/share/root/", underTest.getPath(testRunner.getProcessContext()));
testRunner.setProperty(DIRECTORY, "root/subdirectory");
assertEquals("smb://hostname:445/share/root/subdirectory/", underTest.getPath(testRunner.getProcessContext()));
}
private SmbClientProviderService mockSmbConnectionPoolService() {
final SmbClientProviderService clientProviderService = mock(SmbClientProviderService.class);
when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
return clientProviderService;
}
private SmbClientService configureTestRunnerWithMockedSambaClient(TestRunner testRunner)
throws Exception {
final SmbClientService mockNifiSmbClientService = mock(SmbClientService.class);
testRunner.setProperty(DIRECTORY, "testDirectory");
final SmbClientProviderService clientProviderService = mockSmbConnectionPoolService();
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID);
testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, clientProviderService);
testRunner.enableControllerService(clientProviderService);
return mockNifiSmbClientService;
}
private void mockSmbFolders(SmbClientService mockNifiSmbClientService, SmbListableEntity... entities) {
doAnswer(ignore -> stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString());
}
private SmbListableEntity listableEntity(String name, long timeStamp) {
return SmbListableEntity.builder()
.setName(name)
.setTimestamp(timeStamp)
.build();
}
private DistributedMapCacheClient mockDistributedMap() throws IOException {
final Map<String, ConcurrentHashMap<String, ListedEntity>> store = new ConcurrentHashMap<>();
final DistributedMapCacheClient cacheService = mock(DistributedMapCacheClient.class);
when(cacheService.putIfAbsent(any(), any(), any(), any())).thenReturn(false);
when(cacheService.containsKey(any(), any())).thenReturn(false);
when(cacheService.getIdentifier()).thenReturn("cacheService");
doAnswer(invocation -> store.get(invocation.getArgument(0)))
.when(cacheService).get(any(), any(), any());
doAnswer(invocation -> store.put(invocation.getArgument(0), invocation.getArgument(1)))
.when(cacheService).put(any(), any(), any(), any());
doAnswer(invocation -> Optional.ofNullable(invocation.getArgument(0)).map(store::remove).isPresent())
.when(cacheService).remove(any(), any());
return cacheService;
}
public static class TimeMockingListSmb extends ListSmb {
@Override
protected long getCurrentTime() {
return currentMillis();
}
@Override
protected long getCurrentNanoTime() {
return currentNanos();
}
}
}

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-bundle</artifactId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-smb-smbj-client-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-client-api-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-smbj-client</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-bundle</artifactId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-smb-smbj-client</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-client-api</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>smbj</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.SmbConfig;
import com.hierynomus.smbj.auth.AuthenticationContext;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@Tags({"samba, smb, cifs, files"})
@CapabilityDescription("Provides access to SMB Sessions with shared authentication credentials.")
public class SmbjClientProviderService extends AbstractControllerService implements SmbClientProviderService {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.displayName("Hostname")
.name("hostname")
.description("The network host of the SMB file server.")
.required(true)
.addValidator(NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder()
.displayName("Domain")
.name("domain")
.description(
"The domain used for authentication. Optional, in most cases username and password is sufficient.")
.required(false)
.addValidator(NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.displayName("Username")
.name("username")
.description(
"The username used for authentication.")
.required(false)
.defaultValue("Guest")
.addValidator(NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.displayName("Password")
.name("password")
.description("The password used for authentication.")
.required(false)
.addValidator(NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.displayName("Port")
.name("port")
.description("Port to use for connection.")
.required(true)
.addValidator(PORT_VALIDATOR)
.defaultValue("445")
.build();
public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder()
.displayName("Share")
.name("share")
.description("The network share to which files should be listed from. This is the \"first folder\"" +
"after the hostname: smb://hostname:port/[share]/dir1/dir2")
.required(true)
.addValidator(NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Timeout")
.name("timeout")
.description("Timeout for read and write operations.")
.required(true)
.defaultValue("5 sec")
.addValidator(TIME_PERIOD_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(asList(
HOSTNAME,
PORT,
SHARE,
USERNAME,
PASSWORD,
DOMAIN,
TIMEOUT
));
private SMBClient smbClient;
private AuthenticationContext authenticationContext;
private String hostname;
private int port;
private String shareName;
@Override
public SmbClientService getClient() throws IOException {
final SmbjClientService client = new SmbjClientService(smbClient, authenticationContext);
try {
client.connectToShare(hostname, port, shareName);
} catch (IOException e) {
client.forceFullyCloseConnection();
client.connectToShare(hostname, port, shareName);
}
return client;
}
@Override
public URI getServiceLocation() {
return URI.create(String.format("smb://%s:%d/%s", hostname, port, shareName));
}
@OnEnabled
public void onEnabled(ConfigurationContext context) {
this.hostname = context.getProperty(HOSTNAME).getValue();
this.port = context.getProperty(PORT).asInteger();
this.shareName = context.getProperty(SHARE).getValue();
this.smbClient = new SMBClient(SmbConfig.builder()
.withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS), MILLISECONDS)
.build());
createAuthenticationContext(context);
}
@OnDisabled
public void onDisabled() {
smbClient.close();
smbClient = null;
hostname = null;
port = 0;
shareName = null;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
private void createAuthenticationContext(ConfigurationContext context) {
if (context.getProperty(USERNAME).isSet()) {
final String userName = context.getProperty(USERNAME).getValue();
final String password =
context.getProperty(PASSWORD).isSet() ? context.getProperty(PASSWORD).getValue() : "";
final String domainOrNull =
context.getProperty(DOMAIN).isSet() ? context.getProperty(DOMAIN).getValue() : null;
authenticationContext = new AuthenticationContext(userName, password.toCharArray(), domainOrNull);
} else {
authenticationContext = AuthenticationContext.anonymous();
}
}
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.stream.StreamSupport.stream;
import com.hierynomus.msdtyp.AccessMask;
import com.hierynomus.msfscc.FileAttributes;
import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
import com.hierynomus.mssmb2.SMB2CreateDisposition;
import com.hierynomus.mssmb2.SMB2CreateOptions;
import com.hierynomus.mssmb2.SMB2ShareAccess;
import com.hierynomus.mssmb2.SMBApiException;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.session.Session;
import com.hierynomus.smbj.share.Directory;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.Share;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Stream;
public class SmbjClientService implements SmbClientService {
private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
final private AuthenticationContext authenticationContext;
final private SMBClient smbClient;
private Connection connection;
private Session session;
private DiskShare share;
public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext) {
this.smbClient = smbClient;
this.authenticationContext = authenticationContext;
}
public void connectToShare(String hostname, int port, String shareName) throws IOException {
Share share;
try {
connection = smbClient.connect(hostname, port);
session = connection.authenticate(authenticationContext);
share = session.connectShare(shareName);
} catch (Exception e) {
close();
throw new IOException("Could not connect to share " + format("%s:%d/%s", hostname, port, shareName), e);
}
if (share instanceof DiskShare) {
this.share = (DiskShare) share;
} else {
close();
throw new IllegalArgumentException("DiskShare not found. Share " +
share.getClass().getSimpleName() + " found on " + format("%s:%d/%s", hostname, port,
shareName));
}
}
public void forceFullyCloseConnection() {
try {
if (connection != null) {
connection.close(true);
}
} catch (IOException ignore) {
} finally {
connection = null;
}
}
@Override
public void close() {
try {
if (session != null) {
session.close();
}
} catch (IOException ignore) {
} finally {
session = null;
}
}
@Override
public Stream<SmbListableEntity> listRemoteFiles(String filePath) {
return Stream.of(filePath).flatMap(path -> {
final Directory directory = openDirectory(path);
return stream(directory::spliterator, 0, false)
.map(entity -> buildSmbListableEntity(entity, path))
.filter(entity -> !specialDirectory(entity))
.flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName())
: Stream.of(listable))
.onClose(directory::close);
});
}
@Override
public void createDirectory(String path) {
final int lastDirectorySeparatorPosition = path.lastIndexOf("/");
if (lastDirectorySeparatorPosition > 0) {
createDirectory(path.substring(0, lastDirectorySeparatorPosition));
}
if (!share.folderExists(path)) {
share.mkdir(path);
}
}
private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) {
return SmbListableEntity.builder()
.setName(info.getFileName())
.setShortName(info.getShortName())
.setPath(path)
.setTimestamp(info.getLastWriteTime().toEpochMillis())
.setCreationTime(info.getCreationTime().toEpochMillis())
.setChangeTime(info.getChangeTime().toEpochMillis())
.setLastAccessTime(info.getLastAccessTime().toEpochMillis())
.setDirectory((info.getFileAttributes() & FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0)
.setSize(info.getEndOfFile())
.setAllocationSize(info.getAllocationSize())
.build();
}
private Directory openDirectory(String path) {
try {
return share.openDirectory(
path,
EnumSet.of(AccessMask.GENERIC_READ),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
SMB2CreateDisposition.FILE_OPEN,
EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
);
} catch (SMBApiException s) {
throw new RuntimeException("Could not open directory " + path + " due to " + s.getMessage(), s);
}
}
private boolean specialDirectory(SmbListableEntity entity) {
return SPECIAL_DIRECTORIES.contains(entity.getName());
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.services.smb.SmbjClientProviderService

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import static java.util.stream.Collectors.toSet;
import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.TIMEOUT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.util.MockConfigurationContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
public class NiFiSmbjClientIT {
private final static Logger sambaContainerLogger = LoggerFactory.getLogger("sambaContainer");
private final static Logger toxyProxyLogger = LoggerFactory.getLogger("toxiProxy");
private final Network network = Network.newNetwork();
private final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
.withExposedPorts(139, 445)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger))
.withNetwork(network)
.withNetworkAliases("samba")
.withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
private final ToxiproxyContainer toxiproxy = new ToxiproxyContainer("shopify/toxiproxy")
.withNetwork(network)
.withLogConsumer(new Slf4jLogConsumer(toxyProxyLogger))
.withNetworkAliases("toxiproxy");
@BeforeEach
public void beforeEach() {
sambaContainer.start();
toxiproxy.start();
}
@AfterEach
public void afterEach() {
toxiproxy.stop();
sambaContainer.stop();
}
@Test
public void shouldRescueAfterConnectionFailure() throws Exception {
writeFile("testDirectory/file", "content");
writeFile("testDirectory/directory1/file", "content");
writeFile("testDirectory/directory2/file", "content");
writeFile("testDirectory/directory2/nested_directory/file", "content");
ContainerProxy sambaProxy = toxiproxy.getProxy("samba", 445);
SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
ConfigurationContext context = mock(ConfigurationContext.class);
Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(HOSTNAME, sambaProxy.getContainerIpAddress());
properties.put(PORT, String.valueOf(sambaProxy.getProxyPort()));
properties.put(SHARE, "share");
properties.put(USERNAME, "username");
properties.put(PASSWORD, "password");
properties.put(DOMAIN, "domain");
properties.put(TIMEOUT, "0.5 sec");
MockConfigurationContext mockConfigurationContext = new MockConfigurationContext(properties, null);
smbjClientProviderService.onEnabled(mockConfigurationContext);
sambaProxy.toxics().latency("slow", ToxicDirection.DOWNSTREAM, 300);
AtomicInteger i = new AtomicInteger(0);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
CountDownLatch latch = new CountDownLatch(100);
executorService.scheduleWithFixedDelay(() -> {
int iteration = i.getAndIncrement();
if (iteration > 100) {
return;
}
executorService.submit(() -> {
SmbClientService s = null;
try {
s = smbjClientProviderService.getClient();
if (iteration == 25) {
sambaProxy.setConnectionCut(true);
}
final Set<String> actual = s.listRemoteFiles("testDirectory")
.map(SmbListableEntity::getIdentifier)
.collect(toSet());
assertTrue(actual.contains("testDirectory/file"));
assertTrue(actual.contains("testDirectory/directory1/file"));
assertTrue(actual.contains("testDirectory/directory2/file"));
assertTrue(actual.contains("testDirectory/directory2/nested_directory/file"));
} catch (Exception e) {
if (iteration == 50) {
sambaProxy.setConnectionCut(false);
}
if (iteration == 100) {
fail();
}
} finally {
if (s != null) {
try {
s.close();
} catch (Exception e) {
}
}
}
latch.countDown();
});
}, 0, 2, TimeUnit.SECONDS);
latch.await();
executorService.shutdown();
smbjClientProviderService.onDisabled();
}
private void writeFile(String path, String content) {
String containerPath = "/folder/" + path;
sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.session.Session;
import com.hierynomus.smbj.share.DiskShare;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
class NiFiSmbjClientTest {
@Mock
DiskShare share;
@Mock
SMBClient smbClient;
@Mock
AuthenticationContext authenticationContext;
@Mock
Session session;
@Mock
Connection connection;
@InjectMocks
SmbjClientService underTest;
@BeforeEach
public void beforeEach() {
MockitoAnnotations.initMocks(this);
}
@Test
public void shouldCreateDirectoriesRecursively() throws Exception {
when(smbClient.connect("hostname", 445))
.thenReturn(connection);
when(connection.authenticate(authenticationContext)).thenReturn(session);
when(session.connectShare(anyString())).thenReturn(share);
when(share.fileExists("directory")).thenReturn(true);
when(share.fileExists("path")).thenReturn(false);
when(share.fileExists("to")).thenReturn(false);
when(share.fileExists("create")).thenReturn(false);
underTest.connectToShare("hostname", 445, "share");
underTest.createDirectory("directory/path/to/create");
verify(share).mkdir("directory/path");
verify(share).mkdir("directory/path/to");
verify(share).mkdir("directory/path/to/create");
}
}

View File

@ -26,7 +26,41 @@
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
<module>nifi-smb-client-api</module>
<module>nifi-smb-client-api-nar</module>
<module>nifi-smb-smbj-client</module>
<module>nifi-smb-smbj-client-nar</module>
<module>nifi-smb-processors</module> <module>nifi-smb-processors</module>
<module>nifi-smb-nar</module> <module>nifi-smb-nar</module>
</modules> </modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>smbj</artifactId>
<version>0.11.5</version>
</dependency>
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>${testcontainers.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project> </project>

View File

@ -40,7 +40,6 @@
<flyway.version>8.4.2</flyway.version> <flyway.version>8.4.2</flyway.version>
<flyway.tests.version>7.0.0</flyway.tests.version> <flyway.tests.version>7.0.0</flyway.tests.version>
<swagger.ui.version>3.12.0</swagger.ui.version> <swagger.ui.version>3.12.0</swagger.ui.version>
<testcontainers.version>1.17.3</testcontainers.version>
<groovy.eclipse.compiler.version>3.4.0-01</groovy.eclipse.compiler.version> <groovy.eclipse.compiler.version>3.4.0-01</groovy.eclipse.compiler.version>
<jaxb.version>2.3.2</jaxb.version> <jaxb.version>2.3.2</jaxb.version>
<jgit.version>5.13.0.202109080827-r</jgit.version> <jgit.version>5.13.0.202109080827-r</jgit.version>

View File

@ -112,6 +112,7 @@
<org.apache.httpcomponents.httpclient.version>4.5.13</org.apache.httpcomponents.httpclient.version> <org.apache.httpcomponents.httpclient.version>4.5.13</org.apache.httpcomponents.httpclient.version>
<org.apache.httpcomponents.httpcore.version>4.4.15</org.apache.httpcomponents.httpcore.version> <org.apache.httpcomponents.httpcore.version>4.4.15</org.apache.httpcomponents.httpcore.version>
<org.bouncycastle.version>1.70</org.bouncycastle.version> <org.bouncycastle.version>1.70</org.bouncycastle.version>
<testcontainers.version>1.17.3</testcontainers.version>
<org.slf4j.version>1.7.36</org.slf4j.version> <org.slf4j.version>1.7.36</org.slf4j.version>
<ranger.version>2.2.0</ranger.version> <ranger.version>2.2.0</ranger.version>
<jetty.version>9.4.48.v20220622</jetty.version> <jetty.version>9.4.48.v20220622</jetty.version>