NIFI-673: Initial implementation of ListSFTP, FetchSFTP

This commit is contained in:
Mark Payne 2015-10-04 15:48:28 -04:00
parent 8a80060851
commit d1d57931bf
15 changed files with 1988 additions and 300 deletions

View File

@ -0,0 +1,505 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.EntityListing;
import org.apache.nifi.processors.standard.util.ListableEntity;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
/**
* <p>
* An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
* Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
* we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
* </p>
*
* <p>
* This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
* or entities that have been modified will be emitted from the Processor.
* </p>
*
* <p>
* In order to make use of this abstract class, the entities listed must meet the following criteria:
* <ul>
* <li>
* Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
* returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
* </li>
* <li>
* Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
* new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
* than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
* identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
* seen already.
* </li>
* <li>
* Entity must have a user-readable name that can be used for logging purposes.
* </li>
* </ul>
* </p>
*
* <p>
* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
* two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
* stored is the latest timestamp that has been pulled (as determined by the timestamps of the entities that are returned), as well as the unique identifier of
* each entity that has that timestamp. See the section above for information about how these pieces of information are used in order to determine entity uniqueness.
* </p>
*
* <p>
* In addition to storing state locally, the Processor exposes an optional <code>Distributed Cache Service</code> property. In standalone deployment of NiFi, this is
* not necessary. However, in a clustered environment, subclasses of this class are expected to be run only on primary node. While this means that the local state is
* accurate as long as the primary node remains constant, the primary node in the cluster can be changed. As a result, if running in a clustered environment, it is
* recommended that this property be set. This allows the same state that is described above to also be replicated across the cluster. If this property is set, then
* on restart the Processor will not begin listing until it has retrieved an updated state from this service, as it does not know whether or not another node has
* modified the state in the mean time.
* </p>
*
* <p>
* For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
* of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
* the configured dataflow.
* </p>
*
* <p>
* Subclasses are responsible for the following:
*
* <ul>
* <li>
* Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
* </li>
* <li>
* Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
* {@link #createAttributes(ListableEntity, ProcessContext)}.
* </li>
* <li>
* Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
* within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept
* does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
* </li>
* <li>
* Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
* changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
* </li>
* </ul>
* </p>
*/
@TriggerSerially
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
// Optional controller service used to share the listing state (latest timestamp + identifiers) with other nodes.
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
+ "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+ "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
// The only relationship of this Processor; onTrigger transfers one FlowFile here per newly listed entity.
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
// Timestamp of the newest entity listed so far. null means that no state has been restored yet, which
// makes getMinTimestamp consult the distributed cache and the local persistence file.
private volatile Long lastListingTime = null;
// Identifiers of the entities whose timestamp equals the latest listed timestamp; onTrigger uses this
// set to avoid re-listing entities that share that timestamp.
private volatile Set<String> latestIdentifiersListed = new HashSet<>();
// Set when this node is elected primary node; cleared by getMinTimestamp once state has been
// re-read from the distributed cache.
private volatile boolean electedPrimaryNode = false;
/**
 * Locates the file in which this Processor keeps its local listing state.
 *
 * @return a File below the relative <code>conf/state</code> directory, named after this Processor's identifier
 */
protected File getPersistenceFile() {
    final String stateFilePath = "conf/state/" + getIdentifier();
    return new File(stateFilePath);
}
/**
 * @return a new, mutable List holding the properties supported by this base class, which is
 *         only the optional Distributed Cache Service
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    final List<PropertyDescriptor> descriptors = new ArrayList<>(Collections.singletonList(DISTRIBUTED_CACHE_SERVICE));
    return descriptors;
}
/**
 * Discards the in-memory listing state whenever a property changes for which
 * {@link #isListingResetNecessary(PropertyDescriptor)} returns <code>true</code>.
 *
 * @param descriptor the property that was modified
 * @param oldValue the previous value of the property (not used)
 * @param newValue the new value of the property (not used)
 */
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
    if (!isListingResetNecessary(descriptor)) {
        return;
    }

    // A null timestamp forces the next trigger to restore the listing time from persisted state.
    lastListingTime = null;
    latestIdentifiersListed = new HashSet<>();
}
/**
 * @return a new, mutable Set that contains only the 'success' relationship
 */
@Override
public Set<Relationship> getRelationships() {
    final Set<Relationship> supported = new HashSet<>(Collections.singleton(REL_SUCCESS));
    return supported;
}
/**
 * Builds the key under which the listing state for the given directory is stored in the distributed cache.
 *
 * @param directory the directory (path) being listed
 * @return the Processor identifier, followed by <code>.lastListingTime.</code> and the directory
 */
protected String getKey(final String directory) {
    final String keyPrefix = getIdentifier() + ".lastListingTime.";
    return keyPrefix + directory;
}
/**
 * Remembers that this node has been elected primary node so that the next listing re-reads its state
 * rather than trusting what is held in memory. Any other state change is ignored.
 *
 * @param newState the new primary node state of this node
 */
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
    final boolean becamePrimary = (PrimaryNodeState.ELECTED_PRIMARY_NODE == newState);
    if (becamePrimary) {
        electedPrimaryNode = true;
    }
}
/**
 * Converts the JSON form of the listing state back into an {@link EntityListing}.
 *
 * @param serializedState the JSON text to parse
 * @return the EntityListing described by the JSON
 * @throws JsonParseException if the text is not well-formed JSON
 * @throws JsonMappingException if the JSON cannot be mapped onto an EntityListing
 * @throws IOException if the text cannot be read
 */
private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
    final ObjectMapper jsonMapper = new ObjectMapper();
    // Parse into a tree first, then bind the tree to the state object.
    final JsonNode stateTree = jsonMapper.readTree(serializedState);
    final EntityListing restored = jsonMapper.readValue(stateTree, EntityListing.class);
    return restored;
}
/**
 * Determines the timestamp of the newest entity that has already been listed for the given directory.
 * <p>
 * If a timestamp is held in memory and this node has not just been elected primary node, that timestamp is
 * returned as-is. Otherwise the state is restored from the distributed cache (if configured) and from the
 * local persistence file, and the later of the two is used. Whenever state is restored, the identifiers that
 * belong to the chosen timestamp are restored along with it, so that entities sharing that timestamp are
 * not listed a second time.
 * </p>
 *
 * @param directory the directory (path) whose listing state is requested
 * @param client the distributed cache client, or <code>null</code> if none is configured
 * @return the timestamp of the newest entity already listed, or <code>null</code> if no state is known
 * @throws IOException if the distributed cache cannot be read or its content cannot be parsed
 */
private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
    // Determine the timestamp for the last file that we've listed.
    Long minTimestamp = lastListingTime;
    if (minTimestamp != null && !electedPrimaryNode) {
        return minTimestamp;
    }

    // We haven't yet restored any state from local or distributed state - or this node has just been elected
    // primary node and another node may have advanced the state in the meantime.
    // First, attempt to get the timestamp from the distributed cache service. A failure here is propagated
    // to the caller so that no listing is performed against possibly stale state.
    if (client != null) {
        final StringSerDe serde = new StringSerDe();
        final String serializedState = client.get(getKey(directory), serde, serde);
        if (serializedState == null || serializedState.isEmpty()) {
            minTimestamp = null;
            this.latestIdentifiersListed = Collections.emptySet();
        } else {
            final EntityListing listing = deserialize(serializedState);
            minTimestamp = listing.getLatestTimestamp().getTime();
            this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers());
        }

        this.lastListingTime = minTimestamp;
        electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
    }

    // Check the persistence file. We want to use the latest timestamp that we have so that
    // we don't duplicate data.
    try {
        final File persistenceFile = getPersistenceFile();
        if (persistenceFile.exists()) {
            final Properties props = new Properties();
            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                props.load(fis);
            }

            // get the local state for this directory, if it exists.
            final String locallyPersistedValue = props.getProperty(directory);
            if (locallyPersistedValue != null) {
                final EntityListing listing = deserialize(locallyPersistedValue);
                final long localTimestamp = listing.getLatestTimestamp().getTime();

                // The local state is used whenever it is newer than what we have so far. This must not depend on
                // a cache client being configured; otherwise a standalone instance would ignore its own state
                // after a restart and list everything again.
                if (minTimestamp == null || localTimestamp > minTimestamp) {
                    minTimestamp = localTimestamp;
                    this.lastListingTime = localTimestamp;
                    this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers());

                    if (client != null) {
                        // Our local persistence file shows a later time than the Distributed service.
                        // Update the distributed service to match our local state.
                        try {
                            final StringSerDe serde = new StringSerDe();
                            client.put(getKey(directory), locallyPersistedValue, serde, serde);
                        } catch (final IOException ioe) {
                            getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
                                + "state due to {}. If a new node performs Listing, data duplication may occur",
                                new Object[] {directory, locallyPersistedValue, ioe});
                        }
                    }
                }
            }
        }
    } catch (final IOException ioe) {
        getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
    }

    return minTimestamp;
}
/**
 * Captures the state that must be remembered after a listing and renders it as JSON.
 * <p>
 * The state consists of the latest timestamp among the given entities and the identifiers of all entities that
 * carry exactly that timestamp. We can't just ignore any entity whose timestamp equals the latest one on the
 * next listing, because more entities may show up with the same timestamp later in the same millisecond;
 * the identifiers let us tell those apart. As a side effect, the identifiers are also stored in
 * <code>latestIdentifiersListed</code>.
 * </p>
 *
 * @param entities the entities returned by the listing
 * @return the JSON form of the state, or <code>null</code> if no entities were given
 * @throws JsonGenerationException if the JSON cannot be generated
 * @throws JsonMappingException if the state cannot be mapped to JSON
 * @throws IOException if the JSON cannot be written
 */
private String serializeState(final List<T> entities) throws JsonGenerationException, JsonMappingException, IOException {
    if (entities.isEmpty()) {
        return null;
    }

    // A single pass finds the latest timestamp; sorting the whole listing is not needed for that.
    long latestListingModTime = Long.MIN_VALUE;
    for (final ListableEntity entity : entities) {
        final long entityTimestamp = entity.getTimestamp();
        if (entityTimestamp > latestListingModTime) {
            latestListingModTime = entityTimestamp;
        }
    }

    final Set<String> idsWithTimestampEqualToListingTime = new HashSet<>();
    for (final ListableEntity entity : entities) {
        if (entity.getTimestamp() == latestListingModTime) {
            idsWithTimestampEqualToListingTime.add(entity.getIdentifier());
        }
    }
    this.latestIdentifiersListed = idsWithTimestampEqualToListingTime;

    // The serialized listing gets its own copy so that it does not share a Set with the field above.
    final Set<String> persistedIds = new HashSet<>(idsWithTimestampEqualToListingTime);
    final EntityListing listing = new EntityListing();
    listing.setLatestTimestamp(new Date(latestListingModTime));
    listing.setMatchingIdentifiers(persistedIds);

    final ObjectMapper mapper = new ObjectMapper();
    return mapper.writerWithType(EntityListing.class).writeValueAsString(listing);
}
/**
 * Stores the serialized listing state for the given path in the local persistence file.
 * <p>
 * The persistence file is a Java properties file keyed by path. Entries that already exist for other paths
 * are loaded first and written back unchanged; the parent directory is created if it does not exist.
 * </p>
 *
 * @param path the path whose state is being stored; used as the property key
 * @param serializedState the serialized state to store for that path
 * @throws IOException if the state directory cannot be created or the file cannot be read or written
 */
protected void persistLocalState(final String path, final String serializedState) throws IOException {
    final File stateFile = getPersistenceFile();
    final File stateDir = stateFile.getParentFile();
    final boolean dirAvailable = stateDir.exists() || stateDir.mkdirs();
    if (!dirAvailable) {
        throw new IOException("Could not create directory " + stateDir.getAbsolutePath() + " in order to save local state");
    }

    // Start from whatever is on disk so that the state of other paths survives the rewrite.
    final Properties persisted = new Properties();
    if (stateFile.exists()) {
        try (final FileInputStream existing = new FileInputStream(stateFile)) {
            persisted.load(existing);
        }
    }
    persisted.setProperty(path, serializedState);

    try (final FileOutputStream rewritten = new FileOutputStream(stateFile)) {
        persisted.store(rewritten, null);
    }
}
/**
 * Performs one listing of the configured path and emits a FlowFile to 'success' for every entity that has
 * not been listed before.
 * <p>
 * An entity is emitted if no previous listing time is known, if its timestamp is later than the previous
 * listing time, or if its timestamp equals the previous listing time and its identifier has not been seen
 * with that timestamp. After the session has been committed, the new state is saved locally and, if a
 * Distributed Cache Service is configured, in the cache as well. If state cannot be retrieved, the listing
 * fails, or nothing new is found, the Processor yields.
 * </p>
 *
 * @param context the ProcessContext providing the configuration
 * @param session the ProcessSession used to create and transfer FlowFiles
 * @throws ProcessException if processing fails
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final String path = getPath(context);
    final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);

    final Long minTimestamp;
    try {
        minTimestamp = getMinTimestamp(path, client);
    } catch (final IOException ioe) {
        // Pass the exception along so that the cause of the failure is not lost.
        getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.", ioe);
        context.yield();
        return;
    }

    final List<T> entityList;
    try {
        entityList = performListing(context, minTimestamp);
    } catch (final IOException e) {
        getLogger().error("Failed to perform listing on remote host due to {}", e);
        context.yield();
        return;
    }

    if (entityList == null) {
        context.yield();
        return;
    }

    int listCount = 0;
    Long latestListingTimestamp = null;
    for (final T entity : entityList) {
        final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp ||
            (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));

        // Create the FlowFile for this path.
        if (list) {
            final Map<String, String> attributes = createAttributes(entity, context);
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);
            listCount++;

            if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) {
                latestListingTimestamp = entity.getTimestamp();
            }
        }
    }

    if (listCount > 0) {
        getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount});
        session.commit();

        // We have performed a listing and pushed the FlowFiles out.
        // Now, we need to persist state about the Last Modified timestamp of the newest file
        // that we pulled in. We do this in order to avoid pulling in the same file twice.
        // However, we want to save the state both locally and remotely.
        // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
        // previously Primary Node left off.
        // We also store the state locally so that if the node is restarted, and the node cannot contact
        // the distributed state cache, the node can continue to run (if it is primary node).
        String serializedState = null;
        try {
            serializedState = serializeState(entityList);
        } catch (final Exception e) {
            getLogger().error("Failed to serialize state due to {}", new Object[] {e});
        }

        if (serializedState != null) {
            // Save our state locally.
            try {
                persistLocalState(path, serializedState);
            } catch (final IOException ioe) {
                getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
            }

            // Attempt to save state to remote server.
            if (client != null) {
                try {
                    client.put(getKey(path), serializedState, new StringSerDe(), new StringSerDe());
                } catch (final IOException ioe) {
                    getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
                }
            }
        }

        lastListingTime = latestListingTimestamp;
    } else {
        getLogger().debug("There is no data to list. Yielding.");
        context.yield();

        // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
        if (lastListingTime == null) {
            lastListingTime = 0L;
        }

        return;
    }
}
/**
* Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity
* (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no
* content. The attributes that will be included are exactly the attributes that are returned by this method.
* This method is invoked once for each entity that is about to be emitted, before the FlowFile for that entity is created.
*
* @param entity the entity represented by the FlowFile
* @param context the ProcessContext for obtaining configuration information
* @return a Map of attributes for this entity
*/
protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);
/**
* Returns the path to perform a listing on.
* Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
* within that path. This method is responsible for returning the path that is currently being polled for entities. If this concept
* does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
* The returned value is used to build the distributed cache key and as the key under which state is stored in the local persistence file.
*
* NOTE(review): because the value is used as a key of a java.util.Properties object when state is persisted locally, returning
* <code>null</code> looks unsafe despite what the return documentation below states - confirm before relying on it.
*
* @param context the ProcessContext to use in order to obtain configuration
* @return the path that is to be used to perform the listing, or <code>null</code> if not applicable.
*/
protected abstract String getPath(final ProcessContext context);
/**
* Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted"
* by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is
* provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp
* will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
* if the filtering can be performed on the server side prior to retrieving the information.
* If this method returns <code>null</code>, the Processor yields without emitting anything.
*
* @param context the ProcessContext to use in order to pull the appropriate entities
* @param minTimestamp the minimum timestamp of entities that should be returned, or <code>null</code> if no previous listing time is known,
* in which case every returned entity is considered new.
*
* @return a Listing of entities that have a timestamp &gt;= minTimestamp
* @throws IOException if the listing cannot be performed; the Processor logs the failure and yields
*/
protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
/**
* Determines whether or not the listing must be reset if the value of the given property is changed.
* This method is consulted each time a property is modified; when it returns <code>true</code>, the in-memory
* timestamp of the last listing and the identifiers that belong to it are cleared.
*
* @param property the property that has changed
* @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise.
*/
protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);
/**
 * Serializer and Deserializer that convert between Strings and their UTF-8 bytes. It is used for both the
 * keys and the values that this Processor exchanges with the distributed cache.
 */
private static class StringSerDe implements Serializer<String>, Deserializer<String> {
    /**
     * @param value the UTF-8 bytes to decode, may be <code>null</code>
     * @return the decoded String, or <code>null</code> if no bytes were given
     */
    @Override
    public String deserialize(final byte[] value) throws DeserializationException, IOException {
        return (value == null) ? null : new String(value, StandardCharsets.UTF_8);
    }

    /**
     * Writes the UTF-8 bytes of the given String to the given stream.
     *
     * @param value the String to encode
     * @param out the stream to write the bytes to
     */
    @Override
    public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
        final byte[] utf8Bytes = value.getBytes(StandardCharsets.UTF_8);
        out.write(utf8Bytes);
    }
}
}

View File

@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
/**
* A base class for FetchSFTP, FetchFTP processors
*/
public abstract class FetchFileTransfer extends AbstractProcessor {
// Remote host to fetch from; evaluated against the incoming FlowFile's attributes in onTrigger.
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The fully-qualified hostname or IP address of the host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(true)
.build();
// Remote port without a default value; onTrigger reads the evaluated value as an integer.
static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("The port to connect to on the remote host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(true)
.build();
// Name of the remote file to fetch; evaluated against the incoming FlowFile's attributes in onTrigger.
public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder()
.name("Remote File")
.description("The fully qualified filename on the remote system")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
// Whether the remote file is removed after a successful fetch; defaults to "true".
public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
.name("Delete Original")
.description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
.defaultValue("true")
.allowableValues("true", "false")
.required(true)
.build();
// FlowFiles whose content was fetched successfully.
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
// FlowFiles for which the fetch failed with an IOException other than the two specific cases below.
static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
.name("comms.failure")
.description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.")
.build();
// FlowFiles for which the fetch failed with a FileNotFoundException.
static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not.found")
.description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.")
.build();
// FlowFiles for which the fetch failed with a PermissionDeniedException.
static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder()
.name("permission.denied")
.description("Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.")
.build();
// Pool of reusable connections, keyed by (host, port). onTrigger accesses the map while synchronized on it.
private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>();
private final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L); // amount of time to wait before closing an idle connection
// Wall-clock time (milliseconds) at which idle connections were last swept by onTrigger.
private volatile long lastClearTime = System.currentTimeMillis();
/**
 * @return a new, mutable Set containing the success, not.found, permission.denied and comms.failure relationships
 */
@Override
public Set<Relationship> getRelationships() {
    final Set<Relationship> supported = new HashSet<>();

    // the happy path
    supported.add(REL_SUCCESS);

    // the failure paths
    supported.add(REL_COMMS_FAILURE);
    supported.add(REL_PERMISSION_DENIED);
    supported.add(REL_NOT_FOUND);

    return supported;
}
/**
 * Sweeps every pooled connection and closes those that are no longer wanted.
 * A connection counts as "idle" once it has gone unused for at least the idle period (10 seconds).
 * Connections that are kept are put back into their queue after the sweep; a failure to close
 * a connection is logged and otherwise ignored.
 *
 * @param closeNonIdleConnections <code>true</code> to close every pooled connection; <code>false</code> to close only the idle ones
 */
private void closeConnections(final boolean closeNonIdleConnections) {
    for (final BlockingQueue<FileTransferIdleWrapper> pooled : fileTransferMap.values()) {
        final List<FileTransferIdleWrapper> survivors = new ArrayList<>();

        // Drain the queue one connection at a time, deciding for each whether it is kept or closed.
        for (FileTransferIdleWrapper candidate = pooled.poll(); candidate != null; candidate = pooled.poll()) {
            final long lastUsedNanos = candidate.getLastUsed();
            final long unusedNanos = System.nanoTime() - lastUsedNanos;
            final boolean idle = TimeUnit.NANOSECONDS.toMillis(unusedNanos) >= IDLE_CONNECTION_MILLIS;

            if (closeNonIdleConnections || idle) {
                try {
                    candidate.getFileTransfer().close();
                } catch (final IOException ioe) {
                    getLogger().warn("Failed to close Idle Connection due to {}", new Object[] {ioe}, ioe);
                }
            } else {
                survivors.add(candidate);
            }
        }

        for (final FileTransferIdleWrapper survivor : survivors) {
            pooled.offer(survivor);
        }
    }
}
/**
 * Closes every pooled connection, idle or not, when the Processor is stopped.
 */
@OnStopped
public void cleanup() {
    final boolean includeNonIdleConnections = true;
    closeConnections(includeNonIdleConnections);
}
/**
 * @return a new, mutable List of the properties common to all fetch processors, in display order:
 *         hostname, port, remote file and delete-original
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    final List<PropertyDescriptor> descriptors = new ArrayList<>(4);

    // where to connect
    descriptors.add(HOSTNAME);
    descriptors.add(UNDEFAULTED_PORT);

    // what to fetch and what to do with it afterwards
    descriptors.add(REMOTE_FILENAME);
    descriptors.add(DELETE_ORIGINAL);

    return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final StopWatch stopWatch = new StopWatch(true);
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final int port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions(flowFile).asInteger();
final String filename = context.getProperty(REMOTE_FILENAME).evaluateAttributeExpressions(flowFile).getValue();
// Try to get a FileTransfer object from our cache.
BlockingQueue<FileTransferIdleWrapper> transferQueue;
synchronized (fileTransferMap) {
final Tuple<String, Integer> tuple = new Tuple<>(host, port);
transferQueue = fileTransferMap.get(tuple);
if (transferQueue == null) {
transferQueue = new LinkedBlockingQueue<>();
fileTransferMap.put(tuple, transferQueue);
}
// periodically close idle connections
if (System.currentTimeMillis() - lastClearTime > IDLE_CONNECTION_MILLIS) {
closeConnections(false);
lastClearTime = System.currentTimeMillis();
}
}
// we have a queue of FileTransfer Objects. Get one from the queue or create a new one.
FileTransfer transfer;
FileTransferIdleWrapper transferWrapper = transferQueue.poll();
if (transferWrapper == null) {
transfer = createFileTransfer(context);
} else {
transfer = transferWrapper.getFileTransfer();
}
// Pull data from remote system.
final InputStream in;
try {
in = transfer.getInputStream(filename, flowFile);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});
transfer.flush();
transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
} catch (final FileNotFoundException e) {
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
new Object[] {flowFile, filename, host, REL_NOT_FOUND.getName()});
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
return;
} catch (final PermissionDeniedException e) {
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
new Object[] {flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
return;
} catch (final IOException e) {
try {
transfer.close();
} catch (final IOException e1) {
getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e);
}
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to failure",
new Object[] {flowFile, filename, host, port, e.toString()}, e);
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
return;
}
// Add FlowFile attributes
final String protocolName = transfer.getProtocolName();
final Map<String, String> attributes = new HashMap<>();
attributes.put(protocolName + ".remote.host", host);
attributes.put(protocolName + ".remote.port", String.valueOf(port));
attributes.put(protocolName + ".remote.filename", filename);
attributes.put(CoreAttributes.FILENAME.key(), filename);
flowFile = session.putAllAttributes(flowFile, attributes);
// emit provenance event and transfer FlowFile
session.getProvenanceReporter().modifyContent(flowFile, "Content replaced with content from " + protocolName + "://" + host + ":" + port + "/" + filename,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
// delete remote file if necessary
final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean();
if (deleteOriginal) {
try {
transfer.deleteFile(null, filename);
} catch (final FileNotFoundException e) {
// file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe);
}
}
}
/**
 * Creates a new instance of a FileTransfer that can be used to pull files from a remote system.
 * This processor invokes it when it needs to fetch a file and its queue of previously used
 * FileTransfer objects has none available to reuse.
 *
 * @param context the ProcessContext to use in order to obtain configured properties
 * @return a FileTransfer that can be used to pull files from a remote system
 */
protected abstract FileTransfer createFileTransfer(ProcessContext context);
/**
 * Immutable pairing of a FileTransfer with the timestamp at which it was last used. Keeping the
 * two together lets the processor recognize connections that are "idle" (unused for some period
 * of time) so that they can be closed.
 */
private static class FileTransferIdleWrapper {
    /** The wrapped transfer object; never reassigned. */
    private final FileTransfer wrapped;
    /** Timestamp supplied by the caller when the transfer was last used. */
    private final long lastUsedTimestamp;

    /**
     * @param fileTransfer the FileTransfer being tracked
     * @param lastUsed the timestamp at which the FileTransfer was last used
     */
    public FileTransferIdleWrapper(final FileTransfer fileTransfer, final long lastUsed) {
        this.wrapped = fileTransfer;
        this.lastUsedTimestamp = lastUsed;
    }

    /**
     * @return the FileTransfer being tracked
     */
    public FileTransfer getFileTransfer() {
        return this.wrapped;
    }

    /**
     * @return the timestamp at which the FileTransfer was last used, exactly as given to the constructor
     */
    public long getLastUsed() {
        return lastUsedTimestamp;
    }
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SupportsBatching
@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
@WritesAttributes({
    @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname or IP address from which the file was pulled"),
    @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"),
    @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"),
    @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename of the remote file"),
})
public class FetchSFTP extends FetchFileTransfer {

    /**
     * @return the properties exposed by this processor, in the order in which they are added here
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(FetchFileTransfer.HOSTNAME);
        properties.add(SFTPTransfer.PORT);
        properties.add(SFTPTransfer.USERNAME);
        properties.add(SFTPTransfer.PASSWORD);
        properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
        properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
        properties.add(FetchFileTransfer.REMOTE_FILENAME);
        properties.add(SFTPTransfer.DELETE_ORIGINAL);
        properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
        properties.add(SFTPTransfer.DATA_TIMEOUT);
        properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
        properties.add(SFTPTransfer.HOST_KEY_FILE);
        properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
        properties.add(SFTPTransfer.USE_COMPRESSION);
        return properties;
    }

    /**
     * Ensures that at least one means of authentication has been configured: either a Password
     * or a Private Key Path. The Private Key Passphrase is deliberately not required, because an
     * unencrypted private key has no passphrase and is still a usable credential.
     *
     * @param validationContext the context from which the configured property values are read
     * @return a single invalid ValidationResult if neither a Password nor a Private Key Path is set;
     *         otherwise an empty collection
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final boolean passwordSet = validationContext.getProperty(SFTPTransfer.PASSWORD).isSet();
        final boolean privateKeySet = validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet();
        if (passwordSet || privateKeySet) {
            return Collections.emptyList();
        }

        return Collections.singleton(new ValidationResult.Builder()
            .subject("Password")
            .valid(false)
            .explanation("Must set either Password or Private Key Path")
            .build());
    }

    /**
     * @param context the ProcessContext to use in order to obtain configured properties
     * @return a new SFTPTransfer built from the given context and this processor's logger
     */
    @Override
    protected FileTransfer createFileTransfer(final ProcessContext context) {
        return new SFTPTransfer(context, getLogger());
    }
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
/**
 * Base class for processors that list the files available on a remote system through a
 * {@link FileTransfer}. Subclasses supply the protocol-specific FileTransfer and protocol name.
 */
public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
        .name("Hostname")
        .description("The fully qualified hostname or IP address of the remote system")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .required(true)
        .expressionLanguageSupported(true)
        .build();
    public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
        .name("Remote Path")
        .description("The path on the remote system from which to pull or push files")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .defaultValue(".")
        .build();

    /**
     * Builds the attributes for the FlowFile that represents a single listed file.
     *
     * @param fileInfo the listed file
     * @param context the ProcessContext from which the configured hostname is read
     * @return a map holding the remote host, owner, group, permissions and filename of the file and,
     *         when the full path contains a "/", the path of its parent directory
     */
    @Override
    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
        attributes.put("file.owner", fileInfo.getOwner());
        attributes.put("file.group", fileInfo.getGroup());
        attributes.put("file.permissions", fileInfo.getPermissions());
        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());

        // The "path" attribute is everything before the last "/" of the full path, when there is one.
        final String fullPath = fileInfo.getFullPathFileName();
        if (fullPath != null) {
            final int index = fullPath.lastIndexOf("/");
            if (index > -1) {
                final String path = fullPath.substring(0, index);
                attributes.put(CoreAttributes.PATH.key(), path);
            }
        }
        return attributes;
    }

    /**
     * @param context the ProcessContext from which the Remote Path property is read
     * @return the configured Remote Path with any Expression Language evaluated; the property
     *         declares Expression Language support, so the raw value must not be returned as-is
     */
    @Override
    protected String getPath(final ProcessContext context) {
        return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue();
    }

    /**
     * Obtains a listing from the remote system and drops the entries that are older than the given timestamp.
     * The FileTransfer used to obtain the listing is always closed before this method returns.
     *
     * @param context the ProcessContext used to obtain the FileTransfer
     * @param minTimestamp the minimum last-modified time an entry must have in order to be kept,
     *            or <code>null</code> to keep every entry
     * @return the (possibly filtered) listing
     * @throws IOException if the listing cannot be obtained
     */
    @Override
    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
        final FileTransfer transfer = getFileTransfer(context);
        final List<FileInfo> listing;
        try {
            listing = transfer.getListing();
        } finally {
            // Release the connection whether or not the listing succeeded. A failure to close must
            // neither mask an exception from getListing() nor discard a listing that was obtained.
            try {
                transfer.close();
            } catch (final IOException ioe) {
                getLogger().warn("Failed to close connection to remote system due to {}", new Object[] {ioe.toString()}, ioe);
            }
        }

        if (minTimestamp == null) {
            return listing;
        }

        final Iterator<FileInfo> itr = listing.iterator();
        while (itr.hasNext()) {
            final FileInfo next = itr.next();
            if (next.getLastModifiedTime() < minTimestamp) {
                itr.remove();
            }
        }
        return listing;
    }

    /**
     * @param property the property whose value was modified
     * @return <code>true</code> if the property is the Hostname or the Remote Path, <code>false</code> otherwise
     */
    @Override
    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
        return HOSTNAME.equals(property) || REMOTE_PATH.equals(property);
    }

    /**
     * @param context the ProcessContext to use in order to obtain configured properties
     * @return the FileTransfer to use for performing a listing; it is closed once the listing has been obtained
     */
    protected abstract FileTransfer getFileTransfer(final ProcessContext context);

    /**
     * @return the name of the protocol, used as the prefix of the ".remote.host" attribute
     */
    protected abstract String getProtocolName();
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@TriggerSerially
@Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"})
@CapabilityDescription("Performs a listing of the files residing on an SFTP server. For each file that is found on the remote server, a new FlowFile will be created with the filename attribute "
    + "set to the name of the file on the remote server. This can then be used in conjunction with FetchSFTP in order to fetch those files.")
@SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
@WritesAttributes({
    @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
    @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
    @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
    @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
    @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"),
    @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"),
})
public class ListSFTP extends ListFileTransfer {

    /**
     * @return the descriptors of every property this processor supports, in the order they are added below
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();

        // remote endpoint and credentials
        descriptors.add(SFTPTransfer.HOSTNAME);
        descriptors.add(SFTPTransfer.PORT);
        descriptors.add(SFTPTransfer.USERNAME);
        descriptors.add(SFTPTransfer.PASSWORD);
        descriptors.add(SFTPTransfer.PRIVATE_KEY_PATH);
        descriptors.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);

        // what to list
        descriptors.add(REMOTE_PATH);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        descriptors.add(SFTPTransfer.RECURSIVE_SEARCH);
        descriptors.add(SFTPTransfer.FILE_FILTER_REGEX);
        descriptors.add(SFTPTransfer.PATH_FILTER_REGEX);
        descriptors.add(SFTPTransfer.IGNORE_DOTTED_FILES);

        // host key handling and timeouts
        descriptors.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
        descriptors.add(SFTPTransfer.HOST_KEY_FILE);
        descriptors.add(SFTPTransfer.CONNECTION_TIMEOUT);
        descriptors.add(SFTPTransfer.DATA_TIMEOUT);
        descriptors.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);

        return descriptors;
    }

    /**
     * @param context the ProcessContext to use in order to obtain configured properties
     * @return a new SFTPTransfer built from the given context and this processor's logger
     */
    @Override
    protected FileTransfer getFileTransfer(final ProcessContext context) {
        return new SFTPTransfer(context, getLogger());
    }

    /**
     * @return the protocol name, "sftp"
     */
    @Override
    protected String getProtocolName() {
        return "sftp";
    }
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.util.Collection;
import java.util.Date;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
/**
 * Plain state holder that records what an AbstractListProcessor saw in its most recent listing,
 * so that the same file is not pulled more than once.
 */
@XmlType(name = "listing")
public class EntityListing {
    private Date newestTimestamp;
    private Collection<String> identifiersAtNewestTimestamp;

    /**
     * @return the modification date of the newest file that was contained in the listing
     */
    public Date getLatestTimestamp() {
        return this.newestTimestamp;
    }

    /**
     * Records the modification date of the newest file that was contained in the listing.
     *
     * @param latestTimestamp the modification date of the newest file that was contained in the listing
     */
    public void setLatestTimestamp(final Date latestTimestamp) {
        newestTimestamp = latestTimestamp;
    }

    /**
     * @return the identifiers of all entities in the listing whose timestamp was equal to
     *         {@link #getLatestTimestamp()}
     */
    @XmlTransient
    public Collection<String> getMatchingIdentifiers() {
        return this.identifiersAtNewestTimestamp;
    }

    /**
     * Records the identifiers of all entities in the listing whose timestamp was equal to
     * {@link #getLatestTimestamp()}.
     *
     * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp
     */
    public void setMatchingIdentifiers(final Collection<String> matchingIdentifiers) {
        identifiersAtNewestTimestamp = matchingIdentifiers;
    }
}

View File

@ -34,16 +34,16 @@ import java.util.Locale;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPHTTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPHTTPClient;
import org.apache.commons.net.ftp.FTPReply;
public class FTPTransfer implements FileTransfer { public class FTPTransfer implements FileTransfer {
@ -57,53 +57,53 @@ public class FTPTransfer implements FileTransfer {
public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name(); public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name();
public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder()
.name("Connection Mode") .name("Connection Mode")
.description("The FTP Connection Mode") .description("The FTP Connection Mode")
.allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE) .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE)
.defaultValue(CONNECTION_MODE_PASSIVE) .defaultValue(CONNECTION_MODE_PASSIVE)
.build(); .build();
public static final PropertyDescriptor TRANSFER_MODE = new PropertyDescriptor.Builder() public static final PropertyDescriptor TRANSFER_MODE = new PropertyDescriptor.Builder()
.name("Transfer Mode") .name("Transfer Mode")
.description("The FTP Transfer Mode") .description("The FTP Transfer Mode")
.allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII) .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII)
.defaultValue(TRANSFER_MODE_BINARY) .defaultValue(TRANSFER_MODE_BINARY)
.build(); .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port") .name("Port")
.description("The port that the remote system is listening on for file transfers") .description("The port that the remote system is listening on for file transfers")
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.required(true) .required(true)
.defaultValue("21") .defaultValue("21")
.build(); .build();
public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
.name("Proxy Type") .name("Proxy Type")
.description("Proxy type used for file transfers") .description("Proxy type used for file transfers")
.allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS) .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
.defaultValue(PROXY_TYPE_DIRECT) .defaultValue(PROXY_TYPE_DIRECT)
.build(); .build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host") .name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server") .description("The fully qualified hostname or IP address of the proxy server")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port") .name("Proxy Port")
.description("The port of the proxy server") .description("The port of the proxy server")
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("Http Proxy Username") .name("Http Proxy Username")
.description("Http Proxy Username") .description("Http Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false) .required(false)
.build(); .build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder() public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("Http Proxy Password") .name("Http Proxy Password")
.description("Http Proxy Password") .description("Http Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false) .required(false)
.sensitive(true) .sensitive(true)
.build(); .build();
private final ProcessorLog logger; private final ProcessorLog logger;
@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
client.disconnect(); client.disconnect();
} }
} catch (final Exception ex) { } catch (final Exception ex) {
logger.warn("Failed to close FTPClient due to {}", new Object[]{ex.toString()}, ex); logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex);
} }
client = null; client = null;
} }
@ -261,19 +261,24 @@ public class FTPTransfer implements FileTransfer {
perms.append(file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) ? "x" : "-"); perms.append(file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) ? "x" : "-");
FileInfo.Builder builder = new FileInfo.Builder() FileInfo.Builder builder = new FileInfo.Builder()
.filename(file.getName()) .filename(file.getName())
.fullPathFileName(newFullForwardPath) .fullPathFileName(newFullForwardPath)
.directory(file.isDirectory()) .directory(file.isDirectory())
.size(file.getSize()) .size(file.getSize())
.lastModifiedTime(file.getTimestamp().getTimeInMillis()) .lastModifiedTime(file.getTimestamp().getTimeInMillis())
.permissions(perms.toString()) .permissions(perms.toString())
.owner(file.getUser()) .owner(file.getUser())
.group(file.getGroup()); .group(file.getGroup());
return builder.build(); return builder.build();
} }
@Override @Override
public InputStream getInputStream(final String remoteFileName) throws IOException { public InputStream getInputStream(String remoteFileName) throws IOException {
return getInputStream(remoteFileName, null);
}
@Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final FTPClient client = getClient(null); final FTPClient client = getClient(null);
InputStream in = client.retrieveFileStream(remoteFileName); InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) { if (in == null) {
@ -329,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
final boolean cdSuccessful = setWorkingDirectory(remoteDirectory); final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
if (!cdSuccessful) { if (!cdSuccessful) {
logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory}); logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory });
if (client.makeDirectory(remoteDirectory)) { if (client.makeDirectory(remoteDirectory)) {
logger.debug("Created {}", new Object[]{remoteDirectory}); logger.debug("Created {}", new Object[] { remoteDirectory });
} else { } else {
throw new IOException("Failed to create remote directory " + remoteDirectory); throw new IOException("Failed to create remote directory " + remoteDirectory);
} }
@ -387,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
final String time = outformat.format(fileModifyTime); final String time = outformat.format(fileModifyTime);
if (!client.setModificationTime(tempFilename, time)) { if (!client.setModificationTime(tempFilename, time)) {
// FTP server probably doesn't support MFMT command // FTP server probably doesn't support MFMT command
logger.warn("Could not set lastModifiedTime on {} to {}", new Object[]{flowFile, lastModifiedTime}); logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime });
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{flowFile, lastModifiedTime, e}); logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e });
} }
} }
final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue(); final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@ -399,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
int perms = numberPermissions(permissions); int perms = numberPermissions(permissions);
if (perms >= 0) { if (perms >= 0) {
if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) { if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
logger.warn("Could not set permission on {} to {}", new Object[]{flowFile, permissions}); logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions });
} }
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set permission on {} to {} due to {}", new Object[]{flowFile, permissions, e}); logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e });
} }
} }
if (!filename.equals(tempFilename)) { if (!filename.equals(tempFilename)) {
try { try {
logger.debug("Renaming remote path from {} to {} for {}", new Object[]{tempFilename, filename, flowFile}); logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile });
final boolean renameSuccessful = client.rename(tempFilename, filename); final boolean renameSuccessful = client.rename(tempFilename, filename);
if (!renameSuccessful) { if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString()); throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@ -513,13 +518,13 @@ public class FTPTransfer implements FileTransfer {
inetAddress = InetAddress.getByName(remoteHostname); inetAddress = InetAddress.getByName(remoteHostname);
} }
client.connect(inetAddress, ctx.getProperty(PORT).asInteger()); client.connect(inetAddress, ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger());
this.closed = false; this.closed = false;
client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
client.setSoTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); client.setSoTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
final String username = ctx.getProperty(USERNAME).getValue(); final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
final String password = ctx.getProperty(PASSWORD).getValue(); final String password = ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
final boolean loggedIn = client.login(username, password); final boolean loggedIn = client.login(username, password);
if (!loggedIn) { if (!loggedIn) {
throw new IOException("Could not login for user '" + username + "'"); throw new IOException("Could not login for user '" + username + "'");
@ -532,7 +537,7 @@ public class FTPTransfer implements FileTransfer {
client.enterLocalPassiveMode(); client.enterLocalPassiveMode();
} }
final String transferMode = ctx.getProperty(TRANSFER_MODE).getValue(); final String transferMode = ctx.getProperty(TRANSFER_MODE).evaluateAttributeExpressions(flowFile).getValue();
final int fileType = (transferMode.equalsIgnoreCase(TRANSFER_MODE_ASCII)) ? FTPClient.ASCII_FILE_TYPE : FTPClient.BINARY_FILE_TYPE; final int fileType = (transferMode.equalsIgnoreCase(TRANSFER_MODE_ASCII)) ? FTPClient.ASCII_FILE_TYPE : FTPClient.BINARY_FILE_TYPE;
if (!client.setFileType(fileType)) { if (!client.setFileType(fileType)) {
throw new IOException("Unable to set transfer mode to type " + transferMode); throw new IOException("Unable to set transfer mode to type " + transferMode);

View File

@ -18,7 +18,7 @@ package org.apache.nifi.processors.standard.util;
import java.io.Serializable; import java.io.Serializable;
public class FileInfo implements Comparable<FileInfo>, Serializable { public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ -164,4 +164,20 @@ public class FileInfo implements Comparable<FileInfo>, Serializable {
return this; return this;
} }
} }
/**
 * @return the name of this entity, which is its file name
 */
@Override
public String getName() {
    return this.getFileName();
}
/**
 * @return the full path of the file when one is known; otherwise the value of {@link #getName()}
 */
@Override
public String getIdentifier() {
    final String fullPath = getFullPathFileName();
    if (fullPath != null) {
        return fullPath;
    }
    return getName();
}
/**
 * @return the timestamp of this entity, which is its last modified time
 */
@Override
public long getTimestamp() {
    return this.getLastModifiedTime();
}
} }

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.util.List; import java.util.List;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
@ -34,6 +35,8 @@ public interface FileTransfer extends Closeable {
InputStream getInputStream(String remoteFileName) throws IOException; InputStream getInputStream(String remoteFileName) throws IOException;
InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;
void flush() throws IOException; void flush() throws IOException;
FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
@ -51,127 +54,127 @@ public interface FileTransfer extends Closeable {
void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException; void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname") .name("Hostname")
.description("The fully qualified hostname or IP address of the remote system") .description("The fully qualified hostname or IP address of the remote system")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username") .name("Username")
.description("Username") .description("Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.build(); .build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password") .name("Password")
.description("Password for the user account") .description("Password for the user account")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(Validator.VALID)
.required(false) .required(false)
.sensitive(true) .sensitive(true)
.build(); .build();
public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder() public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
.name("Data Timeout") .name("Data Timeout")
.description("Amount of time to wait before timing out while transferring data") .description("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
.required(true) .required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 sec") .defaultValue("30 sec")
.build(); .build();
public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection Timeout") .name("Connection Timeout")
.description("Amount of time to wait before timing out while creating a connection") .description("Amount of time to wait before timing out while creating a connection")
.required(true) .required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 sec") .defaultValue("30 sec")
.build(); .build();
public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
.name("Remote Path") .name("Remote Path")
.description("The path on the remote system from which to pull or push files") .description("The path on the remote system from which to pull or push files")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor CREATE_DIRECTORY = new PropertyDescriptor.Builder() public static final PropertyDescriptor CREATE_DIRECTORY = new PropertyDescriptor.Builder()
.name("Create Directory") .name("Create Directory")
.description("Specifies whether or not the remote directory should be created if it does not exist.") .description("Specifies whether or not the remote directory should be created if it does not exist.")
.required(true) .required(true)
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
.build(); .build();
public static final PropertyDescriptor USE_COMPRESSION = new PropertyDescriptor.Builder() public static final PropertyDescriptor USE_COMPRESSION = new PropertyDescriptor.Builder()
.name("Use Compression") .name("Use Compression")
.description("Indicates whether or not ZLIB compression should be used when transferring files") .description("Indicates whether or not ZLIB compression should be used when transferring files")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
.required(true) .required(true)
.build(); .build();
// GET-specific properties // GET-specific properties
public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder() public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder()
.name("Search Recursively") .name("Search Recursively")
.description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories") .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
.required(true) .required(true)
.defaultValue("false") .defaultValue("false")
.allowableValues("true", "false") .allowableValues("true", "false")
.build(); .build();
public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder() public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
.name("File Filter Regex") .name("File Filter Regex")
.description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched") .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
.required(false) .required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PATH_FILTER_REGEX = new PropertyDescriptor.Builder() public static final PropertyDescriptor PATH_FILTER_REGEX = new PropertyDescriptor.Builder()
.name("Path Filter Regex") .name("Path Filter Regex")
.description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned") .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
.required(false) .required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor MAX_SELECTS = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_SELECTS = new PropertyDescriptor.Builder()
.name("Max Selects") .name("Max Selects")
.description("The maximum number of files to pull in a single connection") .description("The maximum number of files to pull in a single connection")
.defaultValue("100") .defaultValue("100")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Remote Poll Batch Size") .name("Remote Poll Batch Size")
.description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value " .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
+ "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can " + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
+ "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower " + "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
+ "than normal.") + "than normal.")
.defaultValue("5000") .defaultValue("5000")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.required(true) .required(true)
.build(); .build();
public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder() public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
.name("Delete Original") .name("Delete Original")
.description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred") .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
.defaultValue("true") .defaultValue("true")
.allowableValues("true", "false") .allowableValues("true", "false")
.required(true) .required(true)
.build(); .build();
public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder() public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
.name("Polling Interval") .name("Polling Interval")
.description("Determines how long to wait between fetching the listing for new files") .description("Determines how long to wait between fetching the listing for new files")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true) .required(true)
.defaultValue("60 sec") .defaultValue("60 sec")
.build(); .build();
public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
.name("Ignore Dotted Files") .name("Ignore Dotted Files")
.description("If true, files whose names begin with a dot (\".\") will be ignored") .description("If true, files whose names begin with a dot (\".\") will be ignored")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("true") .defaultValue("true")
.required(true) .required(true)
.build(); .build();
public static final PropertyDescriptor USE_NATURAL_ORDERING = new PropertyDescriptor.Builder() public static final PropertyDescriptor USE_NATURAL_ORDERING = new PropertyDescriptor.Builder()
.name("Use Natural Ordering") .name("Use Natural Ordering")
.description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined") .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
.required(true) .required(true)
.build(); .build();
// PUT-specific properties // PUT-specific properties
public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@ -183,77 +186,77 @@ public interface FileTransfer extends Closeable {
public static final String CONFLICT_RESOLUTION_FAIL = "FAIL"; public static final String CONFLICT_RESOLUTION_FAIL = "FAIL";
public static final String CONFLICT_RESOLUTION_NONE = "NONE"; public static final String CONFLICT_RESOLUTION_NONE = "NONE";
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution") .name("Conflict Resolution")
.description("Determines how to handle the problem of filename collisions") .description("Determines how to handle the problem of filename collisions")
.required(true) .required(true)
.allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE) .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
.defaultValue(CONFLICT_RESOLUTION_NONE) .defaultValue(CONFLICT_RESOLUTION_NONE)
.build(); .build();
public static final PropertyDescriptor REJECT_ZERO_BYTE = new PropertyDescriptor.Builder() public static final PropertyDescriptor REJECT_ZERO_BYTE = new PropertyDescriptor.Builder()
.name("Reject Zero-Byte Files") .name("Reject Zero-Byte Files")
.description("Determines whether or not Zero-byte files should be rejected without attempting to transfer") .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("true") .defaultValue("true")
.build(); .build();
public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder()
.name("Dot Rename") .name("Dot Rename")
.description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the " .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
+ "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the " + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
+ "Temporary Filename property is set.") + "Temporary Filename property is set.")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("true") .defaultValue("true")
.build(); .build();
public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder()
.name("Temporary Filename") .name("Temporary Filename")
.description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful " .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
+ "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.") + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.required(false) .required(false)
.build(); .build();
public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder() public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
.name("Last Modified Time") .name("Last Modified Time")
.description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. " .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
+ "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value " + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
+ "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.") + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder() public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
.name("Permissions") .name("Permissions")
.description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of " .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
+ "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may " + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
+ "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will " + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
+ "fail to change permissions of the file.") + "fail to change permissions of the file.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder() public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
.name("Remote Owner") .name("Remote Owner")
.description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. " .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
+ "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but " + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
+ "will fail to change the owner of the file.") + "will fail to change the owner of the file.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
.name("Remote Group") .name("Remote Group")
.description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. " .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
+ "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but " + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
+ "will fail to change the group of the file.") + "will fail to change the group of the file.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size") .name("Batch Size")
.description("The maximum number of FlowFiles to send in a single connection") .description("The maximum number of FlowFiles to send in a single connection")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("500") .defaultValue("500")
.build(); .build();
} }

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
/**
 * Describes a single entry produced by a listing of a remote system, exposing
 * the minimal information needed to name it, tell it apart from other entries,
 * and order it in time.
 */
public interface ListableEntity {

    /**
     * Provides the name of the remote entity.
     *
     * @return the entity's name
     */
    String getName();

    /**
     * Provides a value that distinguishes this entity from every other entity.
     * The identifier is allowed to be the same string as {@link #getName()},
     * but unlike the name it is expected to be unique across all entities.
     *
     * @return the unique identifier of the remote entity
     */
    String getIdentifier();

    /**
     * Provides the timestamp associated with this entity. The value exists so
     * that listings can be performed efficiently, without listing the same
     * entities multiple times.
     *
     * @return the timestamp for this entity
     */
    long getTimestamp();

}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.IOException;
/**
 * An {@link IOException} indicating that an operation could not be carried out
 * because of insufficient permissions.
 */
public class PermissionDeniedException extends IOException {

    private static final long serialVersionUID = -6215434916883053982L;

    /**
     * Creates the exception with a descriptive message and no underlying cause.
     *
     * @param message explanation of which operation was denied
     */
    public PermissionDeniedException(final String message) {
        super(message);
    }

    /**
     * Creates the exception with a descriptive message and the failure that
     * triggered it, so that the original stack trace is preserved.
     *
     * @param message explanation of which operation was denied
     * @param t the underlying cause of the denial
     */
    public PermissionDeniedException(final String message, final Throwable t) {
        super(message, t);
    }

}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard.util; package org.apache.nifi.processors.standard.util;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.Path; import java.nio.file.Path;
@ -51,45 +52,45 @@ import com.jcraft.jsch.SftpException;
public class SFTPTransfer implements FileTransfer { public class SFTPTransfer implements FileTransfer {
public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
.name("Private Key Path") .name("Private Key Path")
.description("The fully qualified path to the Private Key file") .description("The fully qualified path to the Private Key file")
.required(false) .required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder() public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder()
.name("Private Key Passphrase") .name("Private Key Passphrase")
.description("Password for the private key") .description("Password for the private key")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true) .sensitive(true)
.build(); .build();
public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder() public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder()
.name("Host Key File") .name("Host Key File")
.description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used") .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.required(false) .required(false)
.build(); .build();
public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder() public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder()
.name("Strict Host Key Checking") .name("Strict Host Key Checking")
.description("Indicates whether or not strict enforcement of hosts keys should be applied") .description("Indicates whether or not strict enforcement of hosts keys should be applied")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
.required(true) .required(true)
.build(); .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port") .name("Port")
.description("The port that the remote system is listening on for file transfers") .description("The port that the remote system is listening on for file transfers")
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.required(true) .required(true)
.defaultValue("22") .defaultValue("22")
.build(); .build();
public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder() public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
.name("Send Keep Alive On Timeout") .name("Send Keep Alive On Timeout")
.description("Indicates whether or not to send a single Keep Alive message when SSH socket times out") .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("true") .defaultValue("true")
.required(true) .required(true)
.build(); .build();
/** /**
* Dynamic property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling * Dynamic property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
@ -99,12 +100,12 @@ public class SFTPTransfer implements FileTransfer {
* This property is dynamic until deemed a worthy inclusion as proper. * This property is dynamic until deemed a worthy inclusion as proper.
*/ */
public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder() public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder()
.name("Disable Directory Listing") .name("Disable Directory Listing")
.description("Disables directory listings before operations which might fail, such as configurations which create directory structures.") .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.dynamic(true) .dynamic(true)
.defaultValue("false") .defaultValue("false")
.build(); .build();
private final ProcessorLog logger; private final ProcessorLog logger;
@ -133,7 +134,16 @@ public class SFTPTransfer implements FileTransfer {
public List<FileInfo> getListing() throws IOException { public List<FileInfo> getListing() throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue(); final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0; final int depth = 0;
final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
final int maxResults;
final PropertyValue batchSizeValue = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE);
if (batchSizeValue == null) {
maxResults = Integer.MAX_VALUE;
} else {
final Integer configuredValue = batchSizeValue.asInteger();
maxResults = configuredValue == null ? Integer.MAX_VALUE : configuredValue;
}
final List<FileInfo> listing = new ArrayList<>(1000); final List<FileInfo> listing = new ArrayList<>(1000);
getListing(path, depth, maxResults, listing); getListing(path, depth, maxResults, listing);
return listing; return listing;
@ -222,7 +232,15 @@ public class SFTPTransfer implements FileTransfer {
sftp.ls(path, filter); sftp.ls(path, filter);
} }
} catch (final SftpException e) { } catch (final SftpException e) {
throw new IOException("Failed to obtain file listing for " + (path == null ? "current directory" : path), e); final String pathDesc = path == null ? "current directory" : path;
switch (e.id) {
case ChannelSftp.SSH_FX_NO_SUCH_FILE:
throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server");
case ChannelSftp.SSH_FX_PERMISSION_DENIED:
throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions");
default:
throw new IOException("Failed to obtain file listing for " + pathDesc, e);
}
} }
for (final LsEntry entry : subDirs) { for (final LsEntry entry : subDirs) {
@ -251,24 +269,36 @@ public class SFTPTransfer implements FileTransfer {
} }
FileInfo.Builder builder = new FileInfo.Builder() FileInfo.Builder builder = new FileInfo.Builder()
.filename(entry.getFilename()) .filename(entry.getFilename())
.fullPathFileName(newFullForwardPath) .fullPathFileName(newFullForwardPath)
.directory(entry.getAttrs().isDir()) .directory(entry.getAttrs().isDir())
.size(entry.getAttrs().getSize()) .size(entry.getAttrs().getSize())
.lastModifiedTime(entry.getAttrs().getMTime() * 1000L) .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
.permissions(perms) .permissions(perms)
.owner(Integer.toString(entry.getAttrs().getUId())) .owner(Integer.toString(entry.getAttrs().getUId()))
.group(Integer.toString(entry.getAttrs().getGId())); .group(Integer.toString(entry.getAttrs().getGId()));
return builder.build(); return builder.build();
} }
@Override @Override
public InputStream getInputStream(final String remoteFileName) throws IOException { public InputStream getInputStream(final String remoteFileName) throws IOException {
final ChannelSftp sftp = getChannel(null); return getInputStream(remoteFileName, null);
}
@Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final ChannelSftp sftp = getChannel(flowFile);
try { try {
return sftp.get(remoteFileName); return sftp.get(remoteFileName);
} catch (final SftpException e) { } catch (final SftpException e) {
throw new IOException("Failed to obtain file content for " + remoteFileName, e); switch (e.id) {
case ChannelSftp.SSH_FX_NO_SUCH_FILE:
throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server");
case ChannelSftp.SSH_FX_PERMISSION_DENIED:
throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e);
default:
throw new IOException("Failed to obtain file content for " + remoteFileName, e);
}
} }
} }
@ -283,7 +313,14 @@ public class SFTPTransfer implements FileTransfer {
try { try {
sftp.rm(fullPath); sftp.rm(fullPath);
} catch (final SftpException e) { } catch (final SftpException e) {
throw new IOException("Failed to delete remote file " + fullPath, e); switch (e.id) {
case ChannelSftp.SSH_FX_NO_SUCH_FILE:
throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server");
case ChannelSftp.SSH_FX_PERMISSION_DENIED:
throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
default:
throw new IOException("Failed to delete remote file " + fullPath, e);
}
} }
} }
@ -333,10 +370,10 @@ public class SFTPTransfer implements FileTransfer {
if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) { if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
ensureDirectoryExists(flowFile, directoryName.getParentFile()); ensureDirectoryExists(flowFile, directoryName.getParentFile());
} }
logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory}); logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
try { try {
channel.mkdir(remoteDirectory); channel.mkdir(remoteDirectory);
logger.debug("Created {}", new Object[]{remoteDirectory}); logger.debug("Created {}", new Object[] {remoteDirectory});
} catch (final SftpException e) { } catch (final SftpException e) {
throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e); throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
} }
@ -358,9 +395,9 @@ public class SFTPTransfer implements FileTransfer {
final JSch jsch = new JSch(); final JSch jsch = new JSch();
try { try {
final Session session = jsch.getSession(ctx.getProperty(USERNAME).getValue(), final Session session = jsch.getSession(ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(),
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(), ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue()); ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue(); final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
if (hostKeyVal != null) { if (hostKeyVal != null) {
@ -371,7 +408,8 @@ public class SFTPTransfer implements FileTransfer {
properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no"); properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no");
properties.setProperty("PreferredAuthentications", "publickey,password"); properties.setProperty("PreferredAuthentications", "publickey,password");
if (ctx.getProperty(FileTransfer.USE_COMPRESSION).asBoolean()) { final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
properties.setProperty("compression.s2c", "zlib@openssh.com,zlib,none"); properties.setProperty("compression.s2c", "zlib@openssh.com,zlib,none");
properties.setProperty("compression.c2s", "zlib@openssh.com,zlib,none"); properties.setProperty("compression.c2s", "zlib@openssh.com,zlib,none");
} else { } else {
@ -381,12 +419,12 @@ public class SFTPTransfer implements FileTransfer {
session.setConfig(properties); session.setConfig(properties);
final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).getValue(); final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
if (privateKeyFile != null) { if (privateKeyFile != null) {
jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).getValue()); jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue());
} }
final String password = ctx.getProperty(FileTransfer.PASSWORD).getValue(); final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
if (password != null) { if (password != null) {
session.setPassword(password); session.setPassword(password);
} }
@ -428,7 +466,7 @@ public class SFTPTransfer implements FileTransfer {
sftp.exit(); sftp.exit();
} }
} catch (final Exception ex) { } catch (final Exception ex) {
logger.warn("Failed to close ChannelSftp due to {}", new Object[]{ex.toString()}, ex); logger.warn("Failed to close ChannelSftp due to {}", new Object[] {ex.toString()}, ex);
} }
sftp = null; sftp = null;
@ -437,7 +475,7 @@ public class SFTPTransfer implements FileTransfer {
session.disconnect(); session.disconnect();
} }
} catch (final Exception ex) { } catch (final Exception ex) {
logger.warn("Failed to close session due to {}", new Object[]{ex.toString()}, ex); logger.warn("Failed to close session due to {}", new Object[] {ex.toString()}, ex);
} }
session = null; session = null;
} }
@ -515,7 +553,7 @@ public class SFTPTransfer implements FileTransfer {
int time = (int) (fileModifyTime.getTime() / 1000L); int time = (int) (fileModifyTime.getTime() / 1000L);
sftp.setMtime(tempPath, time); sftp.setMtime(tempPath, time);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e}); logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
} }
} }
@ -527,7 +565,7 @@ public class SFTPTransfer implements FileTransfer {
sftp.chmod(perms, tempPath); sftp.chmod(perms, tempPath);
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e}); logger.error("Failed to set permission on {} to {} due to {}", new Object[] {tempPath, permissions, e});
} }
} }
@ -536,7 +574,7 @@ public class SFTPTransfer implements FileTransfer {
try { try {
sftp.chown(Integer.parseInt(owner), tempPath); sftp.chown(Integer.parseInt(owner), tempPath);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e}); logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e});
} }
} }
@ -545,7 +583,7 @@ public class SFTPTransfer implements FileTransfer {
try { try {
sftp.chgrp(Integer.parseInt(group), tempPath); sftp.chgrp(Integer.parseInt(group), tempPath);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e}); logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e});
} }
} }

View File

@ -28,6 +28,7 @@ org.apache.nifi.processors.standard.EvaluateXQuery
org.apache.nifi.processors.standard.ExecuteStreamCommand org.apache.nifi.processors.standard.ExecuteStreamCommand
org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.ExecuteProcess
org.apache.nifi.processors.standard.ExtractText org.apache.nifi.processors.standard.ExtractText
org.apache.nifi.processors.standard.FetchSFTP
org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GenerateFlowFile
org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFile
org.apache.nifi.processors.standard.GetFTP org.apache.nifi.processors.standard.GetFTP
@ -43,6 +44,7 @@ org.apache.nifi.processors.standard.GetJMSQueue
org.apache.nifi.processors.standard.GetJMSTopic org.apache.nifi.processors.standard.GetJMSTopic
org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenHTTP
org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListenUDP
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.MergeContent
org.apache.nifi.processors.standard.ModifyBytes org.apache.nifi.processors.standard.ModifyBytes

View File

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.ListableEntity;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
/**
 * Tests for {@link AbstractListProcessor}, driven through a minimal in-memory subclass
 * ({@code ConcreteListProcessor}) and a map-backed stand-in for the distributed cache client.
 */
public class TestAbstractListProcessor {

    /** Identifier under which the stub cache service is registered with the test runner. */
    private static final String CACHE_SERVICE_ID = "cache";

    /**
     * Adds entities one at a time and checks, after each run, how many FlowFiles were routed to success.
     * Entities accumulate in the processor, so every run lists everything added so far.
     */
    @Test
    public void testOnlyNewEntriesEmitted() {
        final ConcreteListProcessor processor = new ConcreteListProcessor();
        final TestRunner testRunner = TestRunners.newTestRunner(processor);

        // Nothing has been added yet, so the first run is expected to emit nothing.
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

        // First entity ever listed: expected to be emitted.
        listAndExpect(processor, testRunner, "id", 1492L, 1);
        // New identifier carrying the same timestamp: expected to be emitted.
        listAndExpect(processor, testRunner, "id2", 1492L, 1);
        // Same identifier and timestamp again: nothing expected.
        listAndExpect(processor, testRunner, "id2", 1492L, 0);
        // New identifier but an older timestamp: nothing expected.
        listAndExpect(processor, testRunner, "id3", 1491L, 0);
        // Repeat of an already seen identifier/timestamp pair: nothing expected.
        listAndExpect(processor, testRunner, "id2", 1492L, 0);
        // Known identifier with a newer timestamp: expected to be emitted.
        listAndExpect(processor, testRunner, "id2", 1493L, 1);
        // The same newer entry repeated twice more: nothing expected either time.
        listAndExpect(processor, testRunner, "id2", 1493L, 0);
        listAndExpect(processor, testRunner, "id2", 1493L, 0);
        // Original identifier with the newest timestamp so far: expected to be emitted.
        listAndExpect(processor, testRunner, "id", 1494L, 1);
    }

    /**
     * Runs the processor once with nothing to list and once with a single entity, then expects
     * exactly one entry to have been written to the stub cache.
     */
    @Test
    public void testStateStoredInDistributedService() throws InitializationException {
        final ConcreteListProcessor processor = new ConcreteListProcessor();
        final TestRunner testRunner = TestRunners.newTestRunner(processor);
        final DistributedCache cache = registerCache(testRunner);

        testRunner.run();

        processor.addEntity("name", "id", 1492L);
        testRunner.run();

        assertEquals(1, cache.stored.size());
    }

    /**
     * Expects exactly one {@code get} against the stub cache after a single run of the processor.
     */
    @Test
    public void testFetchOnStart() throws InitializationException {
        final ConcreteListProcessor processor = new ConcreteListProcessor();
        final TestRunner testRunner = TestRunners.newTestRunner(processor);
        final DistributedCache cache = registerCache(testRunner);

        testRunner.run();

        assertEquals(1, cache.fetchCount);
    }

    /**
     * Adds one entity named "name", triggers the processor, asserts the number of FlowFiles routed
     * to success, and finally clears the runner's transfer state.
     */
    private static void listAndExpect(final ConcreteListProcessor processor, final TestRunner testRunner,
            final String identifier, final long timestamp, final int expectedCount) {
        processor.addEntity("name", identifier, timestamp);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, expectedCount);
        testRunner.clearTransferState();
    }

    /**
     * Creates a stub cache, registers and enables it on the runner, and points the processor's
     * distributed cache property at it.
     *
     * @return the stub cache so the caller can inspect what was stored or fetched
     */
    private static DistributedCache registerCache(final TestRunner testRunner) throws InitializationException {
        final DistributedCache cache = new DistributedCache();
        testRunner.addControllerService(CACHE_SERVICE_ID, cache);
        testRunner.enableControllerService(cache);
        testRunner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, CACHE_SERVICE_ID);
        return cache;
    }

    /**
     * Map-backed {@link DistributedMapCacheClient}. Only {@code put}, {@code get} and {@code remove}
     * touch the map; the remaining operations are inert stubs. Serializers are ignored.
     */
    private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
        private final Map<Object, Object> stored = new HashMap<>();
        private int fetchCount = 0;

        @Override
        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            stored.put(key, value);
        }

        @Override
        @SuppressWarnings("unchecked")
        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
            // Count every lookup so tests can verify how often state was fetched.
            fetchCount += 1;
            final Object cached = stored.get(key);
            return (V) cached;
        }

        @Override
        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
            return stored.remove(key) != null;
        }

        @Override
        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            return false;
        }

        @Override
        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
            return null;
        }

        @Override
        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
            return false;
        }

        @Override
        public void close() throws IOException {
        }
    }

    /**
     * Immutable {@link ListableEntity} holding exactly the three values it reports.
     */
    private static final class SimpleEntity implements ListableEntity {
        private final String name;
        private final String identifier;
        private final long timestamp;

        private SimpleEntity(final String name, final String identifier, final long timestamp) {
            this.name = name;
            this.identifier = identifier;
            this.timestamp = timestamp;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public String getIdentifier() {
            return identifier;
        }

        @Override
        public long getTimestamp() {
            return timestamp;
        }
    }

    /**
     * {@link AbstractListProcessor} whose listing is whatever has been handed to
     * {@link #addEntity(String, String, long)} so far.
     */
    private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
        private final List<ListableEntity> entities = new ArrayList<>();

        /** Appends an entity to the listing returned by every subsequent {@code performListing} call. */
        public void addEntity(final String name, final String identifier, final long timestamp) {
            entities.add(new SimpleEntity(name, identifier, timestamp));
        }

        @Override
        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
            // The minimum timestamp is ignored: the full list is always returned, as a read-only view.
            return Collections.unmodifiableList(entities);
        }

        @Override
        protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
            return Collections.emptyMap();
        }

        @Override
        protected String getPath(final ProcessContext context) {
            return "/path";
        }

        @Override
        protected File getPersistenceFile() {
            return new File("target/ListProcessor-local-state.json");
        }

        @Override
        protected boolean isListingResetNecessary(PropertyDescriptor property) {
            return false;
        }
    }
}

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertFalse;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
/**
 * Tests for {@link FetchFileTransfer} using a subclass whose {@link FileTransfer} serves content
 * from an in-memory map rather than from a remote server.
 */
public class TestFetchFileTransfer {

    /**
     * Expects content registered under the requested filename to be routed to success with that
     * content, and the transfer not to have been closed afterwards.
     */
    @Test
    public void testContentFetched() {
        final TestableFetchFileTransfer processor = new TestableFetchFileTransfer();
        final TestRunner testRunner = newConfiguredRunner(processor);

        processor.addContent("hello.txt", "world".getBytes());
        enqueueWithFilename(testRunner, "hello.txt");

        testRunner.run(1, false, false);

        testRunner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        assertFalse(processor.closed);
        testRunner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
    }

    /**
     * Expects a FlowFile naming a file that was never registered to be routed to the not-found relationship.
     */
    @Test
    public void testContentNotFound() {
        final TestableFetchFileTransfer processor = new TestableFetchFileTransfer();
        final TestRunner testRunner = newConfiguredRunner(processor);

        enqueueWithFilename(testRunner, "hello.txt");

        testRunner.run(1, false, false);

        testRunner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
    }

    /**
     * Expects a FlowFile to be routed to the permission-denied relationship when the stub transfer
     * refuses access, even though the content exists.
     */
    @Test
    public void testInsufficientPermissions() {
        final TestableFetchFileTransfer processor = new TestableFetchFileTransfer();
        final TestRunner testRunner = newConfiguredRunner(processor);

        processor.addContent("hello.txt", "world".getBytes());
        processor.allowAccess = false;
        enqueueWithFilename(testRunner, "hello.txt");

        testRunner.run(1, false, false);

        testRunner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
    }

    /**
     * Builds a runner for the processor and applies the hostname, port and remote-filename
     * properties shared by every test in this class.
     */
    private static TestRunner newConfiguredRunner(final TestableFetchFileTransfer processor) {
        final TestRunner testRunner = TestRunners.newTestRunner(processor);
        testRunner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
        testRunner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
        testRunner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
        return testRunner;
    }

    /**
     * Enqueues an empty FlowFile whose only attribute is {@code filename}.
     */
    private static void enqueueWithFilename(final TestRunner testRunner, final String filename) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", filename);
        testRunner.enqueue(new byte[0], attributes);
    }

    /**
     * {@link FetchFileTransfer} backed by an in-memory filename-to-bytes map. Tests can toggle
     * {@code allowAccess} to simulate a permission failure and read {@code closed} to see whether
     * the transfer's {@code close()} was invoked.
     */
    private static class TestableFetchFileTransfer extends FetchFileTransfer {
        private boolean allowAccess = true;
        private boolean closed = false;
        private final Map<String, byte[]> fileContents = new HashMap<>();

        /** Registers the bytes to be served for the given remote filename. */
        public void addContent(final String filename, final byte[] content) {
            this.fileContents.put(filename, content);
        }

        @Override
        protected FileTransfer createFileTransfer(final ProcessContext context) {
            return new FileTransfer() {
                @Override
                public InputStream getInputStream(final String remoteFileName) throws IOException {
                    return getInputStream(remoteFileName, null);
                }

                @Override
                public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
                    // The access check comes first, so a denied request fails the same way
                    // whether or not the file is known.
                    if (allowAccess) {
                        final byte[] bytes = fileContents.get(remoteFileName);
                        if (bytes != null) {
                            return new ByteArrayInputStream(bytes);
                        }
                        throw new FileNotFoundException();
                    }
                    throw new PermissionDeniedException("test permission denied");
                }

                @Override
                public void deleteFile(String path, String remoteFileName) throws IOException {
                    final boolean known = fileContents.containsKey(remoteFileName);
                    if (known) {
                        fileContents.remove(remoteFileName);
                        return;
                    }
                    throw new FileNotFoundException();
                }

                @Override
                public void close() throws IOException {
                    // Record the call on the enclosing processor so tests can assert on it.
                    closed = true;
                }

                @Override
                public boolean isClosed() {
                    return false;
                }

                @Override
                public String getProtocolName() {
                    return "test";
                }

                @Override
                public String getHomeDirectory(FlowFile flowFile) throws IOException {
                    return null;
                }

                @Override
                public List<FileInfo> getListing() throws IOException {
                    return null;
                }

                @Override
                public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
                    return null;
                }

                @Override
                public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException {
                    return null;
                }

                @Override
                public void flush() throws IOException {
                }

                @Override
                public void deleteDirectory(String remoteDirectoryName) throws IOException {
                }

                @Override
                public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
                }
            };
        }
    }
}