NIFI-10230 added FetchSmb

This closes #6279.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Gabor Kulik 2022-08-02 15:58:50 +02:00 committed by Tamas Palfy
parent 98a1874038
commit 5b565679df
15 changed files with 633 additions and 186 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.services.smb; package org.apache.nifi.services.smb;
import java.io.IOException;
import java.io.OutputStream;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@ -27,4 +29,5 @@ public interface SmbClientService extends AutoCloseable {
void createDirectory(String path); void createDirectory(String path);
void readFile(String fileName, OutputStream outputStream) throws IOException;
} }

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.smb;
public class SmbException extends RuntimeException {
private long errorCode;
public SmbException(String message, long errorCode, Exception cause) {
super(message, cause);
this.errorCode = errorCode;
}
public long getErrorCode() {
return errorCode;
}
}

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.services.smb; package org.apache.nifi.services.smb;
import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -29,30 +30,42 @@ import org.apache.nifi.serialization.record.RecordFieldType;
public class SmbListableEntity implements ListableEntity { public class SmbListableEntity implements ListableEntity {
public static final String FILENAME = "filename";
public static final String SHORT_NAME = "shortName";
public static final String PATH = "path";
public static final String SERVICE_LOCATION = "serviceLocation";
public static final String CREATION_TIME = "creationTime";
public static final String LAST_ACCESS_TIME = "lastAccessTime";
public static final String CHANGE_TIME = "changeTime";
public static final String LAST_MODIFIED_TIME = "lastModifiedTime";
public static final String SIZE = "size";
public static final String ALLOCATION_SIZE = "allocationSize";
private final String name; private final String name;
private final String shortName; private final String shortName;
private final String path; private final String path;
private final long timestamp; private final long lastModifiedTime;
private final long creationTime; private final long creationTime;
private final long lastAccessTime; private final long lastAccessTime;
private final long changeTime; private final long changeTime;
private final boolean directory; private final boolean directory;
private final long size; private final long size;
private final long allocationSize; private final long allocationSize;
private final URI serviceLocation;
private SmbListableEntity(String name, String shortName, String path, long timestamp, long creationTime, private SmbListableEntity(String name, String shortName, String path, long lastModifiedTime, long creationTime,
long lastAccessTime, long changeTime, boolean directory, long lastAccessTime, long changeTime, boolean directory,
long size, long allocationSize) { long size, long allocationSize, URI serviceLocation) {
this.name = name; this.name = name;
this.shortName = shortName; this.shortName = shortName;
this.path = path; this.path = path;
this.timestamp = timestamp; this.lastModifiedTime = lastModifiedTime;
this.creationTime = creationTime; this.creationTime = creationTime;
this.lastAccessTime = lastAccessTime; this.lastAccessTime = lastAccessTime;
this.changeTime = changeTime; this.changeTime = changeTime;
this.directory = directory; this.directory = directory;
this.size = size; this.size = size;
this.allocationSize = allocationSize; this.allocationSize = allocationSize;
this.serviceLocation = serviceLocation;
} }
public static SimpleRecordSchema getRecordSchema() { public static SimpleRecordSchema getRecordSchema() {
@ -60,8 +73,8 @@ public class SmbListableEntity implements ListableEntity {
new RecordField("filename", RecordFieldType.STRING.getDataType(), false), new RecordField("filename", RecordFieldType.STRING.getDataType(), false),
new RecordField("shortName", RecordFieldType.STRING.getDataType(), false), new RecordField("shortName", RecordFieldType.STRING.getDataType(), false),
new RecordField("path", RecordFieldType.STRING.getDataType(), false), new RecordField("path", RecordFieldType.STRING.getDataType(), false),
new RecordField("identifier", RecordFieldType.STRING.getDataType(), false), new RecordField("absolute.path", RecordFieldType.STRING.getDataType(), false),
new RecordField("timestamp", RecordFieldType.LONG.getDataType(), false), new RecordField("lastModifiedTime", RecordFieldType.LONG.getDataType(), false),
new RecordField("creationTime", RecordFieldType.LONG.getDataType(), false), new RecordField("creationTime", RecordFieldType.LONG.getDataType(), false),
new RecordField("lastAccessTime", RecordFieldType.LONG.getDataType(), false), new RecordField("lastAccessTime", RecordFieldType.LONG.getDataType(), false),
new RecordField("changeTime", RecordFieldType.LONG.getDataType(), false), new RecordField("changeTime", RecordFieldType.LONG.getDataType(), false),
@ -108,6 +121,10 @@ public class SmbListableEntity implements ListableEntity {
return path.isEmpty() ? name : path + "/" + name; return path.isEmpty() ? name : path + "/" + name;
} }
public long getLastModifiedTime() {
return lastModifiedTime;
}
@Override @Override
public String getIdentifier() { public String getIdentifier() {
return getPathWithName(); return getPathWithName();
@ -115,7 +132,7 @@ public class SmbListableEntity implements ListableEntity {
@Override @Override
public long getTimestamp() { public long getTimestamp() {
return timestamp; return getLastModifiedTime();
} }
@Override @Override
@ -146,36 +163,42 @@ public class SmbListableEntity implements ListableEntity {
@Override @Override
public String toString() { public String toString() {
return getPathWithName() + " (last write: " + timestamp + " size: " + size + ")"; return getPathWithName() + " (last write: " + lastModifiedTime + " size: " + size + ")";
} }
@Override @Override
public Record toRecord() { public Record toRecord() {
final Map<String, Object> record = new TreeMap<>(); final Map<String, Object> record = new TreeMap<>();
record.put("filename", getName()); record.put(FILENAME, getName());
record.put("shortName", getShortName()); record.put(SHORT_NAME, getShortName());
record.put("path", path); record.put(PATH, getPath());
record.put("identifier", getPathWithName()); record.put(SERVICE_LOCATION, getServiceLocation().toString());
record.put("timestamp", getTimestamp()); record.put(CREATION_TIME, getCreationTime());
record.put("creationTime", getCreationTime()); record.put(LAST_ACCESS_TIME, getLastAccessTime());
record.put("lastAccessTime", getLastAccessTime()); record.put(LAST_MODIFIED_TIME, getLastModifiedTime());
record.put("size", getSize()); record.put(CHANGE_TIME, getChangeTime());
record.put("allocationSize", getAllocationSize()); record.put(SIZE, getSize());
record.put(ALLOCATION_SIZE, getAllocationSize());
return new MapRecord(getRecordSchema(), record); return new MapRecord(getRecordSchema(), record);
} }
private URI getServiceLocation() {
return serviceLocation;
}
public static class SmbListableEntityBuilder { public static class SmbListableEntityBuilder {
private String name; private String name;
private String shortName; private String shortName;
private String path = ""; private String path = "";
private long timestamp; private long lastModifiedTime;
private long creationTime; private long creationTime;
private long lastAccessTime; private long lastAccessTime;
private long changeTime; private long changeTime;
private boolean directory = false; private boolean directory = false;
private long size = 0; private long size = 0;
private long allocationSize = 0; private long allocationSize = 0;
private URI serviceLocation;
public SmbListableEntityBuilder setName(String name) { public SmbListableEntityBuilder setName(String name) {
this.name = name; this.name = name;
@ -192,8 +215,8 @@ public class SmbListableEntity implements ListableEntity {
return this; return this;
} }
public SmbListableEntityBuilder setTimestamp(long timestamp) { public SmbListableEntityBuilder setLastModifiedTime(long lastModifiedTime) {
this.timestamp = timestamp; this.lastModifiedTime = lastModifiedTime;
return this; return this;
} }
@ -227,9 +250,14 @@ public class SmbListableEntity implements ListableEntity {
return this; return this;
} }
public SmbListableEntityBuilder setServiceLocation(URI serviceLocation) {
this.serviceLocation = serviceLocation;
return this;
}
public SmbListableEntity build() { public SmbListableEntity build() {
return new SmbListableEntity(name, shortName, path, timestamp, creationTime, lastAccessTime, changeTime, return new SmbListableEntity(name, shortName, path, lastModifiedTime, creationTime, lastAccessTime, changeTime,
directory, size, allocationSize); directory, size, allocationSize, serviceLocation);
} }
} }

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbException;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"samba", "smb", "cifs", "files", "fetch"})
@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
@WritesAttributes({
@WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails."),
@WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails.")
})
public class FetchSmb extends AbstractProcessor {
public static final String ERROR_CODE_ATTRIBUTE = "error.code";
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
.Builder().name("remote-file")
.displayName("Remote File")
.description("The full path of the file to be retrieved from the remote server. Expression language is supported.")
.required(true)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.defaultValue("${path}/${filename}")
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
.name("smb-client-provider-service")
.displayName("SMB Client Provider Service")
.description("Specifies the SMB client provider to use for creating SMB connections.")
.required(true)
.identifiesControllerService(SmbClientProviderService.class)
.build();
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
.description("A flowfile will be routed here for each successfully fetched file.")
.build();
public static final Relationship REL_FAILURE =
new Relationship.Builder().name("failure")
.description(
"A flowfile will be routed here when failed to fetch its content.")
.build();
public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList(
REL_SUCCESS,
REL_FAILURE
)));
public static final String UNCATEGORIZED_ERROR = "-2";
private static final List<PropertyDescriptor> PROPERTIES = asList(
SMB_CLIENT_PROVIDER_SERVICE,
REMOTE_FILE
);
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
try (SmbClientService client = clientProviderService.getClient()) {
fetchAndTransfer(session, context, client, flowFile);
} catch (Exception e) {
getLogger().error("Couldn't connect to SMB.", e);
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client,
FlowFile flowFile) {
final Map<String, String> attributes = flowFile.getAttributes();
final String filename = context.getProperty(REMOTE_FILE)
.evaluateAttributeExpressions(attributes).getValue();
try {
flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("Couldn't fetch file {}.", filename, e);
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, getErrorMessage(e));
session.transfer(flowFile, REL_FAILURE);
}
}
private String getErrorCode(Exception exception) {
return Optional.ofNullable(exception instanceof SmbException ? (SmbException) exception : null)
.map(SmbException::getErrorCode)
.map(String::valueOf)
.orElse(UNCATEGORIZED_ERROR);
}
private String getErrorMessage(Exception exception) {
return Optional.ofNullable(exception.getMessage())
.orElse(exception.getClass().getSimpleName());
}
}

View File

@ -85,7 +85,7 @@ import java.util.regex.Pattern;
@CapabilityDescription("Reads file from a samba network location to FlowFiles. " + @CapabilityDescription("Reads file from a samba network location to FlowFiles. " +
"Use this processor instead of a cifs mounts if share access control is important. " + "Use this processor instead of a cifs mounts if share access control is important. " +
"Configure the Hostname, Share and Directory accordingly: \\\\[Hostname]\\[Share]\\[path\\to\\Directory]") "Configure the Hostname, Share and Directory accordingly: \\\\[Hostname]\\[Share]\\[path\\to\\Directory]")
@SeeAlso({PutSmbFile.class}) @SeeAlso({PutSmbFile.class, ListSmb.class, FetchSmb.class})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the network share"), @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the network share"),
@WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's network share name. For example, " @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's network share name. For example, "

View File

@ -16,27 +16,37 @@
*/ */
package org.apache.nifi.processors.smb; package org.apache.nifi.processors.smb;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.nifi.components.state.Scope.CLUSTER; import static org.apache.nifi.components.state.Scope.CLUSTER;
import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
import static org.apache.nifi.services.smb.SmbListableEntity.ALLOCATION_SIZE;
import static org.apache.nifi.services.smb.SmbListableEntity.CHANGE_TIME;
import static org.apache.nifi.services.smb.SmbListableEntity.LAST_MODIFIED_TIME;
import static org.apache.nifi.services.smb.SmbListableEntity.CREATION_TIME;
import static org.apache.nifi.services.smb.SmbListableEntity.FILENAME;
import static org.apache.nifi.services.smb.SmbListableEntity.LAST_ACCESS_TIME;
import static org.apache.nifi.services.smb.SmbListableEntity.PATH;
import static org.apache.nifi.services.smb.SmbListableEntity.SERVICE_LOCATION;
import static org.apache.nifi.services.smb.SmbListableEntity.SHORT_NAME;
import static org.apache.nifi.services.smb.SmbListableEntity.SIZE;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
@ -69,7 +79,7 @@ import org.apache.nifi.services.smb.SmbListableEntity;
@PrimaryNodeOnly @PrimaryNodeOnly
@TriggerSerially @TriggerSerially
@Tags({"samba, smb, cifs, files", "list"}) @Tags({"samba, smb, cifs, files", "list"})
@SeeAlso({PutSmbFile.class, GetSmbFile.class}) @SeeAlso({PutSmbFile.class, GetSmbFile.class, FetchSmb.class})
@CapabilityDescription("Lists concrete files shared via SMB protocol. " + @CapabilityDescription("Lists concrete files shared via SMB protocol. " +
"Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " + "Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " +
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. "
@ -79,32 +89,26 @@ import org.apache.nifi.services.smb.SmbListableEntity;
"previous node left off without duplicating all of the data.") "previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."),
@WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."),
@WritesAttribute(attribute = "path", description = @WritesAttribute(attribute = PATH, description =
"The path is set to the relative path of the file's directory " "The path is set to the relative path of the file's directory on the remote filesystem compared to the "
+ "on filesystem compared to the Share and Input Directory properties and the configured host " + "Share root directory. For example, for a given remote location"
+ "and port inherited from the configured connection pool controller service. For example, for " + "smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
+ "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from " + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to "
+ "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."), + "\"DIRECTORY/sub/folder/file\"."),
@WritesAttribute(attribute = "absolute.path", description = @WritesAttribute(attribute = SERVICE_LOCATION, description =
"The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " "The SMB URL of the share."),
+ "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from " @WritesAttribute(attribute = LAST_MODIFIED_TIME, description =
+ "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to " "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."),
+ "SHARE/DIRECTORY/sub/folder/file."), @WritesAttribute(attribute = CREATION_TIME, description =
@WritesAttribute(attribute = "identifier", description = "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."),
"The identifier of the file. This equals to the path attribute so two files with the same relative path " @WritesAttribute(attribute = LAST_ACCESS_TIME, description =
+ "coming from different file shares considered to be identical."), "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."),
@WritesAttribute(attribute = "timestamp", description = @WritesAttribute(attribute = CHANGE_TIME, description =
"The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."),
@WritesAttribute(attribute = "createTime", description = @WritesAttribute(attribute = SIZE, description = "The size of the file in bytes."),
"The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), @WritesAttribute(attribute = ALLOCATION_SIZE, description = "The number of bytes allocated for the file on the server."),
@WritesAttribute(attribute = "lastAccessTime", description =
"The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "changeTime", description =
"The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "size", description = "The number of bytes in the source file"),
@WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"),
}) })
@Stateful(scopes = {Scope.CLUSTER}, description = @Stateful(scopes = {Scope.CLUSTER}, description =
"After performing a listing of files, the state of the previous listing can be stored in order to list files " "After performing a listing of files, the state of the previous listing can be stored in order to list files "
@ -137,7 +141,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder()
.displayName("Maximum File Age") .displayName("Maximum File Age")
.name("max-file-age") .name("max-file-age")
.description("Any file older than the given value will be omitted. ") .description("Any file older than the given value will be omitted.")
.required(false) .required(false)
.addValidator(TIME_PERIOD_VALIDATOR) .addValidator(TIME_PERIOD_VALIDATOR)
.build(); .build();
@ -184,11 +188,11 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
.build(); .build();
private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList(
SMB_LISTING_STRATEGY,
SMB_CLIENT_PROVIDER_SERVICE, SMB_CLIENT_PROVIDER_SERVICE,
SMB_LISTING_STRATEGY,
DIRECTORY, DIRECTORY,
AbstractListProcessor.RECORD_WRITER,
FILE_NAME_SUFFIX_FILTER, FILE_NAME_SUFFIX_FILTER,
AbstractListProcessor.RECORD_WRITER,
MINIMUM_AGE, MINIMUM_AGE,
MAXIMUM_AGE, MAXIMUM_AGE,
MINIMUM_SIZE, MINIMUM_SIZE,
@ -207,17 +211,18 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
@Override @Override
protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) {
final Map<String, String> attributes = new TreeMap<>(); final Map<String, String> attributes = new TreeMap<>();
attributes.put("filename", entity.getName()); final SmbClientProviderService clientProviderService =
attributes.put("shortname", entity.getShortName()); context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
attributes.put("path", entity.getPath()); attributes.put(FILENAME, entity.getName());
attributes.put("absolute.path", entity.getPathWithName()); attributes.put(SHORT_NAME, entity.getShortName());
attributes.put("identifier", entity.getIdentifier()); attributes.put(PATH, entity.getPath());
attributes.put("timestamp", formatTimeStamp(entity.getTimestamp())); attributes.put(SERVICE_LOCATION, clientProviderService.getServiceLocation().toString());
attributes.put("creationTime", formatTimeStamp(entity.getCreationTime())); attributes.put(LAST_MODIFIED_TIME, formatTimeStamp(entity.getLastModifiedTime()));
attributes.put("lastAccessTime", formatTimeStamp(entity.getLastAccessTime())); attributes.put(CREATION_TIME, formatTimeStamp(entity.getCreationTime()));
attributes.put("changeTime", formatTimeStamp(entity.getChangeTime())); attributes.put(LAST_ACCESS_TIME, formatTimeStamp(entity.getLastAccessTime()));
attributes.put("size", String.valueOf(entity.getSize())); attributes.put(CHANGE_TIME, formatTimeStamp(entity.getChangeTime()));
attributes.put("allocationSize", String.valueOf(entity.getAllocationSize())); attributes.put(SIZE, String.valueOf(entity.getSize()));
attributes.put(ALLOCATION_SIZE, String.valueOf(entity.getAllocationSize()));
return unmodifiableMap(attributes); return unmodifiableMap(attributes);
} }
@ -286,7 +291,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
private String formatTimeStamp(long timestamp) { private String formatTimeStamp(long timestamp) {
return ISO_DATE_TIME.format( return ISO_DATE_TIME.format(
LocalDateTime.ofEpochSecond(TimeUnit.MILLISECONDS.toSeconds(timestamp), 0, ZoneOffset.UTC)); LocalDateTime.ofEpochSecond(MILLISECONDS.toSeconds(timestamp), 0, UTC));
} }
private boolean isExecutionStopped(ListingMode listingMode) { private boolean isExecutionStopped(ListingMode listingMode) {
@ -295,9 +300,9 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
private Predicate<SmbListableEntity> createFileFilter(ProcessContext context, Long minTimestampOrNull) { private Predicate<SmbListableEntity> createFileFilter(ProcessContext context, Long minTimestampOrNull) {
final Long minimumAge = context.getProperty(MINIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS); final Long minimumAge = context.getProperty(MINIMUM_AGE).asTimePeriod(MILLISECONDS);
final Long maximumAgeOrNull = context.getProperty(MAXIMUM_AGE).isSet() ? context.getProperty(MAXIMUM_AGE) final Long maximumAgeOrNull = context.getProperty(MAXIMUM_AGE).isSet() ? context.getProperty(MAXIMUM_AGE)
.asTimePeriod(TimeUnit.MILLISECONDS) : null; .asTimePeriod(MILLISECONDS) : null;
final Double minimumSizeOrNull = final Double minimumSizeOrNull =
context.getProperty(MINIMUM_SIZE).isSet() ? context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B) context.getProperty(MINIMUM_SIZE).isSet() ? context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B)
: null; : null;
@ -307,14 +312,14 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
final String suffixOrNull = context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue(); final String suffixOrNull = context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue();
final long now = getCurrentTime(); final long now = getCurrentTime();
Predicate<SmbListableEntity> filter = entity -> now - entity.getTimestamp() >= minimumAge; Predicate<SmbListableEntity> filter = entity -> now - entity.getLastModifiedTime() >= minimumAge;
if (maximumAgeOrNull != null) { if (maximumAgeOrNull != null) {
filter = filter.and(entity -> now - entity.getTimestamp() <= maximumAgeOrNull); filter = filter.and(entity -> now - entity.getLastModifiedTime() <= maximumAgeOrNull);
} }
if (minTimestampOrNull != null) { if (minTimestampOrNull != null) {
filter = filter.and(entity -> entity.getTimestamp() >= minTimestampOrNull); filter = filter.and(entity -> entity.getLastModifiedTime() >= minTimestampOrNull);
} }
if (minimumSizeOrNull != null) { if (minimumSizeOrNull != null) {
@ -341,7 +346,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
try { try {
clientService.close(); clientService.close();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not close samba client", e); throw new RuntimeException("Could not close SMB client", e);
} }
}); });
} }
@ -349,7 +354,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
private String getDirectory(ProcessContext context) { private String getDirectory(ProcessContext context) {
final PropertyValue property = context.getProperty(DIRECTORY); final PropertyValue property = context.getProperty(DIRECTORY);
final String directory = property.isSet() ? property.getValue().replace('\\', '/') : ""; final String directory = property.isSet() ? property.getValue().replace('\\', '/') : "";
return directory.equals("/") ? "" : directory; return "/".equals(directory) ? "" : directory;
} }
private static class MustNotContainDirectorySeparatorsValidator implements Validator { private static class MustNotContainDirectorySeparatorsValidator implements Validator {
@ -359,7 +364,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
return new ValidationResult.Builder() return new ValidationResult.Builder()
.subject(subject) .subject(subject)
.input(value) .input(value)
.valid(!value.contains("/")) .valid(!value.contains("/") && !value.contains("\\"))
.explanation(subject + " must not contain any folder separator character.") .explanation(subject + " must not contain any folder separator character.")
.build(); .build();
} }

View File

@ -68,7 +68,7 @@ import java.util.EnumSet;
@CapabilityDescription("Writes the contents of a FlowFile to a samba network location. " + @CapabilityDescription("Writes the contents of a FlowFile to a samba network location. " +
"Use this processor instead of a cifs mounts if share access control is important." + "Use this processor instead of a cifs mounts if share access control is important." +
"Configure the Hostname, Share and Directory accordingly: \\\\[Hostname]\\[Share]\\[path\\to\\Directory]") "Configure the Hostname, Share and Directory accordingly: \\\\[Hostname]\\[Share]\\[path\\to\\Directory]")
@SeeAlso({GetSmbFile.class}) @SeeAlso({GetSmbFile.class, ListSmb.class, FetchSmb.class})
@ReadsAttributes({@ReadsAttribute(attribute="filename", description="The filename to use when writing the FlowFile to the network folder.")}) @ReadsAttributes({@ReadsAttribute(attribute="filename", description="The filename to use when writing the FlowFile to the network folder.")})
public class PutSmbFile extends AbstractProcessor { public class PutSmbFile extends AbstractProcessor {
public static final String SHARE_ACCESS_NONE = "none"; public static final String SHARE_ACCESS_NONE = "none";

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.smb.FetchSmb
org.apache.nifi.processors.smb.GetSmbFile org.apache.nifi.processors.smb.GetSmbFile
org.apache.nifi.processors.smb.ListSmb org.apache.nifi.processors.smb.ListSmb
org.apache.nifi.processors.smb.PutSmbFile org.apache.nifi.processors.smb.PutSmbFile

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
public class FetchSmbIT extends SambaTestContainers {
@Test
public void fetchFilesUsingEL() throws Exception {
writeFile("/test_file", "test_content");
TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
Map<String, String> attributes = new HashMap<>();
attributes.put("attribute_to_find_using_EL", "test_file");
testRunner.enqueue("ignored", attributes);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
assertEquals("test_content", testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void tryToFetchNonExistingFileEmitsFailure() throws Exception {
TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
Map<String, String> attributes = new HashMap<>();
attributes.put("attribute_to_find_using_EL", "non_existing_file");
testRunner.enqueue("ignored", attributes);
testRunner.run();
testRunner.assertTransferCount(REL_FAILURE, 1);
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static org.apache.nifi.processors.smb.FetchSmb.ERROR_CODE_ATTRIBUTE;
import static org.apache.nifi.processors.smb.FetchSmb.ERROR_MESSAGE_ATTRIBUTE;
import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbException;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
class FetchSmbTest {
public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id";
@Mock
SmbClientService mockNifiSmbClientService;
@Mock
SmbClientProviderService clientProviderService;
@BeforeEach
public void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
}
@Test
public void shouldUseSmbClientProperly() throws Exception {
final TestRunner testRunner = createRunner();
mockNifiSmbClientService();
Map<String, String> attributes = new HashMap<>();
attributes.put("path", "testDirectory");
attributes.put("filename", "cannotReadThis");
testRunner.enqueue("ignore", attributes);
attributes = new HashMap<>();
attributes.put("path", "testDirectory");
attributes.put("filename", "canReadThis");
testRunner.enqueue("ignore", attributes);
testRunner.run();
testRunner.assertTransferCount(REL_FAILURE, 1);
assertEquals("test exception",
testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_MESSAGE_ATTRIBUTE));
assertEquals("1",
testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_CODE_ATTRIBUTE));
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
assertEquals("content",
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
testRunner.assertValid();
}
@Test
public void noSuchAttributeReferencedInELShouldResultInFailure() throws Exception {
final TestRunner testRunner = createRunner();
mockNifiSmbClientService();
Map<String, String> attributes = new HashMap<>();
attributes.put("different_field_name_than_what_EL_expect", "testDirectory/cannotFindThis");
testRunner.enqueue("ignore", attributes);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_FAILURE, 1);
testRunner.assertValid();
}
private void mockNifiSmbClientService() throws IOException {
doThrow(new SmbException("test exception", 1L, new RuntimeException())).when(mockNifiSmbClientService)
.readFile(anyString(), any(OutputStream.class));
doAnswer(invocation -> {
final OutputStream o = invocation.getArgument(1);
final ByteArrayInputStream bytes = new ByteArrayInputStream("content".getBytes());
IOUtils.copy(bytes, o);
return true;
}).when(mockNifiSmbClientService)
.readFile(eq("testDirectory/canReadThis"), any(OutputStream.class));
}
private TestRunner createRunner() throws Exception {
final TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID);
testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, clientProviderService);
testRunner.enableControllerService(clientProviderService);
return testRunner;
}
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.smb; package org.apache.nifi.processors.smb;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Arrays.fill;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY; import static org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER; import static org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
@ -26,13 +25,9 @@ import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER; import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE; import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE; import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME; import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT; import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE; import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
import static org.apache.nifi.util.TestRunners.newTestRunner; import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -40,47 +35,16 @@ import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.services.smb.SmbClientProviderService; import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbListableEntity;
import org.apache.nifi.services.smb.SmbjClientProviderService; import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
public class ListSmbIT { public class ListSmbIT extends SambaTestContainers {
private final static Integer DEFAULT_SAMBA_PORT = 445;
private final static Logger logger = LoggerFactory.getLogger(ListSmbTest.class);
private final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
.withExposedPorts(DEFAULT_SAMBA_PORT, 139)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(logger))
.withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
@BeforeEach
public void beforeEach() {
sambaContainer.start();
}
@AfterEach
public void afterEach() {
sambaContainer.stop();
}
@ParameterizedTest @ParameterizedTest
@ValueSource(ints = {4, 50, 45000}) @ValueSource(ints = {4, 50, 45000})
@ -89,8 +53,7 @@ public class ListSmbIT {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(MINIMUM_AGE, "0 ms");
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run(); testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.getFlowFilesForRelationship(REL_SUCCESS) testRunner.getFlowFilesForRelationship(REL_SUCCESS)
@ -105,8 +68,7 @@ public class ListSmbIT {
testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(DIRECTORY, "folderDoesNotExists"); testRunner.setProperty(DIRECTORY, "folderDoesNotExists");
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run(); testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size()); assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid(); testRunner.assertValid();
@ -116,7 +78,7 @@ public class ListSmbIT {
@Test @Test
public void shouldShowBulletinWhenShareIsInvalid() throws Exception { public void shouldShowBulletinWhenShareIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, false);
testRunner.setProperty(smbjClientProviderService, SHARE, "invalid_share"); testRunner.setProperty(smbjClientProviderService, SHARE, "invalid_share");
testRunner.enableControllerService(smbjClientProviderService); testRunner.enableControllerService(smbjClientProviderService);
testRunner.run(); testRunner.run();
@ -128,8 +90,7 @@ public class ListSmbIT {
@Test @Test
public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception { public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService = final SmbClientProviderService smbClientProviderService = configureSmbClient(testRunner, false);
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(smbClientProviderService, PORT, "1"); testRunner.setProperty(smbClientProviderService, PORT, "1");
testRunner.enableControllerService(smbClientProviderService); testRunner.enableControllerService(smbClientProviderService);
testRunner.run(); testRunner.run();
@ -141,12 +102,12 @@ public class ListSmbIT {
@Test @Test
public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception { public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService = final SmbClientProviderService smbClientProviderService = configureSmbClient(testRunner, false);
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(smbClientProviderService, HOSTNAME, "this.host.should.not.exists"); testRunner.setProperty(smbClientProviderService, HOSTNAME, "this.host.should.not.exists");
testRunner.enableControllerService(smbClientProviderService); testRunner.enableControllerService(smbClientProviderService);
testRunner.run(); testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size()); assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
testRunner.disableControllerService(smbClientProviderService); testRunner.disableControllerService(smbClientProviderService);
} }
@ -163,22 +124,14 @@ public class ListSmbIT {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
final MockRecordWriter writer = new MockRecordWriter(null, false); final MockRecordWriter writer = new MockRecordWriter(null, false);
final SimpleRecordSchema simpleRecordSchema = SmbListableEntity.getRecordSchema();
testRunner.addControllerService("writer", writer); testRunner.addControllerService("writer", writer);
testRunner.enableControllerService(writer); testRunner.enableControllerService(writer);
testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(RECORD_WRITER, "writer"); testRunner.setProperty(RECORD_WRITER, "writer");
testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(MINIMUM_AGE, "0 ms");
SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner); final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run(); testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
final String result = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent();
final int identifierColumnIndex = simpleRecordSchema.getFieldNames().indexOf("identifier");
final Set<String> actual = Arrays.stream(result.split("\n"))
.map(row -> row.split(",")[identifierColumnIndex])
.collect(toSet());
assertEquals(testFiles, actual);
testRunner.assertValid(); testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService); testRunner.disableControllerService(smbjClientProviderService);
} }
@ -190,8 +143,7 @@ public class ListSmbIT {
)); ));
testFiles.forEach(file -> writeFile(file, generateContentWithSize(4))); testFiles.forEach(file -> writeFile(file, generateContentWithSize(4)));
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbjClientProviderService smbjClientProviderService = final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, false);
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 sec"); testRunner.setProperty(MINIMUM_AGE, "0 sec");
testRunner.enableControllerService(smbjClientProviderService); testRunner.enableControllerService(smbjClientProviderService);
@ -202,16 +154,6 @@ public class ListSmbIT {
.map(MockFlowFile::getAttributes) .map(MockFlowFile::getAttributes)
.collect(toSet()); .collect(toSet());
final Set<String> identifiers = allAttributes.stream()
.map(attributes -> attributes.get("identifier"))
.collect(toSet());
assertEquals(testFiles, identifiers);
allAttributes.forEach(attribute -> assertEquals(
Stream.of(attribute.get("path"), attribute.get("filename")).filter(s -> !s.isEmpty()).collect(
Collectors.joining("/")),
attribute.get("absolute.path")));
final Set<String> fileNames = allAttributes.stream() final Set<String> fileNames = allAttributes.stream()
.map(attributes -> attributes.get("filename")) .map(attributes -> attributes.get("filename"))
.collect(toSet()); .collect(toSet());
@ -225,9 +167,7 @@ public class ListSmbIT {
@Test @Test
public void shouldFilterFilesBySizeCriteria() throws Exception { public void shouldFilterFilesBySizeCriteria() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService = final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbClientProviderService);
testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(LISTING_STRATEGY, "none");
@ -247,17 +187,15 @@ public class ListSmbIT {
testRunner.setProperty(MINIMUM_SIZE, "50 B"); testRunner.setProperty(MINIMUM_SIZE, "50 B");
testRunner.run(); testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertValid();
testRunner.disableControllerService(smbClientProviderService); testRunner.disableControllerService(smbjClientProviderService);
} }
@Test @Test
public void shouldFilterByGivenSuffix() throws Exception { public void shouldFilterByGivenSuffix() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class); final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbClientProviderService smbClientProviderService = final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true);
configureTestRunnerForSambaDockerContainer(testRunner);
testRunner.enableControllerService(smbClientProviderService);
testRunner.setProperty(MINIMUM_AGE, "0 ms"); testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix"); testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix");
testRunner.setProperty(LISTING_STRATEGY, "none"); testRunner.setProperty(LISTING_STRATEGY, "none");
@ -265,33 +203,8 @@ public class ListSmbIT {
writeFile("should_skip_this.suffix", generateContentWithSize(1)); writeFile("should_skip_this.suffix", generateContentWithSize(1));
testRunner.run(); testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.disableControllerService(smbClientProviderService); testRunner.assertValid();
} testRunner.disableControllerService(smbjClientProviderService);
private SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner)
throws Exception {
SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
testRunner.addControllerService("connection-pool", smbjClientProviderService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "connection-pool");
testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
testRunner.setProperty(smbjClientProviderService, PORT,
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
testRunner.setProperty(smbjClientProviderService, USERNAME, "username");
testRunner.setProperty(smbjClientProviderService, PASSWORD, "password");
testRunner.setProperty(smbjClientProviderService, SHARE, "share");
testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
return smbjClientProviderService;
}
private String generateContentWithSize(int sizeInBytes) {
byte[] bytes = new byte[sizeInBytes];
fill(bytes, (byte) 1);
return new String(bytes);
}
private void writeFile(String path, String content) {
String containerPath = "/folder/" + path;
sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
} }
} }

View File

@ -115,7 +115,6 @@ class ListSmbTest {
testRunner.run(); testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertValid(); testRunner.assertValid();
} }
@ -236,6 +235,7 @@ class ListSmbTest {
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception")); when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception"));
testRunner.run(); testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size()); assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
} }
@Test @Test
@ -288,7 +288,7 @@ class ListSmbTest {
private SmbListableEntity listableEntity(String name, long timeStamp) { private SmbListableEntity listableEntity(String name, long timeStamp) {
return SmbListableEntity.builder() return SmbListableEntity.builder()
.setName(name) .setName(name)
.setTimestamp(timeStamp) .setLastModifiedTime(timeStamp)
.build(); .build();
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb;
import static java.util.Arrays.fill;
import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
public class SambaTestContainers {
protected final static Integer DEFAULT_SAMBA_PORT = 445;
protected final static Logger logger = LoggerFactory.getLogger(SambaTestContainers.class);
protected final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
.withExposedPorts(DEFAULT_SAMBA_PORT, 139)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(logger))
.withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
@BeforeEach
public void beforeEach() {
sambaContainer.start();
}
@AfterEach
public void afterEach() {
sambaContainer.stop();
}
protected SmbjClientProviderService configureSmbClient(TestRunner testRunner, boolean shouldEnableSmbClient)
throws Exception {
final SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
testRunner.addControllerService("client-provider", smbjClientProviderService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
testRunner.setProperty(smbjClientProviderService, PORT,
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
testRunner.setProperty(smbjClientProviderService, USERNAME, "username");
testRunner.setProperty(smbjClientProviderService, PASSWORD, "password");
testRunner.setProperty(smbjClientProviderService, SHARE, "share");
testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
if (shouldEnableSmbClient) {
testRunner.enableControllerService(smbjClientProviderService);
}
return smbjClientProviderService;
}
protected String generateContentWithSize(int sizeInBytes) {
byte[] bytes = new byte[sizeInBytes];
fill(bytes, (byte) 1);
return new String(bytes);
}
protected void writeFile(String path, String content) {
String containerPath = "/folder/" + path;
sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
}
}

View File

@ -116,7 +116,7 @@ public class SmbjClientProviderService extends AbstractControllerService impleme
@Override @Override
public SmbClientService getClient() throws IOException { public SmbClientService getClient() throws IOException {
final SmbjClientService client = new SmbjClientService(smbClient, authenticationContext); final SmbjClientService client = new SmbjClientService(smbClient, authenticationContext, getServiceLocation());
try { try {
client.connectToShare(hostname, port, shareName); client.connectToShare(hostname, port, shareName);

View File

@ -33,8 +33,11 @@ import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.session.Session; import com.hierynomus.smbj.session.Session;
import com.hierynomus.smbj.share.Directory; import com.hierynomus.smbj.share.Directory;
import com.hierynomus.smbj.share.DiskShare; import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.File;
import com.hierynomus.smbj.share.Share; import com.hierynomus.smbj.share.Share;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -42,17 +45,20 @@ import java.util.stream.Stream;
public class SmbjClientService implements SmbClientService { public class SmbjClientService implements SmbClientService {
private static final List<String> SPECIAL_DIRECTORIES = asList(".", ".."); private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
private static final long UNCATEGORISED_ERROR = -1L;
final private AuthenticationContext authenticationContext; final private AuthenticationContext authenticationContext;
final private SMBClient smbClient; final private SMBClient smbClient;
final private URI serviceLocation;
private Connection connection; private Connection connection;
private Session session; private Session session;
private DiskShare share; private DiskShare share;
public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext) { public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext, URI serviceLocation) {
this.smbClient = smbClient; this.smbClient = smbClient;
this.authenticationContext = authenticationContext; this.authenticationContext = authenticationContext;
this.serviceLocation = serviceLocation;
} }
public void connectToShare(String hostname, int port, String shareName) throws IOException { public void connectToShare(String hostname, int port, String shareName) throws IOException {
@ -104,7 +110,7 @@ public class SmbjClientService implements SmbClientService {
return Stream.of(filePath).flatMap(path -> { return Stream.of(filePath).flatMap(path -> {
final Directory directory = openDirectory(path); final Directory directory = openDirectory(path);
return stream(directory::spliterator, 0, false) return stream(directory::spliterator, 0, false)
.map(entity -> buildSmbListableEntity(entity, path)) .map(entity -> buildSmbListableEntity(entity, path, serviceLocation))
.filter(entity -> !specialDirectory(entity)) .filter(entity -> !specialDirectory(entity))
.flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName()) .flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName())
: Stream.of(listable)) : Stream.of(listable))
@ -123,18 +129,39 @@ public class SmbjClientService implements SmbClientService {
} }
} }
private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) { @Override
public void readFile(String fileName, OutputStream outputStream) throws IOException {
try (File f = share.openFile(
fileName,
EnumSet.of(AccessMask.GENERIC_READ),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
SMB2CreateDisposition.FILE_OPEN,
EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
) {
f.read(outputStream);
} catch (SMBApiException a) {
throw new SmbException(a.getMessage(), a.getStatusCode(), a);
} catch (Exception e) {
throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e);
} finally {
outputStream.close();
}
}
private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI serviceLocation) {
return SmbListableEntity.builder() return SmbListableEntity.builder()
.setName(info.getFileName()) .setName(info.getFileName())
.setShortName(info.getShortName()) .setShortName(info.getShortName())
.setPath(path) .setPath(path)
.setTimestamp(info.getLastWriteTime().toEpochMillis()) .setLastModifiedTime(info.getLastWriteTime().toEpochMillis())
.setCreationTime(info.getCreationTime().toEpochMillis()) .setCreationTime(info.getCreationTime().toEpochMillis())
.setChangeTime(info.getChangeTime().toEpochMillis()) .setChangeTime(info.getChangeTime().toEpochMillis())
.setLastAccessTime(info.getLastAccessTime().toEpochMillis()) .setLastAccessTime(info.getLastAccessTime().toEpochMillis())
.setDirectory((info.getFileAttributes() & FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0) .setDirectory((info.getFileAttributes() & FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0)
.setSize(info.getEndOfFile()) .setSize(info.getEndOfFile())
.setAllocationSize(info.getAllocationSize()) .setAllocationSize(info.getAllocationSize())
.setServiceLocation(serviceLocation)
.build(); .build();
} }