mirror of https://github.com/apache/nifi.git
NIFI-1484 Making use of timestamps at various points of execution to provide a listing of all but the latest files, which are held until a subsequent execution.
Correcting nifi-amqp-nar bundle's pom description. This closes #212.
parent: d01449ee72
commit: 1a512cd1e6
nifi-amqp-bundle/pom.xml

@@ -23,7 +23,7 @@
     <artifactId>nifi-amqp-bundle</artifactId>
     <version>0.5.0-SNAPSHOT</version>
     <packaging>pom</packaging>
-    <description>A bundle of processors that run Flume sources/sinks</description>
+    <description>A bundle of processors that publish to and consume messages from AMQP.</description>
     <modules>
         <module>nifi-amqp-processors</module>
         <module>nifi-amqp-nar</module>
AbstractListProcessor.java

@@ -23,13 +23,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;

 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -64,59 +64,44 @@ import org.codehaus.jackson.map.ObjectMapper;
 * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
 * we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
 * </p>
 *
 * <p>
 * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
 * or entities that have been modified will be emitted from the Processor.
 * </p>
 *
 * <p>
 * In order to make use of this abstract class, the entities listed must meet the following criteria:
 * </p>
 *
 * <ul>
 * <li>
 * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
 * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
 * </li>
 * <li>
 * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
-* new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
-* than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
-* identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
-* seen already.
+* new. If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+* than the last timestamp pulled, then the entity is considered new.
 * </li>
 * <li>
 * Entity must have a user-readable name that can be used for logging purposes.
 * </li>
 * </ul>
 *
 * <p>
-* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
-* two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
-* stored is the latest timestamp that has been pulled (as determined by the timestamps of the entities that are returned), as well as the unique identifier of
-* each entity that has that timestamp. See the section above for information about how these pieces of information are used in order to determine entity uniqueness.
+* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is
+* performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp
+* that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to
+* determine new entities.
 * </p>
 *
 * <p>
-* In addition to storing state locally, the Processor exposes an optional <code>Distributed Cache Service</code> property. In standalone deployment of NiFi, this is
-* not necessary. However, in a clustered environment, subclasses of this class are expected to be run only on primary node. While this means that the local state is
-* accurate as long as the primary node remains constant, the primary node in the cluster can be changed. As a result, if running in a clustered environment, it is
-* recommended that this property be set. This allows the same state that is described above to also be replicated across the cluster. If this property is set, then
-* on restart the Processor will not begin listing until it has retrieved an updated state from this service, as it does not know whether or not another node has
-* modified the state in the mean time.
+* NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache
+* Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
 * </p>
 *
 * <p>
 * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
 * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
 * the configured dataflow.
 * </p>
 *
 * <p>
 * Subclasses are responsible for the following:
 * </p>
 *
 * <ul>
 * <li>
 * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
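Under the revised criteria above, "new" no longer depends on per-entity identifiers; it reduces to a pure timestamp predicate. A minimal sketch of that predicate (not the committed code; the names minTimestamp and lastProcessedTime mirror the fields used in onTrigger later in this diff):

    // Sketch: an entity qualifies when it is at or past the last listing
    // timestamp AND strictly newer than anything already processed.
    static boolean isNewEntity(final long entityTimestamp, final Long minTimestamp, final long lastProcessedTime) {
        if (minTimestamp == null) {
            return true; // no prior listing; everything qualifies
        }
        return entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime;
    }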
@@ -141,9 +126,10 @@ import org.codehaus.jackson.map.ObjectMapper;
 * </ul>
 */
@TriggerSerially
-@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state, "
-    + "along with all resources that have that same timestamp so that the Processor can avoid data duplication. The scope used depends on the implementation.")
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
+    + "The scope used depends on the implementation.")
 public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {

     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
         .name("Distributed Cache Service")
         .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
@@ -153,21 +139,25 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         .identifiesControllerService(DistributedMapCacheClient.class)
         .build();

     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("All FlowFiles that are received are routed to success")
         .build();

-    private volatile Set<String> latestIdentifiersListed = new HashSet<>();
     private volatile Long lastListingTime = null;
+    private volatile Long lastProcessedTime = 0L;
+    private volatile Long lastRunTime = 0L;
     private volatile boolean justElectedPrimaryNode = false;
-    private volatile boolean resetListing = false;
+    private volatile boolean resetState = false;

-    static final String TIMESTAMP = "timestamp";
-    static final String IDENTIFIER_PREFIX = "id";
+    /*
+     * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
+     * files according to timestamp, it is ensured that at least the specified millis have been eclipsed, to avoid getting scheduled
+     * near-instantaneously after the prior iteration and effectively voiding the built-in buffer.
+     */
+    static final long LISTING_LAG_MILLIS = 100L;
+    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
+    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";

     protected File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
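The LISTING_LAG_MILLIS constant drives an internal yield: if the processor is triggered again before the lag has elapsed, it backs off rather than re-listing immediately. A condensed sketch of the guard, assuming a lastRunTime field updated after each listing as in this class:

    // Sketch: skip this trigger if we were rescheduled before the lag window
    // elapsed, so the newest-timestamp hold-back below has a chance to settle.
    if (System.currentTimeMillis() - lastRunTime < LISTING_LAG_MILLIS) {
        context.yield();
        return;
    }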
@@ -183,12 +173,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
-            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
-            latestIdentifiersListed = new HashSet<>();
-            resetListing = true;
+            resetTimeStates(); // clear lastListingTime so that we have to fetch new time
+            resetState = true;
         }
     }

     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
@@ -218,26 +208,22 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             }
         }

-        // remove entry from Distributed cache server
-        if (client != null) {
-            try {
-                client.remove(path, new StringSerDe());
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
-                    + "State Management service, so the Distributed Cache Service is no longer needed.");
-            }
+        // When scheduled to run, check if the associated timestamp is null, signifying a clearing of state, and reset the internal timestamp
+        if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
+            getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
+            resetTimeStates();
         }

-        if (resetListing) {
+        if (resetState) {
             context.getStateManager().clear(getStateScope(context));
-            resetListing = false;
+            resetState = false;
         }
     }

     /**
      * This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of
      * the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager,
-     * if any state already exists
+     * if any state already exists. More specifically, this will extract out the relevant timestamp for when the processor last ran
      *
      * @param path the path to migrate state for
      * @param client the DistributedMapCacheClient that is capable of obtaining the current state
@@ -247,20 +233,28 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     */
    private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException {
        Long minTimestamp = null;
        final Set<String> latestIdentifiersListed = new HashSet<>();

-       // Retrieve state from Distributed Cache Client
+       // Retrieve state from Distributed Cache Client, establishing the latest file seen
        if (client != null) {
            final StringSerDe serde = new StringSerDe();
            final String serializedState = client.get(getKey(path), serde, serde);
            if (serializedState != null && !serializedState.isEmpty()) {
                final EntityListing listing = deserialize(serializedState);
                minTimestamp = listing.getLatestTimestamp().getTime();
                latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
            }

+           // remove entry from distributed cache server
+           if (client != null) {
+               try {
+                   client.remove(path, new StringSerDe());
+               } catch (final IOException ioe) {
+                   getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
+                       + "State Management service, so the Distributed Cache Service is no longer needed.");
+               }
+           }
        }

-       // Retrieve state from locally persisted file
+       // Retrieve state from locally persisted file, and compare these to the minTimestamp established from the distributedCache, if there was one
        final File persistenceFile = getPersistenceFile();
        if (persistenceFile.exists()) {
            final Properties props = new Properties();
@@ -273,10 +267,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
        if (locallyPersistedValue != null) {
            final EntityListing listing = deserialize(locallyPersistedValue);
            final long localTimestamp = listing.getLatestTimestamp().getTime();
+           // if the local file's latest timestamp is beyond that of the value provided from the cache, replace
            if (minTimestamp == null || localTimestamp > minTimestamp) {
                minTimestamp = localTimestamp;
-               latestIdentifiersListed.clear();
-               latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
            }
        }
@@ -287,18 +280,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
        }

        if (minTimestamp != null) {
-           persist(minTimestamp, latestIdentifiersListed, stateManager, scope);
+           persist(minTimestamp, minTimestamp, stateManager, scope);
        }
    }

-   private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager, final Scope scope) throws IOException {
-       final Map<String, String> updatedState = new HashMap<>(identifiers.size() + 1);
-       updatedState.put(TIMESTAMP, String.valueOf(timestamp));
-       int counter = 0;
-       for (final String identifier : identifiers) {
-           final String index = String.valueOf(++counter);
-           updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier);
-       }
+   private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException {
+       final Map<String, String> updatedState = new HashMap<>(1);
+       updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp));
+       updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp));
        stateManager.setState(updatedState, scope);
    }
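For illustration, the two keys written by the new persist can be read back exactly as onTrigger does later in this diff. A minimal sketch (a hypothetical standalone snippet, not part of the commit; stateManager and scope as in the method above):

    // Sketch: round-trip of the two state keys written by persist(...)
    final StateMap stateMap = stateManager.getState(scope);
    final String listed = stateMap.get(LISTING_TIMESTAMP_KEY);       // newest timestamp ever listed
    final String processed = stateMap.get(PROCESSED_TIMESTAMP_KEY);  // newest timestamp actually emitted
    final long listingTimestamp = listed == null ? 0L : Long.parseLong(listed);
    final long processedTimestamp = processed == null ? 0L : Long.parseLong(processed);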
@@ -316,41 +305,38 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        Long minTimestamp = lastListingTime;

+       if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) {
            try {
-               // We need to fetch the state from the cluster if we don't yet know the last listing time,
-               // or if we were just elected the primary node
-               if (this.lastListingTime == null || justElectedPrimaryNode) {
+               // Attempt to retrieve state from the state manager if a last listing was not yet established or
+               // if just elected the primary node
                final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
-               final Map<String, String> stateValues = stateMap.toMap();
-               final String timestamp = stateValues.get(TIMESTAMP);

-               if (timestamp == null) {
-                   minTimestamp = 0L;
-                   latestIdentifiersListed.clear();
+               final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
+               final String lastProcessedString = stateMap.get(PROCESSED_TIMESTAMP_KEY);
+               if (lastProcessedString != null) {
+                   this.lastProcessedTime = Long.parseLong(lastProcessedString);
+               }
+               if (listingTimestampString != null) {
+                   minTimestamp = Long.parseLong(listingTimestampString);
+                   // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
+                   if (minTimestamp == this.lastListingTime) {
+                       context.yield();
+                       return;
                    } else {
-                       minTimestamp = this.lastListingTime = Long.parseLong(timestamp);
-                       latestIdentifiersListed.clear();
-                       for (final Map.Entry<String, String> entry : stateValues.entrySet()) {
-                           final String key = entry.getKey();
-                           final String value = entry.getValue();
-                           if (TIMESTAMP.equals(key)) {
-                               continue;
-                           }
-
-                           latestIdentifiersListed.add(value);
+                       this.lastListingTime = minTimestamp;
                    }
                }

                justElectedPrimaryNode = false;
            } catch (final IOException ioe) {
                getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
                context.yield();
                return;
            }
        }

        final List<T> entityList;
        try {
+           // Keep track of when this was last executed for consideration of the lag millis
            entityList = performListing(context, minTimestamp);
        } catch (final IOException e) {
            getLogger().error("Failed to perform listing on remote host due to {}", e);
@@ -358,65 +344,93 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
            return;
        }

-       if (entityList == null) {
+       if (entityList == null || entityList.isEmpty()) {
            context.yield();
            return;
        }

        Long latestListingTimestamp = null;
-       final List<T> newEntries = new ArrayList<>();
-       for (final T entity : entityList) {
-           final boolean newTimestamp = minTimestamp == null || entity.getTimestamp() > minTimestamp;
-           final boolean newEntryForTimestamp = minTimestamp != null && entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier());
-           final boolean list = newTimestamp || newEntryForTimestamp;
+       final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();

+       // Build a sorted map to determine the latest possible entries
+       for (final T entity : entityList) {
+           final long entityTimestamp = entity.getTimestamp();
+           // New entries are all those that occur at or after the associated timestamp
+           final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime;

+           if (newEntry) {
+               List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
+               if (entitiesForTimestamp == null) {
+                   entitiesForTimestamp = new ArrayList<T>();
+                   orderedEntries.put(entity.getTimestamp(), entitiesForTimestamp);
+               }
+               entitiesForTimestamp.add(entity);
+           }
+       }

+       int flowfilesCreated = 0;

+       if (orderedEntries.size() > 0) {
+           latestListingTimestamp = orderedEntries.lastKey();

+           // If the last listing time is equal to the newest entries previously seen,
+           // another iteration has occurred without new files and special handling is needed to avoid starvation
+           if (latestListingTimestamp.equals(lastListingTime)) {
+               /* We are done when either:
+                *   - we have not eclipsed the minimal listing lag needed, due to being triggered too soon after the last run
+                *   - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
+                */
+               if (System.currentTimeMillis() - lastRunTime < LISTING_LAG_MILLIS || latestListingTimestamp.equals(lastProcessedTime)) {
+                   context.yield();
+                   return;
+               }
+           } else {
+               // Otherwise, the newest entries are held back one cycle, to avoid missing data when writes occur exactly as the listing is being performed
+               orderedEntries.remove(latestListingTimestamp);
+           }

+           for (List<T> timestampEntities : orderedEntries.values()) {
+               for (T entity : timestampEntities) {
                    // Create the FlowFile for this path.
-                   if (list) {
                    final Map<String, String> attributes = createAttributes(entity, context);
                    FlowFile flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, attributes);
                    session.transfer(flowFile, REL_SUCCESS);

-                       // If we don't have a new timestamp but just have a new entry, we need to
-                       // add all of the previous entries to our entityList. If we have a new timestamp,
-                       // then the previous entries can go away.
-                       if (!newTimestamp) {
-                           newEntries.addAll(entityList);
-                       }
-                       newEntries.add(entity);

-                       if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) {
-                           latestListingTimestamp = entity.getTimestamp();
+                   flowfilesCreated++;
               }
           }

-       final int listCount = newEntries.size();
-       if (listCount > 0) {
-           getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount});
+           // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
+           if (latestListingTimestamp != null) {
+               boolean processedNewFiles = flowfilesCreated > 0;
+               if (processedNewFiles) {
+                   // If there have been files created, update the last timestamp we processed
+                   lastProcessedTime = orderedEntries.lastKey();
+                   getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
                    session.commit();
+               }

-               // We have performed a listing and pushed the FlowFiles out.
+               lastRunTime = System.currentTimeMillis();

+               if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) {
+                   // We have performed a listing and pushed any FlowFiles out that may have been generated
                    // Now, we need to persist state about the Last Modified timestamp of the newest file
-                   // that we pulled in. We do this in order to avoid pulling in the same file twice.
+                   // that we evaluated. We do this in order to avoid pulling in the same file twice.
                    // However, we want to save the state both locally and remotely.
                    // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
                    // previous Primary Node left off.
                    // We also store the state locally so that if the node is restarted, and the node cannot contact
                    // the distributed state cache, the node can continue to run (if it is primary node).
-                   final Set<String> identifiers = new HashSet<>(newEntries.size());
                    try {
-                       for (final T entity : newEntries) {
-                           identifiers.add(entity.getIdentifier());
-                       }
-                       persist(latestListingTimestamp, identifiers, context.getStateManager(), getStateScope(context));
+                       lastListingTime = latestListingTimestamp;
+                       persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context));
                    } catch (final IOException ioe) {
                        getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
                            + "if another node begins executing this Processor, data duplication may occur.", ioe);
                    }
                }

-           lastListingTime = latestListingTimestamp;
-           latestIdentifiersListed = identifiers;
        } else {
            getLogger().debug("There is no data to list. Yielding.");
            context.yield();
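The hold-back of the newest timestamp bucket is the heart of NIFI-1484: entities are bucketed by timestamp in a TreeMap, and the newest bucket is withheld for one cycle so writes landing exactly at listing time are not missed. A condensed sketch of that idea as a hypothetical helper (not the committed method):

    // Sketch: everything strictly older than the newest bucket is safe to emit
    // now; the newest bucket waits until a later, lag-separated iteration.
    static <T> List<T> releasableEntities(final TreeMap<Long, List<T>> orderedEntries) {
        final List<T> releasable = new ArrayList<>();
        if (orderedEntries.isEmpty()) {
            return releasable;
        }
        final long newestBucket = orderedEntries.lastKey();
        for (final List<T> bucket : orderedEntries.headMap(newestBucket).values()) { // headMap is exclusive
            releasable.addAll(bucket);
        }
        return releasable;
    }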
@@ -430,6 +444,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
        }
    }

+   private void resetTimeStates() {
+       lastListingTime = null;
+       lastProcessedTime = 0L;
+       lastRunTime = 0L;
+   }

    /**
     * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity
@@ -462,7 +481,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     *
     * @param context the ProcessContext to use in order to pull the appropriate entities
     * @param minTimestamp the minimum timestamp of entities that should be returned.
     *
     * @return a Listing of entities that have a timestamp >= minTimestamp
     */
    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
ListFile.java

@@ -95,8 +95,8 @@ import org.apache.nifi.processors.standard.util.FileInfo;
         "rw-rw-r--")
 })
 @SeeAlso({GetFile.class, PutFile.class, FetchFile.class})
-@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored, "
-    + "along with the filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after "
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. "
+    + "This allows the Processor to list only files that have been added or modified after "
     + "this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the "
     + "<Input Directory Location> property.")
 public class ListFile extends AbstractListProcessor<FileInfo> {
ListSFTP.java

@@ -51,8 +51,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
     @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"),
     @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"),
 })
-@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored, "
-    + "along with the filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after "
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. "
+    + "This allows the Processor to list only files that have been added or modified after "
     + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if "
     + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.")
 public class ListSFTP extends ListFileTransfer {
TestAbstractListProcessor.java

@@ -18,7 +18,6 @@
 package org.apache.nifi.processors.standard;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

 import java.io.File;
 import java.io.FileOutputStream;
@@ -55,61 +54,127 @@ public class TestAbstractListProcessor {
     public final TemporaryFolder testFolder = new TemporaryFolder();

     @Test
-    public void testOnlyNewEntriesEmitted() {
+    public void testOnlyNewEntriesEmitted() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.run();

+        final long initialTimestamp = System.currentTimeMillis();
+
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", 1492L);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
         runner.run();

-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        // First run, the above listed entries would be skipped to avoid write synchronization issues
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();

-        proc.addEntity("name", "id2", 1492L);
+        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Running again, our two previously seen files are now cleared to be released
         runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
         runner.clearTransferState();

-        proc.addEntity("name", "id2", 1492L);
+        // Verify no new old files show up
+        proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();

-        proc.addEntity("name", "id3", 1491L);
+        proc.addEntity("name", "id3", initialTimestamp - 1);
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();

-        proc.addEntity("name", "id2", 1492L);
+        proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();

-        proc.addEntity("name", "id2", 1493L);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
+        // Now a new file beyond the current time enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);

-        proc.addEntity("name", "id2", 1493L);
+        // Nothing occurs for the first iteration as it is withheld
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();

-        proc.addEntity("name", "id2", 1493L);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);

-        proc.addEntity("name", "id", 1494L);
+        // But it should now show up, as the appropriate pause has been eclipsed
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
     }

     @Test
-    public void testStateStoredInClusterStateManagement() throws InitializationException {
+    public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        // Emulate having state but not having had the processor run, such as in a restart
+        final Map<String, String> preexistingState = new HashMap<>();
+        preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
+
+        // run for the first time
+        runner.run();
+
+        // First run, the above listed entries would be skipped
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Running again, these files are past the lag but the restored state shows they were already processed, so they are again skipped
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Verify no new old files show up
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", initialTimestamp - 1);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Now a new file beyond the current time enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);
+
+        // Nothing occurs for the first iteration as it is withheld
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // But it should now show up, as the appropriate pause has been eclipsed
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testStateStoredInClusterStateManagement() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         final DistributedCache cache = new DistributedCache();
@@ -123,8 +188,17 @@ public class TestAbstractListProcessor {
         runner.run();

         final Map<String, String> expectedState = new HashMap<>();
-        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id");
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS);
+
+        runner.run();
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
     }
@@ -145,8 +219,9 @@ public class TestAbstractListProcessor {

         final MockStateManager stateManager = runner.getStateManager();
         final Map<String, String> expectedState = new HashMap<>();
-        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id");
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
         stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
     }
@@ -188,12 +263,58 @@ public class TestAbstractListProcessor {

         // Verify the state manager now maintains the associated state
         final Map<String, String> expectedState = new HashMap<>();
-        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id");

+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
     }

+    @Test
+    public void testResumeListingAfterClearingState() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        final long initialEventTimestamp = System.currentTimeMillis();
+        proc.addEntity("name", "id", initialEventTimestamp);
+        proc.addEntity("name", "id2", initialEventTimestamp);
+
+        // Add entities, but these should not be transferred as they are the latest values
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // After providing a pause in listings, the files should now transfer
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify entities are not transferred again for the given state
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Clear state for this processor, eradicating the timestamp
+        runner.getStateManager().clear(Scope.CLUSTER);
+        Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
+
+        // As before, we are unsure of when these files were delivered relative to system time, and additional cycle(s) need to occur before transfer
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Ensure the original files are now transferred again
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+    }
+
     @Test
     public void testFetchOnStart() throws InitializationException {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
@@ -209,42 +330,66 @@ public class TestAbstractListProcessor {
     }

     @Test
-    public void testOnlyNewStateStored() throws IOException {
+    public void testOnlyNewStateStored() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.run();

+        final long initialTimestamp = System.currentTimeMillis();
+
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", 1492L);
-        proc.addEntity("name", "id2", 1492L);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);

         runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();

         final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(1, stateMap.getVersion());
+        assertEquals(2, stateMap.getVersion());

         final Map<String, String> map = stateMap.toMap();
-        assertEquals(3, map.size());
-        assertEquals("1492", map.get("timestamp"));
-        assertTrue(map.containsKey("id.1"));
-        assertTrue(map.containsKey("id.2"));
+        // Ensure only timestamp is migrated
+        assertEquals(2, map.size());
+        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));

-        proc.addEntity("new name", "new id", 1493L);
+        proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();

+        // Verify that the new entry has not been emitted but it has triggered an updated state
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(3, updatedStateMap.getVersion());
+
+        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        // Processed timestamp is lagging behind currently
+        assertEquals(Long.toString(initialTimestamp), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        final StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(2, updatedStateMap.getVersion());
         runner.clearTransferState();

-        final Map<String, String> updatedValues = updatedStateMap.toMap();
-        assertEquals(2, updatedValues.size());
-        assertEquals("1493", updatedValues.get("timestamp"));
-        assertEquals("new id", updatedValues.get("id.1"));
+        updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(4, updatedStateMap.getVersion());
+
+        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        // Processed timestamp is now caught up
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
     }

     private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
         private final Map<Object, Object> stored = new HashMap<>();
         private int fetchCount = 0;
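The TestListFile changes below all follow the same two-phase pattern the abstract processor now imposes: the first run after a file appears emits nothing (the newest timestamp bucket is withheld), and only a second run after sleeping past LISTING_LAG_MILLIS releases it. A hypothetical JUnit helper capturing that pattern (not part of the commit):

    // Sketch: run once expecting the hold-back, wait out the lag, then run
    // again and assert the withheld files are released.
    private static void runAndAwaitRelease(final TestRunner runner, final int expectedCount) throws InterruptedException {
        runner.run();
        runner.assertTransferCount(ListFile.REL_SUCCESS, 0); // newest bucket withheld
        runner.clearTransferState();
        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
        runner.run();
        runner.assertTransferCount(ListFile.REL_SUCCESS, expectedCount);
    }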
TestListFile.java

@@ -111,10 +111,17 @@ public class TestListFile {

         // process first file and set new timestamp
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();

+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles.size());
+        final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
+        assertEquals(1, successFiles1.size());

         // create second file
         final File file2 = new File(TESTDIR + "/listing2.txt");

@@ -123,6 +130,12 @@ public class TestListFile {

         // process second file after timestamp
         runner.clearTransferState();
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -147,11 +160,18 @@ public class TestListFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(3, successFiles4.size());
+        assertEquals(2, successFiles4.size());
+
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
     }

     @Test
-    public void testFilterAge() throws IOException {
+    public void testFilterAge() throws Exception {
         final File file1 = new File(TESTDIR + "/age1.txt");
         assertTrue(file1.createNewFile());
         assertTrue(file1.setLastModified(time0millis));

@@ -166,10 +186,16 @@ public class TestListFile {

         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(3, successFiles1.size());
+        assertEquals(1, successFiles1.size());

         // exclude oldest
         runner.clearTransferState();

@@ -178,7 +204,13 @@ public class TestListFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(2, successFiles2.size());
+        assertEquals(1, successFiles2.size());
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);

         // exclude newest
         runner.clearTransferState();

@@ -187,7 +219,7 @@ public class TestListFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(2, successFiles3.size());
+        assertEquals(1, successFiles3.size());

         // exclude oldest and newest
         runner.clearTransferState();

@@ -196,11 +228,18 @@ public class TestListFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles4.size());
+        assertEquals(0, successFiles4.size());
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
     }

     @Test
-    public void testFilterSize() throws IOException {
+    public void testFilterSize() throws Exception {
         final byte[] bytes1000 = new byte[1000];
         final byte[] bytes5000 = new byte[5000];
         final byte[] bytes10000 = new byte[10000];

@@ -227,6 +266,13 @@ public class TestListFile {
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(3, successFiles1.size());

@@ -238,6 +284,14 @@ public class TestListFile {
         runner.setProperty(ListFile.MIN_SIZE, "0 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
         runner.run();
+
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles2.size());

@@ -248,6 +302,12 @@ public class TestListFile {
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.removeProperty(ListFile.MAX_SIZE);
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -260,13 +320,20 @@ public class TestListFile {
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles4.size());
     }

     @Test
-    public void testFilterHidden() throws IOException {
+    public void testFilterHidden() throws Exception {
         FileOutputStream fos;

         final File file1 = new File(TESTDIR + "/hidden1.txt");

@@ -292,6 +359,11 @@ public class TestListFile {
         runner.removeProperty(ListFile.MIN_SIZE);
         runner.removeProperty(ListFile.MAX_SIZE);
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -300,6 +372,11 @@ public class TestListFile {
         // exclude hidden
         runner.clearTransferState();
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -307,7 +384,7 @@ public class TestListFile {
     }

     @Test
-    public void testFilterFilePattern() throws IOException {
+    public void testFilterFilePattern() throws Exception {
         final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
         assertTrue(file1.createNewFile());

@@ -324,6 +401,11 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -332,6 +414,11 @@ public class TestListFile {
         // filter file on pattern
         runner.clearTransferState();
         runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -339,7 +426,7 @@ public class TestListFile {
     }

     @Test
-    public void testFilterPathPattern() throws IOException {
+    public void testFilterPathPattern() throws Exception {
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());

@@ -362,6 +449,11 @@ public class TestListFile {
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -371,6 +463,11 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.PATH_FILTER, "subdir1");
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -380,6 +477,11 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.PATH_FILTER, "subdir2");
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -387,7 +489,7 @@ public class TestListFile {
     }

     @Test
-    public void testRecurse() throws IOException {
+    public void testRecurse() throws Exception {
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());

@@ -407,6 +509,12 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

@@ -435,13 +543,21 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.RECURSE, "false");
         runner.run();
+
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
     }

     @Test
-    public void testReadable() throws IOException {
+    public void testReadable() throws Exception {
         final File file1 = new File(TESTDIR + "/file1.txt");
         assertTrue(file1.createNewFile());

@@ -455,14 +571,19 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(3, successFiles1.size());
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
     }

     @Test
-    public void testAttributesSet() throws IOException {
+    public void testAttributesSet() throws Exception {
         // create temp file and time constant
         final File file1 = new File(TESTDIR + "/file1.txt");
         assertTrue(file1.createNewFile());

@@ -477,6 +598,13 @@ public class TestListFile {
         runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles1.size());