NIFI-4069: Make ListXXX work with timestamp precision in seconds or minutes

- Refactored variable names to better represent what they are meant for.
- Added deterministic logic that detects the target filesystem's timestamp precision and adjusts the lag time accordingly.
- Changed tests from System.nanoTime() to System.currentTimeMillis() because the Java File API reports timestamps in milliseconds at best. Also, System.nanoTime() should not be mixed with epoch milliseconds because it uses an arbitrary origin and is measured differently.
- Changed TestListFile to use longer intervals between the file timestamps used by testFilterAge, to give more consistent results because sleep times can be longer on filesystems with seconds-precision timestamps.
- Added logging to TestListFile.
- Added a TestWatcher that dumps state for further investigation when an assertion fails.
- Added a Timestamp Precision property so that users can set it explicitly when auto-detection is not enough.
- Adjusted timestamps for the ages test.

This closes #1915.

Signed-off-by: Bryan Bende <bbende@apache.org>
Koji Kawamura 2017-06-14 15:21:01 +09:00 committed by Bryan Bende
parent a4e729c7a7
commit 28ee70222b
9 changed files with 463 additions and 130 deletions
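The mechanism the bullets above describe can be summarized in a small, self-contained sketch: infer the coarsest precision that explains the entity timestamps seen so far, pick a listing lag for that precision, and only emit the newest entries once a truncated "now minus lag" has passed their timestamp. The class and method names below are illustrative only and are not the processor's actual API; the real implementation is in AbstractListProcessor further down.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PrecisionSketch {

    // Infer precision from observed entity timestamps (epoch millis).
    static TimeUnit detectPrecision(List<Long> timestampsMillis) {
        boolean hasMillis = timestampsMillis.stream().anyMatch(t -> t % 1_000 > 0);
        boolean hasSeconds = timestampsMillis.stream().anyMatch(t -> t % 60_000 > 0);
        return hasMillis ? TimeUnit.MILLISECONDS : hasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES;
    }

    // Lag to wait before trusting the newest timestamp, per precision.
    static long lagMillis(TimeUnit precision) {
        switch (precision) {
            case MILLISECONDS: return 100L;
            case SECONDS: return 1_000L;
            default: return 60_000L;
        }
    }

    // An entry is safe to list once even the truncated "now minus lag" is at or past its timestamp.
    static boolean oldEnoughToList(long entryTimestampMillis, long nowMillis, TimeUnit precision) {
        long truncated = precision.toMillis(precision.convert(nowMillis - lagMillis(precision), TimeUnit.MILLISECONDS));
        return entryTimestampMillis <= truncated;
    }

    public static void main(String[] args) {
        List<Long> seen = Arrays.asList(1_497_600_120_000L, 1_497_600_121_000L); // only whole seconds observed
        TimeUnit precision = detectPrecision(seen);                              // -> SECONDS
        System.out.println(precision + ", lag=" + lagMillis(precision) + "ms, listable="
                + oldEnoughToList(1_497_600_121_000L, 1_497_600_122_500L, precision));
    }
}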


@ -70,7 +70,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>


@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@ -138,14 +140,42 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
"Automatically detect time unit deterministically based on candidate entries timestamp."
+ " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+ " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
"This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
.name("target-system-timestamp-precision")
.displayName("Target System Timestamp Precision")
.description("Specify timestamp precision at the target system."
+ " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
.required(true)
.allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
.defaultValue(PRECISION_AUTO_DETECT.getValue())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
/**
* Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
* It does not necessary mean it has been processed as well.
* Whether it was processed or not depends on target system time precision and how old the entity timestamp was.
*/
private volatile Long lastListedLatestEntryTimestampMillis = null;
/**
* Represents the timestamp of an entity which was the latest one
* within those picked up and written to the output relationship at the previous cycle.
*/
private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
private volatile Long lastRunTimeNanos = 0L;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetState = false;
@ -154,9 +184,16 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
* near instantaneously after the prior iteration effectively voiding the built in buffer
*/
public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
static {
final Map<TimeUnit, Long> nanos = new HashMap<>();
nanos.put(TimeUnit.MILLISECONDS, 100L);
nanos.put(TimeUnit.SECONDS, 1_000L);
nanos.put(TimeUnit.MINUTES, 60_000L);
LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
}
static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
public File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
@ -166,6 +203,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
@ -208,7 +246,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
// When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
resetTimeStates();
}
@ -283,10 +321,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final StateManager stateManager, final Scope scope) throws IOException {
final Map<String, String> updatedState = new HashMap<>(1);
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
stateManager.setState(updatedState, scope);
}
@ -303,26 +343,26 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
final String lastProcessedLatestEntryTimestampString = stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
if (lastProcessedLatestEntryTimestampString != null) {
this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
}
if (latestListedEntryTimestampString != null) {
minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
if (minTimestampToListMillis == this.lastListedLatestEntryTimestampMillis) {
context.yield();
return;
} else {
this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
}
justElectedPrimaryNode = false;
@ -334,10 +374,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
final List<T> entityList;
final long currentRunTimeNanos = System.nanoTime();
final long currentRunTimeMillis = System.currentTimeMillis();
try {
// track of when this last executed for consideration of the lag nanos
entityList = performListing(context, minTimestampToListMillis);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", e);
context.yield();
@ -349,14 +390,22 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return;
}
Long latestListedEntryTimestampThisCycleMillis = null;
final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
// Build a sorted map to determine the latest possible entries
boolean targetSystemHasMilliseconds = false;
boolean targetSystemHasSeconds = false;
for (final T entity : entityList) {
final long entityTimestampMillis = entity.getTimestamp();
if (!targetSystemHasMilliseconds) {
targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
}
if (!targetSystemHasSeconds) {
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
}
// New entries are all those that occur at or after the associated timestamp
final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
if (newEntry) {
List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@ -371,24 +420,42 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
int flowfilesCreated = 0;
if (orderedEntries.size() > 0) {
latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
// Determine target system time precision.
final String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
final TimeUnit targetSystemTimePrecision
= PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
: PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
: PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
// If the last listing time is equal to the newest entries previously seen,
// another iteration has occurred without new files and special handling is needed to avoid starvation
if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) {
/* We need to wait for another cycle when either:
* - If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
* - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
*/
final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
context.yield();
return;
}
} else {
// Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits.
final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
// If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle.
// The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision.
if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
// Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
}
}
for (List<T> timestampEntities : orderedEntries.values()) {
for (T entity : timestampEntities) {
@ -403,18 +470,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
// As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
if (latestListedEntryTimestampThisCycleMillis != null) {
boolean processedNewFiles = flowfilesCreated > 0;
if (processedNewFiles) {
// If there have been files created, update the last timestamp we processed.
// Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
// because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
session.commit();
}
lastRunTimeNanos = currentRunTimeNanos;
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
// that we evaluated. We do this in order to avoid pulling in the same file twice.
@ -424,8 +493,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// We also store the state locally so that if the node is restarted, and the node cannot contact
// the distributed state cache, the node can continue to run (if it is primary node).
try {
lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
@ -437,8 +506,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
context.yield();
// lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
if (lastListedLatestEntryTimestampMillis == null) {
lastListedLatestEntryTimestampMillis = 0L;
}
return;
@ -446,9 +515,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
private void resetTimeStates() {
lastListedLatestEntryTimestampMillis = null;
lastProcessedLatestEntryTimestampMillis = 0L;
lastRunTimeNanos = 0L;
}
/**
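To make the lag and truncation logic in the hunk above concrete, here is a short worked example; the class name and timestamp values are illustrative only.

import java.util.concurrent.TimeUnit;

public class LagExample {
    public static void main(String[] args) {
        long currentRunTimeMillis = 1_497_600_125_300L; // "now" when the listing ran
        long listingLagMillis = 1_000L;                 // lag chosen for a seconds-precision target
        // Truncate "now minus lag" to the target precision to drop unreliable digits.
        long minimumReliableTimestampMillis = TimeUnit.SECONDS.toMillis(
                TimeUnit.SECONDS.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS));
        System.out.println(minimumReliableTimestampMillis); // 1497600124000
        // An entry stamped 1497600124000 is old enough to list this cycle; one stamped
        // 1497600125000 exceeds the minimum and is held back until a later run.
    }
}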


@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.list;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* This class provides a way to dump list-able entities, processor state and transferred FlowFiles into 'success' relationship,
* which is useful to debug test issues especially at automation test environment such as Travis that is difficult to debug.
*/
public class ListProcessorTestWatcher extends TestWatcher {
private static final Logger logger = LoggerFactory.getLogger(ListProcessorTestWatcher.class);
private static final Consumer<String> logStateDump = logger::info;
@FunctionalInterface
public interface Provider<T> {
T provide();
}
private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
private final Provider<Map<String, String>> stateMapProvider;
private final Provider<List<ListableEntity>> entitiesProvider;
private final Provider<List<FlowFile>> successFlowFilesProvider;
private long startedAtMillis;
public ListProcessorTestWatcher(Provider<Map<String, String>> stateMapProvider, Provider<List<ListableEntity>> entitiesProvider, Provider<List<FlowFile>> successFlowFilesProvider) {
this.stateMapProvider = stateMapProvider;
this.entitiesProvider = entitiesProvider;
this.successFlowFilesProvider = successFlowFilesProvider;
}
private void log(Consumer<String> dumper, String format, Object ... args) {
dumper.accept(String.format(format, args));
}
public void dumpState(final long start) {
dumpState(logStateDump, stateMapProvider.provide(), entitiesProvider.provide(), successFlowFilesProvider.provide(), start);
}
private void dumpState(Consumer<String> d, final Map<String, String> state, final List<ListableEntity> entities, final List<FlowFile> flowFiles, final long start) {
final long nTime = System.currentTimeMillis();
log(d, "--------------------------------------------------------------------");
log(d, "%-19s %-13s %-23s %s", "", "timestamp", "date from timestamp", "t0 delta");
log(d, "%-19s %-13s %-23s %s", "-------------------", "-------------", "-----------------------", "--------");
log(d, "%-19s = %13d %s %8d", "started at", start, dateFormat.format(start), 0);
log(d, "%-19s = %13d %s %8d", "current time", nTime, dateFormat.format(nTime), 0);
log(d, "---- processor state -----------------------------------------------");
if (state.containsKey("processed.timestamp")) {
final long pTime = Long.parseLong(state.get("processed.timestamp"));
log(d, "%19s = %13d %s %8d", "processed.timestamp", pTime, dateFormat.format(pTime), pTime - nTime);
} else {
log(d, "%19s = na", "processed.timestamp");
}
if (state.containsKey("listing.timestamp")) {
final long lTime = Long.parseLong(state.get("listing.timestamp"));
log(d, "%19s = %13d %s %8d", "listing.timestamp", lTime, dateFormat.format(lTime), lTime - nTime);
} else {
log(d, "%19s = na", "listing.timestamp");
}
log(d, "---- input folder contents -----------------------------------------");
entities.sort(Comparator.comparing(ListableEntity::getIdentifier));
for (ListableEntity entity : entities) {
log(d, "%19s = %12d %s %8d", entity.getIdentifier(), entity.getTimestamp(), dateFormat.format(entity.getTimestamp()), entity.getTimestamp() - nTime);
}
log(d, "---- output flowfiles ----------------------------------------------");
final Map<String, Long> fileTimes = entities.stream().collect(Collectors.toMap(ListableEntity::getIdentifier, ListableEntity::getTimestamp));
for (FlowFile ff : flowFiles) {
String fName = ff.getAttribute(CoreAttributes.FILENAME.key());
Long fTime = fileTimes.get(fName);
log(d, "%19s = %13d %s %8d", fName, fTime, dateFormat.format(fTime), fTime - nTime);
}
log(d, "REL_SUCCESS count = " + flowFiles.size());
log(d, "--------------------------------------------------------------------");
log(d, "");
}
@Override
protected void starting(Description description) {
startedAtMillis = System.currentTimeMillis();
}
/**
* Throw additional AssertionError with stateDump as its message.
*/
@Override
protected void failed(Throwable e, Description description) {
if (!(e instanceof AssertionError)) {
return;
}
final StringBuilder msg = new StringBuilder("State dump:\n");
dumpState(s -> msg.append(s).append("\n"),
stateMapProvider.provide(),
entitiesProvider.provide(),
successFlowFilesProvider.provide(),
startedAtMillis);
throw new AssertionError(msg);
}
}
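For reference, a rule of this kind is wired into a JUnit 4 test roughly as in the minimal sketch below; the providers here return empty collections, whereas the real tests further down pass the runner's state map, the processor's entities, and the FlowFiles routed to 'success'.

import java.util.Collections;
import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class ExampleWatcherUsageTest {

    // On an AssertionError the rule re-throws it with a formatted state dump appended.
    @Rule
    public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
            Collections::emptyMap,
            Collections::emptyList,
            Collections::emptyList);

    @Test
    public void passingTestLeavesTheDumpUnused() {
        Assert.assertTrue(true);
    }
}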


@ -32,7 +32,7 @@ public interface ListableEntity {
/**
* @return the timestamp for this entity in milliseconds so that we can be efficient about not performing listings of the same
* entities multiple times
*/
long getTimestamp();


@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.Charsets;
import org.apache.nifi.components.PropertyDescriptor;
@ -40,37 +41,78 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestWatcher;
public class TestAbstractListProcessor {
/**
* @return current timestamp in milliseconds, but truncated at specified target precision (e.g. SECONDS or MINUTES).
*/
private static long getCurrentTimestampMillis(final TimeUnit targetPrecision) {
final long timestampInTargetPrecision = targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, targetPrecision);
}
private static long getSleepMillis(final TimeUnit targetPrecision) {
return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 2;
}
private static final long DEFAULT_SLEEP_MILLIS = getSleepMillis(TimeUnit.MILLISECONDS);
private ConcreteListProcessor proc;
private TestRunner runner;
@Rule
public TestWatcher dumpState = new ListProcessorTestWatcher(
() -> {
try {
return runner.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
throw new RuntimeException("Failed to retrieve state", e);
}
},
() -> proc.entities,
() -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
);
@Before
public void setup() {
proc = new ConcreteListProcessor();
runner = TestRunners.newTestRunner(proc);
}
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
/**
* <p>Ensures that files are listed when those are old enough:
* <li>Files with last modified timestamp those are old enough to determine that those are completely written
* and no further files are expected to be added with the same timestamp.</li>
* <li>This behavior is expected when a processor is scheduled less frequently, such as hourly or daily.</li>
* </p>
*/
@Test
public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS);
// These entries have existed before the processor runs at the first time.
proc.addEntity("name", "id", oldTimestamp);
proc.addEntity("name", "id2", oldTimestamp);
// First run, the above listed entries should be emitted since it has existed.
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
runner.clearTransferState();
@ -83,13 +125,10 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
}
private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException {
runner.run();
final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@ -101,20 +140,47 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(getSleepMillis(targetPrecision));
// Run again without introducing any new entries
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
}
/**
* <p>Ensures that newly created files should wait to confirm there is no more files created with the same timestamp:
* <li>If files have the latest modified timestamp at an iteration, then those should be postponed to be listed</li>
* <li>If those files still are the latest files at the next iteration, then those should be listed</li>
* </p>
*/
@Test
public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception {
testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
}
/**
* Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
* filesystem only provide timestamp precision in Seconds.
*/
@Test
public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception {
testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
}
/**
* Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
* filesystem only provide timestamp precision in Minutes.
* This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
*/
@Ignore
@Test
public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception {
testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
}
private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException {
final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@ -126,7 +192,7 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
// Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
Thread.sleep(getSleepMillis(targetPrecision));
// Running again, our two previously seen files are now cleared to be released
runner.run();
@ -139,18 +205,20 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
// An entry that is older than already processed entry should not be listed.
proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1));
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
// If an entry whose timestamp is the same with the last processed timestamp should not be listed.
proc.addEntity("name", "id2", initialTimestamp);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
runner.clearTransferState();
// Now a new file beyond the current time enters
proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1));
// It should show up
runner.run();
@ -159,19 +227,36 @@ public class TestAbstractListProcessor {
}
@Test
public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
}
@Test
public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
}
/**
* This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
*/
@Ignore
@Test
public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
}
@Test
public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
final long initialTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
// Emulate having state but not having had the processor run such as in a restart
final Map<String, String> preexistingState = new HashMap<>();
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
// run for the first time
@ -216,37 +301,35 @@ public class TestAbstractListProcessor {
@Test
public void testStateStoredInClusterStateManagement() throws Exception {
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
final long initialTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialTimestamp);
runner.run();
final Map<String, String> expectedState = new HashMap<>();
// Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
Thread.sleep(DEFAULT_SLEEP_MILLIS);
runner.run();
// Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testStateMigratedFromCacheService() throws InitializationException {
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
@ -261,15 +344,13 @@ public class TestAbstractListProcessor {
final MockStateManager stateManager = runner.getStateManager();
final Map<String, String> expectedState = new HashMap<>();
// Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testNoStateToMigrate() throws Exception {
runner.run();
@ -280,8 +361,6 @@ public class TestAbstractListProcessor {
@Test
public void testStateMigratedFromLocalFile() throws Exception {
// Create a file that we will populate with the desired state
File persistenceFile = testFolder.newFile(proc.persistenceFilename);
@ -305,20 +384,17 @@ public class TestAbstractListProcessor {
// Verify the state manager now maintains the associated state
final Map<String, String> expectedState = new HashMap<>();
// Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testResumeListingAfterClearingState() throws Exception {
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
final long initialEventTimestamp = System.currentTimeMillis();
proc.addEntity("name", "id", initialEventTimestamp);
proc.addEntity("name", "id2", initialEventTimestamp);
@ -350,8 +426,7 @@ public class TestAbstractListProcessor {
@Test
public void testFetchOnStart() throws InitializationException {
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
@ -364,11 +439,10 @@ public class TestAbstractListProcessor {
@Test
public void testOnlyNewStateStored() throws Exception {
runner.run();
final long initialTimestamp = System.currentTimeMillis();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
@ -390,8 +464,8 @@ public class TestAbstractListProcessor {
final Map<String, String> map = stateMap.toMap();
// Ensure only timestamp is migrated
assertEquals(2, map.size());
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
proc.addEntity("new name", "new id", initialTimestamp + 1);
runner.run();
@ -403,9 +477,9 @@ public class TestAbstractListProcessor {
assertEquals(3, updatedStateMap.getVersion());
assertEquals(2, updatedStateMap.toMap().size());
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
// Processed timestamp is now caught up
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
}
private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@ -502,7 +576,9 @@ public class TestAbstractListProcessor {
@Override
protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), entity.getIdentifier());
return attributes;
}
@Override


@ -84,6 +84,7 @@ public class ListFTP extends ListFileTransfer {
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}


@ -213,6 +213,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
properties.add(MIN_SIZE);
properties.add(MAX_SIZE);
properties.add(IGNORE_HIDDEN_FILES);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();


@ -82,6 +82,7 @@ public class ListSFTP extends ListFileTransfer {
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}


@ -30,54 +30,87 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
public class TestListFile {
private final String TESTDIR = "target/test/data/in";
private final File testDir = new File(TESTDIR);
private ListFile processor;
private TestRunner runner;
private ProcessContext context;
// Testing factors in milliseconds for file ages that are configured on each run by resetAges()
// age#millis are relative time references
// time#millis are absolute time references
// age#filter are filter label strings for the filter properties
private Long syncTime = getTestModifiedTime();
private Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
private Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
private String age0, age1, age2, age3, age4, age5;
@Rule
public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
() -> {
try {
return runner.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
throw new RuntimeException("Failed to retrieve state", e);
}
},
() -> listFiles(testDir).stream()
.map(f -> new FileInfo.Builder().filename(f.getName()).lastModifiedTime(f.lastModified()).build()).collect(Collectors.toList()),
() -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
) {
@Override
protected void finished(Description description) {
try {
// In order to refer files in testDir, we want to execute this rule before tearDown, because tearDown removes files.
// And @After is always executed before @Rule.
tearDown();
} catch (Exception e) {
throw new RuntimeException("Failed to tearDown.", e);
}
}
};
@Before
public void setUp() throws Exception {
processor = new ListFile();
runner = TestRunners.newTestRunner(processor);
runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
context = runner.getProcessContext();
deleteDirectory(testDir);
assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs());
resetAges();
}
public void tearDown() throws Exception {
deleteDirectory(testDir);
File tempFile = processor.getPersistenceFile();
@ -91,14 +124,38 @@ public class TestListFile {
}
}
private List<File> listFiles(final File file) {
if (file.isDirectory()) {
final List<File> result = new ArrayList<>();
Optional.ofNullable(file.listFiles()).ifPresent(files -> Arrays.stream(files).forEach(f -> result.addAll(listFiles(f))));
return result;
} else {
return Collections.singletonList(file);
}
}
/**
* This method ensures runner clears transfer state,
* and sleeps the current thread for specific period defined at {@link AbstractListProcessor#LISTING_LAG_MILLIS}
* based on local filesystem timestamp precision before executing runner.run().
*/
private void runNext() throws InterruptedException {
runner.clearTransferState();
final List<File> files = listFiles(testDir);
final boolean isMillisecondSupported = files.stream().anyMatch(file -> file.lastModified() % 1_000 > 0);
final Long lagMillis;
if (isMillisecondSupported) {
lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
} else {
// Filesystems such as Mac OS X HFS (Hierarchical File System) or EXT3 are known that only support timestamp in seconds precision.
lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS);
}
Thread.sleep(lagMillis * 2);
final long startedAtMillis = System.currentTimeMillis();
runner.run();
dumpState.dumpState(startedAtMillis);
}
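The precision check in runNext() above infers the local filesystem's granularity from files the test has already written. A standalone probe for what a filesystem actually preserves (HFS or ext3, mentioned in the comment above, keep only whole seconds) is sketched below; the class name and the sample timestamp are illustrative.

import java.io.File;
import java.io.IOException;

public class FsTimestampPrecisionProbe {
    public static void main(String[] args) throws IOException {
        final File probe = File.createTempFile("precision-probe", ".tmp");
        try {
            // Ask for a timestamp with a non-zero millisecond part and read back what survived.
            probe.setLastModified(1_497_600_123_456L);
            final long reported = probe.lastModified();
            final String precision = reported % 1_000 != 0 ? "milliseconds"
                    : reported % 60_000 != 0 ? "seconds" : "minutes";
            System.out.println("Filesystem keeps " + precision + " precision: " + reported);
        } finally {
            probe.delete();
        }
    }
}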
@Test
@ -305,7 +362,7 @@ public class TestListFile {
@Test
public void testFilterHidden() throws Exception {
final long now = getTestModifiedTime();
FileOutputStream fos;
@ -388,7 +445,7 @@ public class TestListFile {
@Test
public void testFilterPathPattern() throws Exception {
final long now = getTestModifiedTime();
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@ -595,20 +652,20 @@ public class TestListFile {
* Provides "now" minus 1 second in millis
*/
private static long getTestModifiedTime() {
final long nowMillis = System.currentTimeMillis();
// Subtract a second to avoid possible rounding issues
final long nowSeconds = TimeUnit.SECONDS.convert(nowMillis, TimeUnit.MILLISECONDS) - 1;
return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
}
private void resetAges() {
syncTime = getTestModifiedTime();
age0millis = 0L;
age1millis = 5000L;
age2millis = 10000L;
age3millis = 15000L;
age4millis = 20000L;
age5millis = 100000L;
time0millis = syncTime - age0millis;