diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java index 4ecaf9a507..9eb0e41a67 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.services.smb; +import java.io.IOException; +import java.io.OutputStream; import java.util.stream.Stream; /** @@ -27,4 +29,5 @@ public interface SmbClientService extends AutoCloseable { void createDirectory(String path); + void readFile(String fileName, OutputStream outputStream) throws IOException; } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbException.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbException.java new file mode 100644 index 0000000000..e076560943 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +public class SmbException extends RuntimeException { + + private long errorCode; + + public SmbException(String message, long errorCode, Exception cause) { + super(message, cause); + this.errorCode = errorCode; + } + + public long getErrorCode() { + return errorCode; + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java index da33602540..53862f6532 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.services.smb; +import java.net.URI; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -29,30 +30,42 @@ import org.apache.nifi.serialization.record.RecordFieldType; public class SmbListableEntity implements ListableEntity { + public static final String FILENAME = "filename"; + public static final String SHORT_NAME = "shortName"; + public static final String PATH = "path"; + public static final String SERVICE_LOCATION = "serviceLocation"; + public static final String CREATION_TIME = "creationTime"; + public static final String LAST_ACCESS_TIME = "lastAccessTime"; + public static final String CHANGE_TIME = "changeTime"; + public static final String LAST_MODIFIED_TIME = "lastModifiedTime"; + public static final String SIZE = "size"; + public static final String ALLOCATION_SIZE = "allocationSize"; private final String name; private final String shortName; private final String path; - private final long timestamp; + private final long lastModifiedTime; private final long creationTime; private final long lastAccessTime; private final long changeTime; private final boolean directory; private final long size; private final long allocationSize; + private final URI serviceLocation; - private SmbListableEntity(String name, String shortName, String path, long timestamp, long creationTime, + private SmbListableEntity(String name, String shortName, String path, long lastModifiedTime, long creationTime, long lastAccessTime, long changeTime, boolean directory, - long size, long allocationSize) { + long size, long allocationSize, URI serviceLocation) { this.name = name; this.shortName = shortName; this.path = path; - this.timestamp = timestamp; + this.lastModifiedTime = lastModifiedTime; this.creationTime = creationTime; this.lastAccessTime = lastAccessTime; this.changeTime = changeTime; this.directory = directory; this.size = size; this.allocationSize = allocationSize; + this.serviceLocation = serviceLocation; } public static SimpleRecordSchema getRecordSchema() { @@ -60,8 +73,8 @@ public class SmbListableEntity implements ListableEntity { new RecordField("filename", RecordFieldType.STRING.getDataType(), false), new RecordField("shortName", RecordFieldType.STRING.getDataType(), false), new RecordField("path", RecordFieldType.STRING.getDataType(), false), - new RecordField("identifier", RecordFieldType.STRING.getDataType(), false), - new RecordField("timestamp", RecordFieldType.LONG.getDataType(), false), + new RecordField("absolute.path", RecordFieldType.STRING.getDataType(), false), + new RecordField("lastModifiedTime", RecordFieldType.LONG.getDataType(), false), new RecordField("creationTime", RecordFieldType.LONG.getDataType(), false), new RecordField("lastAccessTime", RecordFieldType.LONG.getDataType(), false), new RecordField("changeTime", RecordFieldType.LONG.getDataType(), false), @@ -108,6 +121,10 @@ public class SmbListableEntity implements ListableEntity { return path.isEmpty() ? name : path + "/" + name; } + public long getLastModifiedTime() { + return lastModifiedTime; + } + @Override public String getIdentifier() { return getPathWithName(); @@ -115,7 +132,7 @@ public class SmbListableEntity implements ListableEntity { @Override public long getTimestamp() { - return timestamp; + return getLastModifiedTime(); } @Override @@ -146,36 +163,42 @@ public class SmbListableEntity implements ListableEntity { @Override public String toString() { - return getPathWithName() + " (last write: " + timestamp + " size: " + size + ")"; + return getPathWithName() + " (last write: " + lastModifiedTime + " size: " + size + ")"; } @Override public Record toRecord() { final Map record = new TreeMap<>(); - record.put("filename", getName()); - record.put("shortName", getShortName()); - record.put("path", path); - record.put("identifier", getPathWithName()); - record.put("timestamp", getTimestamp()); - record.put("creationTime", getCreationTime()); - record.put("lastAccessTime", getLastAccessTime()); - record.put("size", getSize()); - record.put("allocationSize", getAllocationSize()); + record.put(FILENAME, getName()); + record.put(SHORT_NAME, getShortName()); + record.put(PATH, getPath()); + record.put(SERVICE_LOCATION, getServiceLocation().toString()); + record.put(CREATION_TIME, getCreationTime()); + record.put(LAST_ACCESS_TIME, getLastAccessTime()); + record.put(LAST_MODIFIED_TIME, getLastModifiedTime()); + record.put(CHANGE_TIME, getChangeTime()); + record.put(SIZE, getSize()); + record.put(ALLOCATION_SIZE, getAllocationSize()); return new MapRecord(getRecordSchema(), record); } + private URI getServiceLocation() { + return serviceLocation; + } + public static class SmbListableEntityBuilder { private String name; private String shortName; private String path = ""; - private long timestamp; + private long lastModifiedTime; private long creationTime; private long lastAccessTime; private long changeTime; private boolean directory = false; private long size = 0; private long allocationSize = 0; + private URI serviceLocation; public SmbListableEntityBuilder setName(String name) { this.name = name; @@ -192,8 +215,8 @@ public class SmbListableEntity implements ListableEntity { return this; } - public SmbListableEntityBuilder setTimestamp(long timestamp) { - this.timestamp = timestamp; + public SmbListableEntityBuilder setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; return this; } @@ -227,9 +250,14 @@ public class SmbListableEntity implements ListableEntity { return this; } + public SmbListableEntityBuilder setServiceLocation(URI serviceLocation) { + this.serviceLocation = serviceLocation; + return this; + } + public SmbListableEntity build() { - return new SmbListableEntity(name, shortName, path, timestamp, creationTime, lastAccessTime, changeTime, - directory, size, allocationSize); + return new SmbListableEntity(name, shortName, path, lastModifiedTime, creationTime, lastAccessTime, changeTime, + directory, size, allocationSize, serviceLocation); } } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java new file mode 100644 index 0000000000..c38216058d --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableSet; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbClientService; +import org.apache.nifi.services.smb.SmbException; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"samba", "smb", "cifs", "files", "fetch"}) +@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.") +@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class}) +@WritesAttributes({ + @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails."), + @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails.") +}) +public class FetchSmb extends AbstractProcessor { + + public static final String ERROR_CODE_ATTRIBUTE = "error.code"; + public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message"; + + public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor + .Builder().name("remote-file") + .displayName("Remote File") + .description("The full path of the file to be retrieved from the remote server. Expression language is supported.") + .required(true) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .defaultValue("${path}/${filename}") + .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) + .build(); + + public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder() + .name("smb-client-provider-service") + .displayName("SMB Client Provider Service") + .description("Specifies the SMB client provider to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbClientProviderService.class) + .build(); + public static final Relationship REL_SUCCESS = + new Relationship.Builder() + .name("success") + .description("A flowfile will be routed here for each successfully fetched file.") + .build(); + public static final Relationship REL_FAILURE = + new Relationship.Builder().name("failure") + .description( + "A flowfile will be routed here when failed to fetch its content.") + .build(); + public static final Set RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList( + REL_SUCCESS, + REL_FAILURE + ))); + public static final String UNCATEGORIZED_ERROR = "-2"; + private static final List PROPERTIES = asList( + SMB_CLIENT_PROVIDER_SERVICE, + REMOTE_FILE + ); + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final SmbClientProviderService clientProviderService = + context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); + + try (SmbClientService client = clientProviderService.getClient()) { + fetchAndTransfer(session, context, client, flowFile); + } catch (Exception e) { + getLogger().error("Couldn't connect to SMB.", e); + flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e)); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + session.transfer(flowFile, REL_FAILURE); + } + + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client, + FlowFile flowFile) { + final Map attributes = flowFile.getAttributes(); + final String filename = context.getProperty(REMOTE_FILE) + .evaluateAttributeExpressions(attributes).getValue(); + try { + flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream)); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + getLogger().error("Couldn't fetch file {}.", filename, e); + flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e)); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, getErrorMessage(e)); + session.transfer(flowFile, REL_FAILURE); + } + } + + private String getErrorCode(Exception exception) { + return Optional.ofNullable(exception instanceof SmbException ? (SmbException) exception : null) + .map(SmbException::getErrorCode) + .map(String::valueOf) + .orElse(UNCATEGORIZED_ERROR); + } + + private String getErrorMessage(Exception exception) { + return Optional.ofNullable(exception.getMessage()) + .orElse(exception.getClass().getSimpleName()); + } + +} + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java index 402e27173f..9ed9a3c969 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java @@ -85,7 +85,7 @@ import java.util.regex.Pattern; @CapabilityDescription("Reads file from a samba network location to FlowFiles. " + "Use this processor instead of a cifs mounts if share access control is important. " + "Configure the Hostname, Share and Directory accordingly: \\\\[Hostname]\\[Share]\\[path\\to\\Directory]") -@SeeAlso({PutSmbFile.class}) +@SeeAlso({PutSmbFile.class, ListSmb.class, FetchSmb.class}) @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the network share"), @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's network share name. For example, " diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java index e0029144f6..f9eab575d4 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java @@ -16,27 +16,37 @@ */ package org.apache.nifi.processors.smb; +import static java.time.ZoneOffset.UTC; import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.nifi.components.state.Scope.CLUSTER; import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; +import static org.apache.nifi.services.smb.SmbListableEntity.ALLOCATION_SIZE; +import static org.apache.nifi.services.smb.SmbListableEntity.CHANGE_TIME; +import static org.apache.nifi.services.smb.SmbListableEntity.LAST_MODIFIED_TIME; +import static org.apache.nifi.services.smb.SmbListableEntity.CREATION_TIME; +import static org.apache.nifi.services.smb.SmbListableEntity.FILENAME; +import static org.apache.nifi.services.smb.SmbListableEntity.LAST_ACCESS_TIME; +import static org.apache.nifi.services.smb.SmbListableEntity.PATH; +import static org.apache.nifi.services.smb.SmbListableEntity.SERVICE_LOCATION; +import static org.apache.nifi.services.smb.SmbListableEntity.SHORT_NAME; +import static org.apache.nifi.services.smb.SmbListableEntity.SIZE; import java.io.IOException; import java.net.URI; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -69,7 +79,7 @@ import org.apache.nifi.services.smb.SmbListableEntity; @PrimaryNodeOnly @TriggerSerially @Tags({"samba, smb, cifs, files", "list"}) -@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class, FetchSmb.class}) @CapabilityDescription("Lists concrete files shared via SMB protocol. " + "Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " + "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " @@ -79,32 +89,26 @@ import org.apache.nifi.services.smb.SmbListableEntity; "previous node left off without duplicating all of the data.") @InputRequirement(Requirement.INPUT_FORBIDDEN) @WritesAttributes({ - @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), - @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), - @WritesAttribute(attribute = "path", description = - "The path is set to the relative path of the file's directory " - + "on filesystem compared to the Share and Input Directory properties and the configured host " - + "and port inherited from the configured connection pool controller service. For example, for " - + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from " - + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."), - @WritesAttribute(attribute = "absolute.path", description = - "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " - + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from " - + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to " - + "SHARE/DIRECTORY/sub/folder/file."), - @WritesAttribute(attribute = "identifier", description = - "The identifier of the file. This equals to the path attribute so two files with the same relative path " - + "coming from different file shares considered to be identical."), - @WritesAttribute(attribute = "timestamp", description = - "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), - @WritesAttribute(attribute = "createTime", description = - "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), - @WritesAttribute(attribute = "lastAccessTime", description = - "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), - @WritesAttribute(attribute = "changeTime", description = - "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), - @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), - @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), + @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = PATH, description = + "The path is set to the relative path of the file's directory on the remote filesystem compared to the " + + "Share root directory. For example, for a given remote location" + + "smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to " + + "\"DIRECTORY/sub/folder/file\"."), + @WritesAttribute(attribute = SERVICE_LOCATION, description = + "The SMB URL of the share."), + @WritesAttribute(attribute = LAST_MODIFIED_TIME, description = + "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."), + @WritesAttribute(attribute = CREATION_TIME, description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."), + @WritesAttribute(attribute = LAST_ACCESS_TIME, description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."), + @WritesAttribute(attribute = CHANGE_TIME, description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."), + @WritesAttribute(attribute = SIZE, description = "The size of the file in bytes."), + @WritesAttribute(attribute = ALLOCATION_SIZE, description = "The number of bytes allocated for the file on the server."), }) @Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the state of the previous listing can be stored in order to list files " @@ -137,7 +141,7 @@ public class ListSmb extends AbstractListProcessor { public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder() .displayName("Maximum File Age") .name("max-file-age") - .description("Any file older than the given value will be omitted. ") + .description("Any file older than the given value will be omitted.") .required(false) .addValidator(TIME_PERIOD_VALIDATOR) .build(); @@ -184,11 +188,11 @@ public class ListSmb extends AbstractListProcessor { .build(); private static final List PROPERTIES = unmodifiableList(asList( - SMB_LISTING_STRATEGY, SMB_CLIENT_PROVIDER_SERVICE, + SMB_LISTING_STRATEGY, DIRECTORY, - AbstractListProcessor.RECORD_WRITER, FILE_NAME_SUFFIX_FILTER, + AbstractListProcessor.RECORD_WRITER, MINIMUM_AGE, MAXIMUM_AGE, MINIMUM_SIZE, @@ -207,17 +211,18 @@ public class ListSmb extends AbstractListProcessor { @Override protected Map createAttributes(SmbListableEntity entity, ProcessContext context) { final Map attributes = new TreeMap<>(); - attributes.put("filename", entity.getName()); - attributes.put("shortname", entity.getShortName()); - attributes.put("path", entity.getPath()); - attributes.put("absolute.path", entity.getPathWithName()); - attributes.put("identifier", entity.getIdentifier()); - attributes.put("timestamp", formatTimeStamp(entity.getTimestamp())); - attributes.put("creationTime", formatTimeStamp(entity.getCreationTime())); - attributes.put("lastAccessTime", formatTimeStamp(entity.getLastAccessTime())); - attributes.put("changeTime", formatTimeStamp(entity.getChangeTime())); - attributes.put("size", String.valueOf(entity.getSize())); - attributes.put("allocationSize", String.valueOf(entity.getAllocationSize())); + final SmbClientProviderService clientProviderService = + context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); + attributes.put(FILENAME, entity.getName()); + attributes.put(SHORT_NAME, entity.getShortName()); + attributes.put(PATH, entity.getPath()); + attributes.put(SERVICE_LOCATION, clientProviderService.getServiceLocation().toString()); + attributes.put(LAST_MODIFIED_TIME, formatTimeStamp(entity.getLastModifiedTime())); + attributes.put(CREATION_TIME, formatTimeStamp(entity.getCreationTime())); + attributes.put(LAST_ACCESS_TIME, formatTimeStamp(entity.getLastAccessTime())); + attributes.put(CHANGE_TIME, formatTimeStamp(entity.getChangeTime())); + attributes.put(SIZE, String.valueOf(entity.getSize())); + attributes.put(ALLOCATION_SIZE, String.valueOf(entity.getAllocationSize())); return unmodifiableMap(attributes); } @@ -286,7 +291,7 @@ public class ListSmb extends AbstractListProcessor { private String formatTimeStamp(long timestamp) { return ISO_DATE_TIME.format( - LocalDateTime.ofEpochSecond(TimeUnit.MILLISECONDS.toSeconds(timestamp), 0, ZoneOffset.UTC)); + LocalDateTime.ofEpochSecond(MILLISECONDS.toSeconds(timestamp), 0, UTC)); } private boolean isExecutionStopped(ListingMode listingMode) { @@ -295,9 +300,9 @@ public class ListSmb extends AbstractListProcessor { private Predicate createFileFilter(ProcessContext context, Long minTimestampOrNull) { - final Long minimumAge = context.getProperty(MINIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long minimumAge = context.getProperty(MINIMUM_AGE).asTimePeriod(MILLISECONDS); final Long maximumAgeOrNull = context.getProperty(MAXIMUM_AGE).isSet() ? context.getProperty(MAXIMUM_AGE) - .asTimePeriod(TimeUnit.MILLISECONDS) : null; + .asTimePeriod(MILLISECONDS) : null; final Double minimumSizeOrNull = context.getProperty(MINIMUM_SIZE).isSet() ? context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B) : null; @@ -307,14 +312,14 @@ public class ListSmb extends AbstractListProcessor { final String suffixOrNull = context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue(); final long now = getCurrentTime(); - Predicate filter = entity -> now - entity.getTimestamp() >= minimumAge; + Predicate filter = entity -> now - entity.getLastModifiedTime() >= minimumAge; if (maximumAgeOrNull != null) { - filter = filter.and(entity -> now - entity.getTimestamp() <= maximumAgeOrNull); + filter = filter.and(entity -> now - entity.getLastModifiedTime() <= maximumAgeOrNull); } if (minTimestampOrNull != null) { - filter = filter.and(entity -> entity.getTimestamp() >= minTimestampOrNull); + filter = filter.and(entity -> entity.getLastModifiedTime() >= minTimestampOrNull); } if (minimumSizeOrNull != null) { @@ -341,7 +346,7 @@ public class ListSmb extends AbstractListProcessor { try { clientService.close(); } catch (Exception e) { - throw new RuntimeException("Could not close samba client", e); + throw new RuntimeException("Could not close SMB client", e); } }); } @@ -349,7 +354,7 @@ public class ListSmb extends AbstractListProcessor { private String getDirectory(ProcessContext context) { final PropertyValue property = context.getProperty(DIRECTORY); final String directory = property.isSet() ? property.getValue().replace('\\', '/') : ""; - return directory.equals("/") ? "" : directory; + return "/".equals(directory) ? "" : directory; } private static class MustNotContainDirectorySeparatorsValidator implements Validator { @@ -359,11 +364,11 @@ public class ListSmb extends AbstractListProcessor { return new ValidationResult.Builder() .subject(subject) .input(value) - .valid(!value.contains("/")) + .valid(!value.contains("/") && !value.contains("\\")) .explanation(subject + " must not contain any folder separator character.") .build(); } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java index 6c6821fca4..d88561e895 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java @@ -68,7 +68,7 @@ import java.util.EnumSet; @CapabilityDescription("Writes the contents of a FlowFile to a samba network location. " + "Use this processor instead of a cifs mounts if share access control is important." + "Configure the Hostname, Share and Directory accordingly: \\\\[Hostname]\\[Share]\\[path\\to\\Directory]") -@SeeAlso({GetSmbFile.class}) +@SeeAlso({GetSmbFile.class, ListSmb.class, FetchSmb.class}) @ReadsAttributes({@ReadsAttribute(attribute="filename", description="The filename to use when writing the FlowFile to the network folder.")}) public class PutSmbFile extends AbstractProcessor { public static final String SHARE_ACCESS_NONE = "none"; diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 0dd9276bc0..650be2dd0a 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +org.apache.nifi.processors.smb.FetchSmb org.apache.nifi.processors.smb.GetSmbFile org.apache.nifi.processors.smb.ListSmb org.apache.nifi.processors.smb.PutSmbFile diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java new file mode 100644 index 0000000000..fb057791a8 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE; +import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS; +import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE; +import static org.apache.nifi.util.TestRunners.newTestRunner; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import org.apache.nifi.services.smb.SmbjClientProviderService; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.Test; + +public class FetchSmbIT extends SambaTestContainers { + + @Test + public void fetchFilesUsingEL() throws Exception { + writeFile("/test_file", "test_content"); + TestRunner testRunner = newTestRunner(FetchSmb.class); + testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}"); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); + + Map attributes = new HashMap<>(); + attributes.put("attribute_to_find_using_EL", "test_file"); + + testRunner.enqueue("ignored", attributes); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + assertEquals("test_content", testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent()); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + public void tryToFetchNonExistingFileEmitsFailure() throws Exception { + TestRunner testRunner = newTestRunner(FetchSmb.class); + testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}"); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); + + Map attributes = new HashMap<>(); + attributes.put("attribute_to_find_using_EL", "non_existing_file"); + + testRunner.enqueue("ignored", attributes); + testRunner.run(); + testRunner.assertTransferCount(REL_FAILURE, 1); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java new file mode 100644 index 0000000000..42651a4ef1 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static org.apache.nifi.processors.smb.FetchSmb.ERROR_CODE_ATTRIBUTE; +import static org.apache.nifi.processors.smb.FetchSmb.ERROR_MESSAGE_ATTRIBUTE; +import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE; +import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS; +import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE; +import static org.apache.nifi.util.TestRunners.newTestRunner; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbClientService; +import org.apache.nifi.services.smb.SmbException; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class FetchSmbTest { + + public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id"; + + @Mock + SmbClientService mockNifiSmbClientService; + + @Mock + SmbClientProviderService clientProviderService; + + @BeforeEach + public void beforeEach() throws Exception { + MockitoAnnotations.initMocks(this); + when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService); + when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID); + when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share")); + } + + @Test + public void shouldUseSmbClientProperly() throws Exception { + final TestRunner testRunner = createRunner(); + mockNifiSmbClientService(); + Map attributes = new HashMap<>(); + attributes.put("path", "testDirectory"); + attributes.put("filename", "cannotReadThis"); + testRunner.enqueue("ignore", attributes); + attributes = new HashMap<>(); + attributes.put("path", "testDirectory"); + attributes.put("filename", "canReadThis"); + testRunner.enqueue("ignore", attributes); + testRunner.run(); + testRunner.assertTransferCount(REL_FAILURE, 1); + assertEquals("test exception", + testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_MESSAGE_ATTRIBUTE)); + assertEquals("1", + testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_CODE_ATTRIBUTE)); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + assertEquals("content", + testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent()); + testRunner.assertValid(); + } + + @Test + public void noSuchAttributeReferencedInELShouldResultInFailure() throws Exception { + final TestRunner testRunner = createRunner(); + mockNifiSmbClientService(); + Map attributes = new HashMap<>(); + attributes.put("different_field_name_than_what_EL_expect", "testDirectory/cannotFindThis"); + testRunner.enqueue("ignore", attributes); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + testRunner.assertValid(); + } + + private void mockNifiSmbClientService() throws IOException { + doThrow(new SmbException("test exception", 1L, new RuntimeException())).when(mockNifiSmbClientService) + .readFile(anyString(), any(OutputStream.class)); + doAnswer(invocation -> { + final OutputStream o = invocation.getArgument(1); + final ByteArrayInputStream bytes = new ByteArrayInputStream("content".getBytes()); + IOUtils.copy(bytes, o); + return true; + }).when(mockNifiSmbClientService) + .readFile(eq("testDirectory/canReadThis"), any(OutputStream.class)); + } + + private TestRunner createRunner() throws Exception { + final TestRunner testRunner = newTestRunner(FetchSmb.class); + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID); + testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, clientProviderService); + testRunner.enableControllerService(clientProviderService); + return testRunner; + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java index 8ef5aa7ba3..f059502353 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.smb; import static java.util.Arrays.asList; -import static java.util.Arrays.fill; import static java.util.stream.Collectors.toSet; import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY; import static org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER; @@ -26,13 +25,9 @@ import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY; import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER; import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE; import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE; -import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE; -import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN; import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME; -import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD; import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT; import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE; -import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME; import static org.apache.nifi.util.TestRunners.newTestRunner; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -40,47 +35,16 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.services.smb.SmbClientProviderService; -import org.apache.nifi.services.smb.SmbListableEntity; import org.apache.nifi.services.smb.SmbjClientProviderService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.builder.Transferable; -import org.testcontainers.utility.DockerImageName; -public class ListSmbIT { - - private final static Integer DEFAULT_SAMBA_PORT = 445; - private final static Logger logger = LoggerFactory.getLogger(ListSmbTest.class); - private final GenericContainer sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba")) - .withExposedPorts(DEFAULT_SAMBA_PORT, 139) - .waitingFor(Wait.forListeningPort()) - .withLogConsumer(new Slf4jLogConsumer(logger)) - .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p"); - - @BeforeEach - public void beforeEach() { - sambaContainer.start(); - } - - @AfterEach - public void afterEach() { - sambaContainer.stop(); - } +public class ListSmbIT extends SambaTestContainers { @ParameterizedTest @ValueSource(ints = {4, 50, 45000}) @@ -89,8 +53,7 @@ public class ListSmbIT { final TestRunner testRunner = newTestRunner(ListSmb.class); testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(MINIMUM_AGE, "0 ms"); - SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); - testRunner.enableControllerService(smbjClientProviderService); + SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.getFlowFilesForRelationship(REL_SUCCESS) @@ -105,8 +68,7 @@ public class ListSmbIT { testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(DIRECTORY, "folderDoesNotExists"); - SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); - testRunner.enableControllerService(smbjClientProviderService); + SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); testRunner.run(); assertEquals(1, testRunner.getLogger().getErrorMessages().size()); testRunner.assertValid(); @@ -116,7 +78,7 @@ public class ListSmbIT { @Test public void shouldShowBulletinWhenShareIsInvalid() throws Exception { final TestRunner testRunner = newTestRunner(ListSmb.class); - SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); + SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, false); testRunner.setProperty(smbjClientProviderService, SHARE, "invalid_share"); testRunner.enableControllerService(smbjClientProviderService); testRunner.run(); @@ -128,8 +90,7 @@ public class ListSmbIT { @Test public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception { final TestRunner testRunner = newTestRunner(ListSmb.class); - final SmbClientProviderService smbClientProviderService = - configureTestRunnerForSambaDockerContainer(testRunner); + final SmbClientProviderService smbClientProviderService = configureSmbClient(testRunner, false); testRunner.setProperty(smbClientProviderService, PORT, "1"); testRunner.enableControllerService(smbClientProviderService); testRunner.run(); @@ -141,12 +102,12 @@ public class ListSmbIT { @Test public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception { final TestRunner testRunner = newTestRunner(ListSmb.class); - final SmbClientProviderService smbClientProviderService = - configureTestRunnerForSambaDockerContainer(testRunner); + final SmbClientProviderService smbClientProviderService = configureSmbClient(testRunner, false); testRunner.setProperty(smbClientProviderService, HOSTNAME, "this.host.should.not.exists"); testRunner.enableControllerService(smbClientProviderService); testRunner.run(); assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + testRunner.assertValid(); testRunner.disableControllerService(smbClientProviderService); } @@ -163,22 +124,14 @@ public class ListSmbIT { final TestRunner testRunner = newTestRunner(ListSmb.class); final MockRecordWriter writer = new MockRecordWriter(null, false); - final SimpleRecordSchema simpleRecordSchema = SmbListableEntity.getRecordSchema(); testRunner.addControllerService("writer", writer); testRunner.enableControllerService(writer); testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(RECORD_WRITER, "writer"); testRunner.setProperty(MINIMUM_AGE, "0 ms"); - SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); - testRunner.enableControllerService(smbjClientProviderService); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - final String result = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent(); - final int identifierColumnIndex = simpleRecordSchema.getFieldNames().indexOf("identifier"); - final Set actual = Arrays.stream(result.split("\n")) - .map(row -> row.split(",")[identifierColumnIndex]) - .collect(toSet()); - assertEquals(testFiles, actual); testRunner.assertValid(); testRunner.disableControllerService(smbjClientProviderService); } @@ -190,8 +143,7 @@ public class ListSmbIT { )); testFiles.forEach(file -> writeFile(file, generateContentWithSize(4))); final TestRunner testRunner = newTestRunner(ListSmb.class); - final SmbjClientProviderService smbjClientProviderService = - configureTestRunnerForSambaDockerContainer(testRunner); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, false); testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(MINIMUM_AGE, "0 sec"); testRunner.enableControllerService(smbjClientProviderService); @@ -202,16 +154,6 @@ public class ListSmbIT { .map(MockFlowFile::getAttributes) .collect(toSet()); - final Set identifiers = allAttributes.stream() - .map(attributes -> attributes.get("identifier")) - .collect(toSet()); - assertEquals(testFiles, identifiers); - - allAttributes.forEach(attribute -> assertEquals( - Stream.of(attribute.get("path"), attribute.get("filename")).filter(s -> !s.isEmpty()).collect( - Collectors.joining("/")), - attribute.get("absolute.path"))); - final Set fileNames = allAttributes.stream() .map(attributes -> attributes.get("filename")) .collect(toSet()); @@ -225,9 +167,7 @@ public class ListSmbIT { @Test public void shouldFilterFilesBySizeCriteria() throws Exception { final TestRunner testRunner = newTestRunner(ListSmb.class); - final SmbClientProviderService smbClientProviderService = - configureTestRunnerForSambaDockerContainer(testRunner); - testRunner.enableControllerService(smbClientProviderService); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(LISTING_STRATEGY, "none"); @@ -247,17 +187,15 @@ public class ListSmbIT { testRunner.setProperty(MINIMUM_SIZE, "50 B"); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - - testRunner.disableControllerService(smbClientProviderService); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); } @Test public void shouldFilterByGivenSuffix() throws Exception { final TestRunner testRunner = newTestRunner(ListSmb.class); - final SmbClientProviderService smbClientProviderService = - configureTestRunnerForSambaDockerContainer(testRunner); - testRunner.enableControllerService(smbClientProviderService); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix"); testRunner.setProperty(LISTING_STRATEGY, "none"); @@ -265,33 +203,8 @@ public class ListSmbIT { writeFile("should_skip_this.suffix", generateContentWithSize(1)); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - testRunner.disableControllerService(smbClientProviderService); - } - - private SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner) - throws Exception { - SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService(); - testRunner.addControllerService("connection-pool", smbjClientProviderService); - testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "connection-pool"); - testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost()); - testRunner.setProperty(smbjClientProviderService, PORT, - String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT))); - testRunner.setProperty(smbjClientProviderService, USERNAME, "username"); - testRunner.setProperty(smbjClientProviderService, PASSWORD, "password"); - testRunner.setProperty(smbjClientProviderService, SHARE, "share"); - testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain"); - return smbjClientProviderService; - } - - private String generateContentWithSize(int sizeInBytes) { - byte[] bytes = new byte[sizeInBytes]; - fill(bytes, (byte) 1); - return new String(bytes); - } - - private void writeFile(String path, String content) { - String containerPath = "/folder/" + path; - sambaContainer.copyFileToContainer(Transferable.of(content), containerPath); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java index e279bf2e85..0594ef3e2f 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java @@ -115,7 +115,6 @@ class ListSmbTest { testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - testRunner.assertValid(); } @@ -236,6 +235,7 @@ class ListSmbTest { when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception")); testRunner.run(); assertEquals(1, testRunner.getLogger().getErrorMessages().size()); + testRunner.assertValid(); } @Test @@ -288,7 +288,7 @@ class ListSmbTest { private SmbListableEntity listableEntity(String name, long timeStamp) { return SmbListableEntity.builder() .setName(name) - .setTimestamp(timeStamp) + .setLastModifiedTime(timeStamp) .build(); } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java new file mode 100644 index 0000000000..f38d36d171 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.fill; +import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN; +import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT; +import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME; + +import org.apache.nifi.services.smb.SmbjClientProviderService; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +public class SambaTestContainers { + + protected final static Integer DEFAULT_SAMBA_PORT = 445; + protected final static Logger logger = LoggerFactory.getLogger(SambaTestContainers.class); + protected final GenericContainer sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba")) + .withExposedPorts(DEFAULT_SAMBA_PORT, 139) + .waitingFor(Wait.forListeningPort()) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p"); + + @BeforeEach + public void beforeEach() { + sambaContainer.start(); + } + + @AfterEach + public void afterEach() { + sambaContainer.stop(); + } + + protected SmbjClientProviderService configureSmbClient(TestRunner testRunner, boolean shouldEnableSmbClient) + throws Exception { + final SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService(); + testRunner.addControllerService("client-provider", smbjClientProviderService); + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider"); + testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost()); + testRunner.setProperty(smbjClientProviderService, PORT, + String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT))); + testRunner.setProperty(smbjClientProviderService, USERNAME, "username"); + testRunner.setProperty(smbjClientProviderService, PASSWORD, "password"); + testRunner.setProperty(smbjClientProviderService, SHARE, "share"); + testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain"); + if (shouldEnableSmbClient) { + testRunner.enableControllerService(smbjClientProviderService); + } + return smbjClientProviderService; + } + + protected String generateContentWithSize(int sizeInBytes) { + byte[] bytes = new byte[sizeInBytes]; + fill(bytes, (byte) 1); + return new String(bytes); + } + + protected void writeFile(String path, String content) { + String containerPath = "/folder/" + path; + sambaContainer.copyFileToContainer(Transferable.of(content), containerPath); + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java index f72cec6a2b..3ee35d6289 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java @@ -116,7 +116,7 @@ public class SmbjClientProviderService extends AbstractControllerService impleme @Override public SmbClientService getClient() throws IOException { - final SmbjClientService client = new SmbjClientService(smbClient, authenticationContext); + final SmbjClientService client = new SmbjClientService(smbClient, authenticationContext, getServiceLocation()); try { client.connectToShare(hostname, port, shareName); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java index 431a4e9f52..a103fec359 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java @@ -33,8 +33,11 @@ import com.hierynomus.smbj.connection.Connection; import com.hierynomus.smbj.session.Session; import com.hierynomus.smbj.share.Directory; import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.File; import com.hierynomus.smbj.share.Share; import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; import java.util.EnumSet; import java.util.List; import java.util.stream.Stream; @@ -42,17 +45,20 @@ import java.util.stream.Stream; public class SmbjClientService implements SmbClientService { private static final List SPECIAL_DIRECTORIES = asList(".", ".."); + private static final long UNCATEGORISED_ERROR = -1L; final private AuthenticationContext authenticationContext; final private SMBClient smbClient; + final private URI serviceLocation; private Connection connection; private Session session; private DiskShare share; - public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext) { + public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext, URI serviceLocation) { this.smbClient = smbClient; this.authenticationContext = authenticationContext; + this.serviceLocation = serviceLocation; } public void connectToShare(String hostname, int port, String shareName) throws IOException { @@ -104,7 +110,7 @@ public class SmbjClientService implements SmbClientService { return Stream.of(filePath).flatMap(path -> { final Directory directory = openDirectory(path); return stream(directory::spliterator, 0, false) - .map(entity -> buildSmbListableEntity(entity, path)) + .map(entity -> buildSmbListableEntity(entity, path, serviceLocation)) .filter(entity -> !specialDirectory(entity)) .flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName()) : Stream.of(listable)) @@ -123,18 +129,39 @@ public class SmbjClientService implements SmbClientService { } } - private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) { + @Override + public void readFile(String fileName, OutputStream outputStream) throws IOException { + try (File f = share.openFile( + fileName, + EnumSet.of(AccessMask.GENERIC_READ), + EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), + EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ), + SMB2CreateDisposition.FILE_OPEN, + EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY)) + ) { + f.read(outputStream); + } catch (SMBApiException a) { + throw new SmbException(a.getMessage(), a.getStatusCode(), a); + } catch (Exception e) { + throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e); + } finally { + outputStream.close(); + } + } + + private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI serviceLocation) { return SmbListableEntity.builder() .setName(info.getFileName()) .setShortName(info.getShortName()) .setPath(path) - .setTimestamp(info.getLastWriteTime().toEpochMillis()) + .setLastModifiedTime(info.getLastWriteTime().toEpochMillis()) .setCreationTime(info.getCreationTime().toEpochMillis()) .setChangeTime(info.getChangeTime().toEpochMillis()) .setLastAccessTime(info.getLastAccessTime().toEpochMillis()) .setDirectory((info.getFileAttributes() & FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0) .setSize(info.getEndOfFile()) .setAllocationSize(info.getAllocationSize()) + .setServiceLocation(serviceLocation) .build(); }