diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 338e90c1c4..caf87d0a18 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -664,6 +664,18 @@ language governing permissions and limitations under the License. --> 1.18.0-SNAPSHOT nar + + org.apache.nifi + nifi-smb-client-api-nar + 1.18.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-smb-smbj-client-nar + 1.18.0-SNAPSHOT + nar + org.apache.nifi nifi-windows-event-log-nar diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 922907c8dc..a92cb34575 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -182,7 +182,7 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { - run(iterations, stopOnFinish, initialize, 5000); + run(iterations, stopOnFinish, initialize, 5000 + iterations * runSchedule); } @Override diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 682450211e..30a0ca2baf 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -543,6 +543,10 @@ public abstract class AbstractListProcessor extends Ab return System.currentTimeMillis(); } + protected long getCurrentNanoTime() { + return System.nanoTime(); + } + public void listByNoTracking(final ProcessContext context, final ProcessSession session) { final List entityList; @@ -655,6 +659,7 @@ public abstract class AbstractListProcessor extends Ab .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>()) .add(entity) ); + if (getLogger().isTraceEnabled()) { getLogger().trace("orderedEntries: " + orderedEntries.values().stream() @@ -744,8 +749,8 @@ public abstract class AbstractListProcessor extends Ab } final List entityList; - final long currentRunTimeNanos = System.nanoTime(); - final long currentRunTimeMillis = System.currentTimeMillis(); + final long currentRunTimeNanos = getCurrentNanoTime(); + final long currentRunTimeMillis = getCurrentTime(); try { // track of when this last executed for consideration of the lag nanos entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION); @@ -1100,7 +1105,7 @@ public abstract class AbstractListProcessor extends Ab } protected ListedEntityTracker createListedEntityTracker() { - return new ListedEntityTracker<>(getIdentifier(), getLogger(), getRecordSchema()); + return new ListedEntityTracker<>(getIdentifier(), getLogger(), this::getCurrentTime, getRecordSchema()); } private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException { diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api-nar/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api-nar/pom.xml new file mode 100644 index 0000000000..925d9b93ee --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api-nar/pom.xml @@ -0,0 +1,45 @@ + + + + + org.apache.nifi + nifi-smb-bundle + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-smb-client-api-nar + nar + + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.18.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-smb-client-api + 1.18.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/pom.xml new file mode 100644 index 0000000000..1c3a3b04b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + org.apache.nifi + nifi-smb-bundle + 1.18.0-SNAPSHOT + + nifi-smb-client-api + jar + + + org.apache.nifi + nifi-api + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-listed-entity + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-record + + + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientProviderService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientProviderService.java new file mode 100644 index 0000000000..3dc23f1aa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientProviderService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import java.io.IOException; +import java.net.URI; +import org.apache.nifi.controller.ControllerService; + +public interface SmbClientProviderService extends ControllerService { + + /** + * Returns the identifier of the service location. + * + * @return the remote location + */ + URI getServiceLocation(); + + /** + * Returns the smb client to use. + * + * @return the client. + */ + SmbClientService getClient() throws IOException; + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java new file mode 100644 index 0000000000..4ecaf9a507 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import java.util.stream.Stream; + +/** + * Service abstraction for Server Message Block protocol operations. + */ +public interface SmbClientService extends AutoCloseable { + + Stream listRemoteFiles(String path); + + void createDirectory(String path); + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java new file mode 100644 index 0000000000..da33602540 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.nifi.processor.util.list.ListableEntity; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class SmbListableEntity implements ListableEntity { + + private final String name; + private final String shortName; + private final String path; + private final long timestamp; + private final long creationTime; + private final long lastAccessTime; + private final long changeTime; + private final boolean directory; + private final long size; + private final long allocationSize; + + private SmbListableEntity(String name, String shortName, String path, long timestamp, long creationTime, + long lastAccessTime, long changeTime, boolean directory, + long size, long allocationSize) { + this.name = name; + this.shortName = shortName; + this.path = path; + this.timestamp = timestamp; + this.creationTime = creationTime; + this.lastAccessTime = lastAccessTime; + this.changeTime = changeTime; + this.directory = directory; + this.size = size; + this.allocationSize = allocationSize; + } + + public static SimpleRecordSchema getRecordSchema() { + List fields = Arrays.asList( + new RecordField("filename", RecordFieldType.STRING.getDataType(), false), + new RecordField("shortName", RecordFieldType.STRING.getDataType(), false), + new RecordField("path", RecordFieldType.STRING.getDataType(), false), + new RecordField("identifier", RecordFieldType.STRING.getDataType(), false), + new RecordField("timestamp", RecordFieldType.LONG.getDataType(), false), + new RecordField("creationTime", RecordFieldType.LONG.getDataType(), false), + new RecordField("lastAccessTime", RecordFieldType.LONG.getDataType(), false), + new RecordField("changeTime", RecordFieldType.LONG.getDataType(), false), + new RecordField("size", RecordFieldType.LONG.getDataType(), false), + new RecordField("allocationSize", RecordFieldType.LONG.getDataType(), false) + ); + return new SimpleRecordSchema(fields); + } + + public static SmbListableEntityBuilder builder() { + return new SmbListableEntityBuilder(); + } + + public String getShortName() { + return shortName; + } + + public long getCreationTime() { + return creationTime; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public long getChangeTime() { + return changeTime; + } + + public long getAllocationSize() { + return allocationSize; + } + + @Override + public String getName() { + return name; + } + + public String getPath() { + return path; + } + + public String getPathWithName() { + return path.isEmpty() ? name : path + "/" + name; + } + + @Override + public String getIdentifier() { + return getPathWithName(); + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public long getSize() { + return size; + } + + public boolean isDirectory() { + return directory; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SmbListableEntity that = (SmbListableEntity) o; + return getPathWithName().equals(that.getPathWithName()); + } + + @Override + public int hashCode() { + return getPathWithName().hashCode(); + } + + @Override + public String toString() { + return getPathWithName() + " (last write: " + timestamp + " size: " + size + ")"; + } + + @Override + public Record toRecord() { + final Map record = new TreeMap<>(); + record.put("filename", getName()); + record.put("shortName", getShortName()); + record.put("path", path); + record.put("identifier", getPathWithName()); + record.put("timestamp", getTimestamp()); + record.put("creationTime", getCreationTime()); + record.put("lastAccessTime", getLastAccessTime()); + record.put("size", getSize()); + record.put("allocationSize", getAllocationSize()); + return new MapRecord(getRecordSchema(), record); + } + + public static class SmbListableEntityBuilder { + + private String name; + private String shortName; + private String path = ""; + private long timestamp; + private long creationTime; + private long lastAccessTime; + private long changeTime; + private boolean directory = false; + private long size = 0; + private long allocationSize = 0; + + public SmbListableEntityBuilder setName(String name) { + this.name = name; + return this; + } + + public SmbListableEntityBuilder setShortName(String shortName) { + this.shortName = shortName; + return this; + } + + public SmbListableEntityBuilder setPath(String path) { + this.path = path; + return this; + } + + public SmbListableEntityBuilder setTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public SmbListableEntityBuilder setCreationTime(long creationTime) { + this.creationTime = creationTime; + return this; + } + + public SmbListableEntityBuilder setLastAccessTime(long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + return this; + } + + public SmbListableEntityBuilder setChangeTime(long changeTime) { + this.changeTime = changeTime; + return this; + } + + public SmbListableEntityBuilder setDirectory(boolean directory) { + this.directory = directory; + return this; + } + + public SmbListableEntityBuilder setSize(long size) { + this.size = size; + return this; + } + + public SmbListableEntityBuilder setAllocationSize(long allocationSize) { + this.allocationSize = allocationSize; + return this; + } + + public SmbListableEntity build() { + return new SmbListableEntity(name, shortName, path, timestamp, creationTime, lastAccessTime, changeTime, + directory, size, allocationSize); + } + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml index 25255fd5af..61c19970a9 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml @@ -35,5 +35,11 @@ nifi-smb-processors 1.18.0-SNAPSHOT + + org.apache.nifi + nifi-smb-client-api-nar + 1.18.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml index 58b04c5898..de266368b2 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml @@ -26,10 +26,24 @@ jar + + org.apache.nifi + nifi-smb-client-api + 1.18.0-SNAPSHOT + provided + org.apache.nifi nifi-api + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-record-serialization-service-api + org.apache.nifi nifi-utils @@ -38,7 +52,10 @@ com.hierynomus smbj - 0.10.0 + + + commons-io + commons-io org.apache.nifi @@ -46,5 +63,26 @@ 1.18.0-SNAPSHOT test + + org.testcontainers + testcontainers + test + + + org.apache.nifi + nifi-mock-record-utils + test + + + org.apache.nifi + nifi-smb-smbj-client + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-distributed-cache-client-service-api + test + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java new file mode 100644 index 0000000000..e0029144f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbClientService; +import org.apache.nifi.services.smb.SmbListableEntity; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"samba, smb, cifs, files", "list"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Lists concrete files shared via SMB protocol. " + + "Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " + + "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " + + + "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + + + "previous node left off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from " + + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to " + + "SHARE/DIRECTORY/sub/folder/file."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder from which to list files. This is the remaining relative path " + + "after the share: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]/sub/directories. It is also possible " + + "to add subdirectories. The given path on the remote file share must exist. " + + "This can be checked using verification. You may mix Windows and Linux-style " + + "directory separators.") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age") + .name("min-file-age") + .description("The minimum age that a file must be in order to be listed; any file younger than this " + + "amount of time will be ignored.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs") + .build(); + + public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Maximum File Age") + .name("max-file-age") + .description("Any file older than the given value will be omitted. ") + .required(false) + .addValidator(TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size") + .name("min-file-size") + .description("Any file smaller than the given value will be omitted.") + .required(false) + .addValidator(DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size") + .name("max-file-size") + .description("Any file larger than the given value will be omitted.") + .required(false) + .addValidator(DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder() + .name("smb-client-provider-service") + .displayName("SMB Client Provider Service") + .description("Specifies the SMB client provider to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbClientProviderService.class) + .build(); + + public static final PropertyDescriptor FILE_NAME_SUFFIX_FILTER = new Builder() + .name("file-name-suffix-filter") + .displayName("File Name Suffix Filter") + .description("Files ending with the given suffix will be omitted. Can be used to make sure that files " + + "that are still uploading are not listed multiple times, by having those files have a suffix " + + "and remove the suffix once the upload finishes. This is highly recommended when using " + + "'Tracking Entities' or 'Tracking Timestamps' listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final List PROPERTIES = unmodifiableList(asList( + SMB_LISTING_STRATEGY, + SMB_CLIENT_PROVIDER_SERVICE, + DIRECTORY, + AbstractListProcessor.RECORD_WRITER, + FILE_NAME_SUFFIX_FILTER, + MINIMUM_AGE, + MAXIMUM_AGE, + MINIMUM_SIZE, + MAXIMUM_SIZE, + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET + )); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("shortname", entity.getShortName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", entity.getPathWithName()); + attributes.put("identifier", entity.getIdentifier()); + attributes.put("timestamp", formatTimeStamp(entity.getTimestamp())); + attributes.put("creationTime", formatTimeStamp(entity.getCreationTime())); + attributes.put("lastAccessTime", formatTimeStamp(entity.getLastAccessTime())); + attributes.put("changeTime", formatTimeStamp(entity.getChangeTime())); + attributes.put("size", String.valueOf(entity.getSize())); + attributes.put("allocationSize", String.valueOf(entity.getAllocationSize())); + return unmodifiableMap(attributes); + } + + @Override + protected String getPath(ProcessContext context) { + final SmbClientProviderService clientProviderService = + context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); + final URI serviceLocation = clientProviderService.getServiceLocation(); + final String directory = getDirectory(context); + return String.format("%s/%s", serviceLocation.toString(), directory.isEmpty() ? "" : directory + "/"); + } + + @Override + protected List performListing(ProcessContext context, Long minimumTimestampOrNull, + ListingMode listingMode) throws IOException { + + final Predicate fileFilter = + createFileFilter(context, minimumTimestampOrNull); + + try (Stream listing = performListing(context)) { + final Iterator iterator = listing.iterator(); + final List result = new LinkedList<>(); + while (iterator.hasNext()) { + if (isExecutionStopped(listingMode)) { + return emptyList(); + } + final SmbListableEntity entity = iterator.next(); + if (fileFilter.test(entity)) { + result.add(entity); + } + } + return result; + } catch (Exception e) { + throw new IOException("Could not perform listing", e); + } + } + + @Override + protected boolean isListingResetNecessary(PropertyDescriptor property) { + return asList(SMB_CLIENT_PROVIDER_SERVICE, DIRECTORY, FILE_NAME_SUFFIX_FILTER).contains(property); + } + + @Override + protected Scope getStateScope(PropertyContext context) { + return CLUSTER; + } + + @Override + protected RecordSchema getRecordSchema() { + return SmbListableEntity.getRecordSchema(); + } + + @Override + protected Integer countUnfilteredListing(ProcessContext context) throws IOException { + try (Stream listing = performListing(context)) { + return Long.valueOf(listing.count()).intValue(); + } catch (Exception e) { + throw new IOException("Could not count files", e); + } + } + + @Override + protected String getListingContainerName(ProcessContext context) { + return String.format("Remote Directory [%s]", getPath(context)); + } + + private String formatTimeStamp(long timestamp) { + return ISO_DATE_TIME.format( + LocalDateTime.ofEpochSecond(TimeUnit.MILLISECONDS.toSeconds(timestamp), 0, ZoneOffset.UTC)); + } + + private boolean isExecutionStopped(ListingMode listingMode) { + return ListingMode.EXECUTION.equals(listingMode) && !isScheduled(); + } + + private Predicate createFileFilter(ProcessContext context, Long minTimestampOrNull) { + + final Long minimumAge = context.getProperty(MINIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maximumAgeOrNull = context.getProperty(MAXIMUM_AGE).isSet() ? context.getProperty(MAXIMUM_AGE) + .asTimePeriod(TimeUnit.MILLISECONDS) : null; + final Double minimumSizeOrNull = + context.getProperty(MINIMUM_SIZE).isSet() ? context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B) + : null; + final Double maximumSizeOrNull = + context.getProperty(MAXIMUM_SIZE).isSet() ? context.getProperty(MAXIMUM_SIZE).asDataSize(DataUnit.B) + : null; + final String suffixOrNull = context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue(); + + final long now = getCurrentTime(); + Predicate filter = entity -> now - entity.getTimestamp() >= minimumAge; + + if (maximumAgeOrNull != null) { + filter = filter.and(entity -> now - entity.getTimestamp() <= maximumAgeOrNull); + } + + if (minTimestampOrNull != null) { + filter = filter.and(entity -> entity.getTimestamp() >= minTimestampOrNull); + } + + if (minimumSizeOrNull != null) { + filter = filter.and(entity -> entity.getSize() >= minimumSizeOrNull); + } + + if (maximumSizeOrNull != null) { + filter = filter.and(entity -> entity.getSize() <= maximumSizeOrNull); + } + + if (suffixOrNull != null) { + filter = filter.and(entity -> !entity.getName().endsWith(suffixOrNull)); + } + + return filter; + } + + private Stream performListing(ProcessContext context) throws IOException { + final SmbClientProviderService clientProviderService = + context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); + final String directory = getDirectory(context); + final SmbClientService clientService = clientProviderService.getClient(); + return clientService.listRemoteFiles(directory).onClose(() -> { + try { + clientService.close(); + } catch (Exception e) { + throw new RuntimeException("Could not close samba client", e); + } + }); + } + + private String getDirectory(ProcessContext context) { + final PropertyValue property = context.getProperty(DIRECTORY); + final String directory = property.isSet() ? property.getValue().replace('\\', '/') : ""; + return directory.equals("/") ? "" : directory; + } + + private static class MustNotContainDirectorySeparatorsValidator implements Validator { + + @Override + public ValidationResult validate(String subject, String value, ValidationContext context) { + return new ValidationResult.Builder() + .subject(subject) + .input(value) + .valid(!value.contains("/")) + .explanation(subject + " must not contain any folder separator character.") + .build(); + } + + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index bc5320b99b..0dd9276bc0 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,5 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +org.apache.nifi.processors.smb.GetSmbFile +org.apache.nifi.processors.smb.ListSmb org.apache.nifi.processors.smb.PutSmbFile -org.apache.nifi.processors.smb.GetSmbFile \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java new file mode 100644 index 0000000000..8ef5aa7ba3 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Arrays.fill; +import static java.util.stream.Collectors.toSet; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY; +import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER; +import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE; +import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE; +import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN; +import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT; +import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME; +import static org.apache.nifi.util.TestRunners.newTestRunner; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbListableEntity; +import org.apache.nifi.services.smb.SmbjClientProviderService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +public class ListSmbIT { + + private final static Integer DEFAULT_SAMBA_PORT = 445; + private final static Logger logger = LoggerFactory.getLogger(ListSmbTest.class); + private final GenericContainer sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba")) + .withExposedPorts(DEFAULT_SAMBA_PORT, 139) + .waitingFor(Wait.forListeningPort()) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p"); + + @BeforeEach + public void beforeEach() { + sambaContainer.start(); + } + + @AfterEach + public void afterEach() { + sambaContainer.stop(); + } + + @ParameterizedTest + @ValueSource(ints = {4, 50, 45000}) + public void shouldFillSizeAttributeProperly(int size) throws Exception { + writeFile("1.txt", generateContentWithSize(size)); + final TestRunner testRunner = newTestRunner(ListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "none"); + testRunner.setProperty(MINIMUM_AGE, "0 ms"); + SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.enableControllerService(smbjClientProviderService); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.getFlowFilesForRelationship(REL_SUCCESS) + .forEach(flowFile -> assertEquals(size, Integer.valueOf(flowFile.getAttribute("size")))); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + public void shouldShowBulletinOnMissingDirectory() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "none"); + testRunner.setProperty(MINIMUM_AGE, "0 ms"); + testRunner.setProperty(DIRECTORY, "folderDoesNotExists"); + SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.enableControllerService(smbjClientProviderService); + testRunner.run(); + assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + public void shouldShowBulletinWhenShareIsInvalid() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.setProperty(smbjClientProviderService, SHARE, "invalid_share"); + testRunner.enableControllerService(smbjClientProviderService); + testRunner.run(); + assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + final SmbClientProviderService smbClientProviderService = + configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.setProperty(smbClientProviderService, PORT, "1"); + testRunner.enableControllerService(smbClientProviderService); + testRunner.run(); + assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + testRunner.assertValid(); + testRunner.disableControllerService(smbClientProviderService); + } + + @Test + public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + final SmbClientProviderService smbClientProviderService = + configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.setProperty(smbClientProviderService, HOSTNAME, "this.host.should.not.exists"); + testRunner.enableControllerService(smbClientProviderService); + testRunner.run(); + assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + testRunner.disableControllerService(smbClientProviderService); + } + + @Test + public void shouldUseRecordWriterProperly() throws Exception { + final Set testFiles = new HashSet<>(asList( + "1.txt", + "directory/2.txt", + "directory/subdirectory/3.txt", + "directory/subdirectory2/4.txt", + "directory/subdirectory3/5.txt" + )); + testFiles.forEach(file -> writeFile(file, generateContentWithSize(4))); + + final TestRunner testRunner = newTestRunner(ListSmb.class); + final MockRecordWriter writer = new MockRecordWriter(null, false); + final SimpleRecordSchema simpleRecordSchema = SmbListableEntity.getRecordSchema(); + testRunner.addControllerService("writer", writer); + testRunner.enableControllerService(writer); + testRunner.setProperty(LISTING_STRATEGY, "none"); + testRunner.setProperty(RECORD_WRITER, "writer"); + testRunner.setProperty(MINIMUM_AGE, "0 ms"); + SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.enableControllerService(smbjClientProviderService); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + final String result = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent(); + final int identifierColumnIndex = simpleRecordSchema.getFieldNames().indexOf("identifier"); + final Set actual = Arrays.stream(result.split("\n")) + .map(row -> row.split(",")[identifierColumnIndex]) + .collect(toSet()); + assertEquals(testFiles, actual); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + public void shouldWriteFlowFileAttributesProperly() throws Exception { + final Set testFiles = new HashSet<>(asList( + "file_name", "directory/file_name", "directory/subdirectory/file_name" + )); + testFiles.forEach(file -> writeFile(file, generateContentWithSize(4))); + final TestRunner testRunner = newTestRunner(ListSmb.class); + final SmbjClientProviderService smbjClientProviderService = + configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.setProperty(LISTING_STRATEGY, "none"); + testRunner.setProperty(MINIMUM_AGE, "0 sec"); + testRunner.enableControllerService(smbjClientProviderService); + testRunner.run(1); + testRunner.assertTransferCount(REL_SUCCESS, 3); + final Set> allAttributes = testRunner.getFlowFilesForRelationship(REL_SUCCESS) + .stream() + .map(MockFlowFile::getAttributes) + .collect(toSet()); + + final Set identifiers = allAttributes.stream() + .map(attributes -> attributes.get("identifier")) + .collect(toSet()); + assertEquals(testFiles, identifiers); + + allAttributes.forEach(attribute -> assertEquals( + Stream.of(attribute.get("path"), attribute.get("filename")).filter(s -> !s.isEmpty()).collect( + Collectors.joining("/")), + attribute.get("absolute.path"))); + + final Set fileNames = allAttributes.stream() + .map(attributes -> attributes.get("filename")) + .collect(toSet()); + + assertEquals(new HashSet<>(Arrays.asList("file_name")), fileNames); + + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + public void shouldFilterFilesBySizeCriteria() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + final SmbClientProviderService smbClientProviderService = + configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.enableControllerService(smbClientProviderService); + testRunner.setProperty(MINIMUM_AGE, "0 ms"); + testRunner.setProperty(LISTING_STRATEGY, "none"); + + writeFile("1.txt", generateContentWithSize(1)); + writeFile("10.txt", generateContentWithSize(10)); + writeFile("100.txt", generateContentWithSize(100)); + + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 3); + testRunner.clearTransferState(); + + testRunner.setProperty(MINIMUM_SIZE, "10 B"); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 2); + testRunner.clearTransferState(); + + testRunner.setProperty(MINIMUM_SIZE, "50 B"); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + + testRunner.disableControllerService(smbClientProviderService); + + } + + @Test + public void shouldFilterByGivenSuffix() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + final SmbClientProviderService smbClientProviderService = + configureTestRunnerForSambaDockerContainer(testRunner); + testRunner.enableControllerService(smbClientProviderService); + testRunner.setProperty(MINIMUM_AGE, "0 ms"); + testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix"); + testRunner.setProperty(LISTING_STRATEGY, "none"); + writeFile("should_list_this", generateContentWithSize(1)); + writeFile("should_skip_this.suffix", generateContentWithSize(1)); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.disableControllerService(smbClientProviderService); + } + + private SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner) + throws Exception { + SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService(); + testRunner.addControllerService("connection-pool", smbjClientProviderService); + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "connection-pool"); + testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost()); + testRunner.setProperty(smbjClientProviderService, PORT, + String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT))); + testRunner.setProperty(smbjClientProviderService, USERNAME, "username"); + testRunner.setProperty(smbjClientProviderService, PASSWORD, "password"); + testRunner.setProperty(smbjClientProviderService, SHARE, "share"); + testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain"); + return smbjClientProviderService; + } + + private String generateContentWithSize(int sizeInBytes) { + byte[] bytes = new byte[sizeInBytes]; + fill(bytes, (byte) 1); + return new String(bytes); + } + + private void writeFile(String path, String content) { + String containerPath = "/folder/" + path; + sambaContainer.copyFileToContainer(Transferable.of(content), containerPath); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java new file mode 100644 index 0000000000..e279bf2e85 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.stream; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION; +import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE; +import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY; +import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER; +import static org.apache.nifi.processors.smb.ListSmb.MAXIMUM_AGE; +import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE; +import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE; +import static org.apache.nifi.util.TestRunners.newTestRunner; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.processor.util.list.ListedEntity; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbClientService; +import org.apache.nifi.services.smb.SmbListableEntity; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class ListSmbTest { + + private final static AtomicLong currentMillis = new AtomicLong(); + private final static AtomicLong currentNanos = new AtomicLong(); + public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id"; + + private static long currentMillis() { + return currentMillis.get(); + } + + private static long currentNanos() { + return currentNanos.get(); + } + + private static void setTime(Long timeInMillis) { + currentMillis.set(timeInMillis); + currentNanos.set(NANOSECONDS.convert(timeInMillis, MILLISECONDS)); + } + + private static void timePassed(Long timeInMillis) { + currentMillis.addAndGet(timeInMillis); + currentNanos.addAndGet(NANOSECONDS.convert(timeInMillis, MILLISECONDS)); + } + + @ParameterizedTest + @ValueSource(strings = {"timestamps", "entities"}) + public void shouldResetStateWhenPropertiesChanged(String listingStrategy) throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, listingStrategy); + testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); + testRunner.setProperty(MINIMUM_AGE, "0 ms"); + final DistributedMapCacheClient cacheService = mockDistributedMap(); + testRunner.addControllerService("cacheService", cacheService); + testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService"); + testRunner.enableControllerService(cacheService); + final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); + long now = System.currentTimeMillis(); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("should_list_this_again_after_property_change", now - 100)); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.clearTransferState(); + testRunner.setProperty(DIRECTORY, "testDirectoryChanged"); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.clearTransferState(); + + testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, "suffix_changed"); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.clearTransferState(); + + final SmbClientProviderService clientProviderService = mock(SmbClientProviderService.class); + when(clientProviderService.getIdentifier()).thenReturn("different-client-provider"); + when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share")); + when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService); + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "different-client-provider"); + testRunner.addControllerService("different-client-provider", clientProviderService); + testRunner.enableControllerService(clientProviderService); + + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + + testRunner.assertValid(); + } + + @ParameterizedTest + @ValueSource(longs = {1L, 50L, 150L, 3000L}) + public void testShouldUseTimestampBasedStrategyProperly(Long minimumAge) throws Exception { + final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "timestamps"); + testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); + testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms"); + final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); + setTime(1000L); + mockSmbFolders(mockNifiSmbClientService); + testRunner.run(); + verify(mockNifiSmbClientService).close(); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("first", 900), + listableEntity("second", 1000) + ); + testRunner.run(); + timePassed(100L); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("first", 900), + listableEntity("second", 1000), + listableEntity("third", 1100) + ); + testRunner.run(); + timePassed(1L); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("first", 900), + listableEntity("second", 1000), + listableEntity("third", 1100), + listableEntity("appeared_during_the_previous_iteration_and_it_was_missed", 1099) + ); + timePassed(100L); + testRunner.run(); + timePassed(minimumAge); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 4); + testRunner.assertValid(); + } + + @ParameterizedTest + @ValueSource(longs = {0L, 50L, 150L, 3000L}) + public void testShouldUseEntityTrackingBasedStrategyProperly(Long minimumAge) throws Exception { + final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "entities"); + testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); + testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms"); + final DistributedMapCacheClient cacheService = mockDistributedMap(); + testRunner.addControllerService("cacheService", cacheService); + testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService"); + testRunner.enableControllerService(cacheService); + final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); + setTime(1000L); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("first", 1000) + ); + testRunner.run(); + verify(mockNifiSmbClientService).close(); + timePassed(100L); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("first", 1000), + listableEntity("second", 1100) + ); + testRunner.run(); + timePassed(minimumAge); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 2); + testRunner.assertValid(); + } + + @ParameterizedTest + @ValueSource(longs = {0L, 50L, 150L, 3000L}) + public void testShouldUseNoTrackingBasedStrategyProperly(Long minimumAge) throws Exception { + final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "none"); + testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); + testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms"); + final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); + setTime(1000L); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("first", 1000) + ); + timePassed(minimumAge); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 2); + testRunner.assertValid(); + } + + @Test + public void testShouldFilterByFileAgeCriteria() throws Exception { + final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "none"); + testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); + testRunner.setProperty(MINIMUM_AGE, "10 ms"); + testRunner.setProperty(MAXIMUM_AGE, "30 ms"); + final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); + setTime(1000L); + mockSmbFolders(mockNifiSmbClientService, + listableEntity("too_young", 1000), + listableEntity("too_old", 1000 - 31), + listableEntity("should_list_this", 1000 - 11) + ); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertValid(); + } + + @Test + public void shouldTurnSmbClientExceptionsToBulletins() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + testRunner.setProperty(LISTING_STRATEGY, "timestamps"); + testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); + final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); + when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception")); + testRunner.run(); + assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + } + + @Test + public void shouldFormatRemotePathProperly() throws Exception { + final TestRunner testRunner = newTestRunner(ListSmb.class); + final SmbClientProviderService clientProviderService = mockSmbConnectionPoolService(); + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID); + testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, clientProviderService); + when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://hostname:445/share")); + final ListSmb underTest = (ListSmb) testRunner.getProcessor(); + + assertEquals("smb://hostname:445/share/", underTest.getPath(testRunner.getProcessContext())); + + testRunner.setProperty(DIRECTORY, "/"); + assertEquals("smb://hostname:445/share/", underTest.getPath(testRunner.getProcessContext())); + + testRunner.setProperty(DIRECTORY, "root"); + assertEquals("smb://hostname:445/share/root/", underTest.getPath(testRunner.getProcessContext())); + + testRunner.setProperty(DIRECTORY, "root/subdirectory"); + assertEquals("smb://hostname:445/share/root/subdirectory/", underTest.getPath(testRunner.getProcessContext())); + + } + + private SmbClientProviderService mockSmbConnectionPoolService() { + final SmbClientProviderService clientProviderService = mock(SmbClientProviderService.class); + when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID); + when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share")); + return clientProviderService; + } + + private SmbClientService configureTestRunnerWithMockedSambaClient(TestRunner testRunner) + throws Exception { + final SmbClientService mockNifiSmbClientService = mock(SmbClientService.class); + testRunner.setProperty(DIRECTORY, "testDirectory"); + + final SmbClientProviderService clientProviderService = mockSmbConnectionPoolService(); + when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService); + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID); + testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, clientProviderService); + testRunner.enableControllerService(clientProviderService); + + return mockNifiSmbClientService; + } + + private void mockSmbFolders(SmbClientService mockNifiSmbClientService, SmbListableEntity... entities) { + doAnswer(ignore -> stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString()); + } + + private SmbListableEntity listableEntity(String name, long timeStamp) { + return SmbListableEntity.builder() + .setName(name) + .setTimestamp(timeStamp) + .build(); + } + + private DistributedMapCacheClient mockDistributedMap() throws IOException { + final Map> store = new ConcurrentHashMap<>(); + final DistributedMapCacheClient cacheService = mock(DistributedMapCacheClient.class); + when(cacheService.putIfAbsent(any(), any(), any(), any())).thenReturn(false); + when(cacheService.containsKey(any(), any())).thenReturn(false); + when(cacheService.getIdentifier()).thenReturn("cacheService"); + doAnswer(invocation -> store.get(invocation.getArgument(0))) + .when(cacheService).get(any(), any(), any()); + doAnswer(invocation -> store.put(invocation.getArgument(0), invocation.getArgument(1))) + .when(cacheService).put(any(), any(), any(), any()); + doAnswer(invocation -> Optional.ofNullable(invocation.getArgument(0)).map(store::remove).isPresent()) + .when(cacheService).remove(any(), any()); + return cacheService; + } + + public static class TimeMockingListSmb extends ListSmb { + + @Override + protected long getCurrentTime() { + return currentMillis(); + } + + @Override + protected long getCurrentNanoTime() { + return currentNanos(); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client-nar/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client-nar/pom.xml new file mode 100644 index 0000000000..c9b13f4768 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client-nar/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-smb-bundle + 1.18.0-SNAPSHOT + + + nifi-smb-smbj-client-nar + nar + + true + true + + + + + org.apache.nifi + nifi-smb-client-api-nar + 1.18.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-smb-smbj-client + 1.18.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/pom.xml new file mode 100644 index 0000000000..b9bac48ddc --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-smb-bundle + 1.18.0-SNAPSHOT + + + nifi-smb-smbj-client + jar + + + + org.apache.nifi + nifi-smb-client-api + 1.18.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-utils + 1.18.0-SNAPSHOT + + + com.hierynomus + smbj + + + org.testcontainers + testcontainers + test + + + org.apache.nifi + nifi-mock + 1.18.0-SNAPSHOT + test + + + org.testcontainers + toxiproxy + test + + + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java new file mode 100644 index 0000000000..f72cec6a2b --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"samba, smb, cifs, files"}) +@CapabilityDescription("Provides access to SMB Sessions with shared authentication credentials.") +public class SmbjClientProviderService extends AbstractControllerService implements SmbClientProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(true) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder() + .displayName("Share") + .name("share") + .description("The network share to which files should be listed from. This is the \"first folder\"" + + "after the hostname: smb://hostname:port/[share]/dir1/dir2") + .required(true) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout for read and write operations.") + .required(true) + .defaultValue("5 sec") + .addValidator(TIME_PERIOD_VALIDATOR) + .build(); + private static final List PROPERTIES = Collections + .unmodifiableList(asList( + HOSTNAME, + PORT, + SHARE, + USERNAME, + PASSWORD, + DOMAIN, + TIMEOUT + )); + private SMBClient smbClient; + private AuthenticationContext authenticationContext; + private String hostname; + private int port; + private String shareName; + + @Override + public SmbClientService getClient() throws IOException { + final SmbjClientService client = new SmbjClientService(smbClient, authenticationContext); + + try { + client.connectToShare(hostname, port, shareName); + } catch (IOException e) { + client.forceFullyCloseConnection(); + client.connectToShare(hostname, port, shareName); + } + + return client; + } + + @Override + public URI getServiceLocation() { + return URI.create(String.format("smb://%s:%d/%s", hostname, port, shareName)); + } + + @OnEnabled + public void onEnabled(ConfigurationContext context) { + this.hostname = context.getProperty(HOSTNAME).getValue(); + this.port = context.getProperty(PORT).asInteger(); + this.shareName = context.getProperty(SHARE).getValue(); + this.smbClient = new SMBClient(SmbConfig.builder() + .withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS), MILLISECONDS) + .build()); + createAuthenticationContext(context); + } + + @OnDisabled + public void onDisabled() { + smbClient.close(); + smbClient = null; + hostname = null; + port = 0; + shareName = null; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + private void createAuthenticationContext(ConfigurationContext context) { + if (context.getProperty(USERNAME).isSet()) { + final String userName = context.getProperty(USERNAME).getValue(); + final String password = + context.getProperty(PASSWORD).isSet() ? context.getProperty(PASSWORD).getValue() : ""; + final String domainOrNull = + context.getProperty(DOMAIN).isSet() ? context.getProperty(DOMAIN).getValue() : null; + authenticationContext = new AuthenticationContext(userName, password.toCharArray(), domainOrNull); + } else { + authenticationContext = AuthenticationContext.anonymous(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java new file mode 100644 index 0000000000..431a4e9f52 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static java.util.stream.StreamSupport.stream; + +import com.hierynomus.msdtyp.AccessMask; +import com.hierynomus.msfscc.FileAttributes; +import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation; +import com.hierynomus.mssmb2.SMB2CreateDisposition; +import com.hierynomus.mssmb2.SMB2CreateOptions; +import com.hierynomus.mssmb2.SMB2ShareAccess; +import com.hierynomus.mssmb2.SMBApiException; +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.connection.Connection; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.Directory; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.Share; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Stream; + +public class SmbjClientService implements SmbClientService { + + private static final List SPECIAL_DIRECTORIES = asList(".", ".."); + + final private AuthenticationContext authenticationContext; + final private SMBClient smbClient; + + private Connection connection; + private Session session; + private DiskShare share; + + public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext) { + this.smbClient = smbClient; + this.authenticationContext = authenticationContext; + } + + public void connectToShare(String hostname, int port, String shareName) throws IOException { + Share share; + try { + connection = smbClient.connect(hostname, port); + session = connection.authenticate(authenticationContext); + share = session.connectShare(shareName); + } catch (Exception e) { + close(); + throw new IOException("Could not connect to share " + format("%s:%d/%s", hostname, port, shareName), e); + } + if (share instanceof DiskShare) { + this.share = (DiskShare) share; + } else { + close(); + throw new IllegalArgumentException("DiskShare not found. Share " + + share.getClass().getSimpleName() + " found on " + format("%s:%d/%s", hostname, port, + shareName)); + } + } + + public void forceFullyCloseConnection() { + try { + if (connection != null) { + connection.close(true); + } + } catch (IOException ignore) { + } finally { + connection = null; + } + } + + @Override + public void close() { + try { + if (session != null) { + session.close(); + } + } catch (IOException ignore) { + + } finally { + session = null; + } + } + + @Override + public Stream listRemoteFiles(String filePath) { + return Stream.of(filePath).flatMap(path -> { + final Directory directory = openDirectory(path); + return stream(directory::spliterator, 0, false) + .map(entity -> buildSmbListableEntity(entity, path)) + .filter(entity -> !specialDirectory(entity)) + .flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName()) + : Stream.of(listable)) + .onClose(directory::close); + }); + } + + @Override + public void createDirectory(String path) { + final int lastDirectorySeparatorPosition = path.lastIndexOf("/"); + if (lastDirectorySeparatorPosition > 0) { + createDirectory(path.substring(0, lastDirectorySeparatorPosition)); + } + if (!share.folderExists(path)) { + share.mkdir(path); + } + } + + private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) { + return SmbListableEntity.builder() + .setName(info.getFileName()) + .setShortName(info.getShortName()) + .setPath(path) + .setTimestamp(info.getLastWriteTime().toEpochMillis()) + .setCreationTime(info.getCreationTime().toEpochMillis()) + .setChangeTime(info.getChangeTime().toEpochMillis()) + .setLastAccessTime(info.getLastAccessTime().toEpochMillis()) + .setDirectory((info.getFileAttributes() & FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0) + .setSize(info.getEndOfFile()) + .setAllocationSize(info.getAllocationSize()) + .build(); + } + + private Directory openDirectory(String path) { + try { + return share.openDirectory( + path, + EnumSet.of(AccessMask.GENERIC_READ), + EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY), + EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ), + SMB2CreateDisposition.FILE_OPEN, + EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE) + ); + } catch (SMBApiException s) { + throw new RuntimeException("Could not open directory " + path + " due to " + s.getMessage(), s); + } + } + + private boolean specialDirectory(SmbListableEntity entity) { + return SPECIAL_DIRECTORIES.contains(entity.getName()); + } + +} + + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..3e64193628 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.services.smb.SmbjClientProviderService diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java new file mode 100644 index 0000000000..f3de70ce71 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.stream.Collectors.toSet; +import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN; +import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT; +import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.TIMEOUT; +import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; + +import eu.rekawek.toxiproxy.model.ToxicDirection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.util.MockConfigurationContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +public class NiFiSmbjClientIT { + + private final static Logger sambaContainerLogger = LoggerFactory.getLogger("sambaContainer"); + private final static Logger toxyProxyLogger = LoggerFactory.getLogger("toxiProxy"); + + private final Network network = Network.newNetwork(); + + private final GenericContainer sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba")) + .withExposedPorts(139, 445) + .waitingFor(Wait.forListeningPort()) + .withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger)) + .withNetwork(network) + .withNetworkAliases("samba") + .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p"); + + private final ToxiproxyContainer toxiproxy = new ToxiproxyContainer("shopify/toxiproxy") + .withNetwork(network) + .withLogConsumer(new Slf4jLogConsumer(toxyProxyLogger)) + .withNetworkAliases("toxiproxy"); + + @BeforeEach + public void beforeEach() { + sambaContainer.start(); + toxiproxy.start(); + } + + @AfterEach + public void afterEach() { + toxiproxy.stop(); + sambaContainer.stop(); + } + + @Test + public void shouldRescueAfterConnectionFailure() throws Exception { + writeFile("testDirectory/file", "content"); + writeFile("testDirectory/directory1/file", "content"); + writeFile("testDirectory/directory2/file", "content"); + writeFile("testDirectory/directory2/nested_directory/file", "content"); + ContainerProxy sambaProxy = toxiproxy.getProxy("samba", 445); + SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService(); + ConfigurationContext context = mock(ConfigurationContext.class); + + Map properties = new HashMap<>(); + properties.put(HOSTNAME, sambaProxy.getContainerIpAddress()); + properties.put(PORT, String.valueOf(sambaProxy.getProxyPort())); + properties.put(SHARE, "share"); + properties.put(USERNAME, "username"); + properties.put(PASSWORD, "password"); + properties.put(DOMAIN, "domain"); + properties.put(TIMEOUT, "0.5 sec"); + MockConfigurationContext mockConfigurationContext = new MockConfigurationContext(properties, null); + + smbjClientProviderService.onEnabled(mockConfigurationContext); + + sambaProxy.toxics().latency("slow", ToxicDirection.DOWNSTREAM, 300); + + AtomicInteger i = new AtomicInteger(0); + + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10); + CountDownLatch latch = new CountDownLatch(100); + executorService.scheduleWithFixedDelay(() -> { + + int iteration = i.getAndIncrement(); + + if (iteration > 100) { + return; + } + + executorService.submit(() -> { + + SmbClientService s = null; + try { + + s = smbjClientProviderService.getClient(); + if (iteration == 25) { + sambaProxy.setConnectionCut(true); + } + + final Set actual = s.listRemoteFiles("testDirectory") + .map(SmbListableEntity::getIdentifier) + .collect(toSet()); + + assertTrue(actual.contains("testDirectory/file")); + assertTrue(actual.contains("testDirectory/directory1/file")); + assertTrue(actual.contains("testDirectory/directory2/file")); + assertTrue(actual.contains("testDirectory/directory2/nested_directory/file")); + + + } catch (Exception e) { + if (iteration == 50) { + sambaProxy.setConnectionCut(false); + } + if (iteration == 100) { + fail(); + } + } finally { + if (s != null) { + try { + s.close(); + } catch (Exception e) { + } + } + } + latch.countDown(); + }); + + }, 0, 2, TimeUnit.SECONDS); + + latch.await(); + executorService.shutdown(); + smbjClientProviderService.onDisabled(); + } + + private void writeFile(String path, String content) { + String containerPath = "/folder/" + path; + sambaContainer.copyFileToContainer(Transferable.of(content), containerPath); + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java new file mode 100644 index 0000000000..abf032072d --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.connection.Connection; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.DiskShare; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class NiFiSmbjClientTest { + + @Mock + DiskShare share; + + @Mock + SMBClient smbClient; + + @Mock + AuthenticationContext authenticationContext; + + @Mock + Session session; + + @Mock + Connection connection; + + @InjectMocks + SmbjClientService underTest; + + @BeforeEach + public void beforeEach() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void shouldCreateDirectoriesRecursively() throws Exception { + + when(smbClient.connect("hostname", 445)) + .thenReturn(connection); + when(connection.authenticate(authenticationContext)).thenReturn(session); + when(session.connectShare(anyString())).thenReturn(share); + when(share.fileExists("directory")).thenReturn(true); + when(share.fileExists("path")).thenReturn(false); + when(share.fileExists("to")).thenReturn(false); + when(share.fileExists("create")).thenReturn(false); + + underTest.connectToShare("hostname", 445, "share"); + underTest.createDirectory("directory/path/to/create"); + + verify(share).mkdir("directory/path"); + verify(share).mkdir("directory/path/to"); + verify(share).mkdir("directory/path/to/create"); + + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-smb-bundle/pom.xml b/nifi-nar-bundles/nifi-smb-bundle/pom.xml index 23553206c8..53e2db5783 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-smb-bundle/pom.xml @@ -26,7 +26,41 @@ pom + nifi-smb-client-api + nifi-smb-client-api-nar + nifi-smb-smbj-client + nifi-smb-smbj-client-nar nifi-smb-processors nifi-smb-nar + + + + + com.hierynomus + smbj + 0.11.5 + + + net.engio + mbassador + 1.3.2 + + + org.apache.nifi + nifi-mock + 1.18.0-SNAPSHOT + + + org.testcontainers + testcontainers + ${testcontainers.version} + + + org.testcontainers + toxiproxy + ${testcontainers.version} + + + diff --git a/nifi-registry/pom.xml b/nifi-registry/pom.xml index 7dea5683e1..8b52eb969f 100644 --- a/nifi-registry/pom.xml +++ b/nifi-registry/pom.xml @@ -40,7 +40,6 @@ 8.4.2 7.0.0 3.12.0 - 1.17.3 3.4.0-01 2.3.2 5.13.0.202109080827-r diff --git a/pom.xml b/pom.xml index 4e4d741108..2a705edb85 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,7 @@ 4.5.13 4.4.15 1.70 + 1.17.3 1.7.36 2.2.0 9.4.48.v20220622