diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 9888d163e8..9a0d02c540 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.TreeMap; @@ -190,11 +191,13 @@ public abstract class AbstractListProcessor extends Ab " Since it only tracks few timestamps, it can manage listing state efficiently." + " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." + " For example, such situation can happen in a file system if a file with old timestamp" + - " is copied or moved into the target directory without its last modified timestamp being updated."); + " is copied or moved into the target directory without its last modified timestamp being updated." + + " Also may miss files when multiple subdirectories are being written at the same time while listing is running."); public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." + " This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." + + " Works even when multiple subdirectories are being written at the same time while listing is running." + " However additional DistributedMapCache controller service is required and more JVM heap memory is used." + " See the description of 'Entity Tracking Time Window' property for further details on how it works."); @@ -203,7 +206,14 @@ public abstract class AbstractListProcessor extends Ab " on executing this processor. It is recommended to change the default run schedule value." + " Any property that related to the persisting state will be disregarded."); - public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder() + public static final AllowableValue BY_TIME_WINDOW = new AllowableValue("time-window", "Time Window", + "This strategy uses a sliding time window. The window starts where the previous window ended and ends with the 'current time'." + + " One cycle will list files with modification time falling within the time window." + + " Works even when multiple subdirectories are being written at the same time while listing is running." + + " IMPORTANT: This strategy works properly only if the time on both the system hosting NiFi and the one hosting the files" + + " are accurate."); + + public static final PropertyDescriptor LISTING_STRATEGY = new Builder() .name("listing-strategy") .displayName("Listing Strategy") .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.") @@ -449,6 +459,9 @@ public abstract class AbstractListProcessor extends Ab } else if (NO_TRACKING.equals(listingStrategy)) { listByNoTracking(context, session); + } else if (BY_TIME_WINDOW.equals(listingStrategy)) { + listByTimeWindow(context, session); + } else { throw new ProcessException("Unknown listing strategy: " + listingStrategy); } @@ -502,6 +515,119 @@ public abstract class AbstractListProcessor extends Ab } } + public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { + try { + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); + Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)) + .map(Long::parseLong) + .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp); + + justElectedPrimaryNode = false; + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished."); + context.yield(); + return; + } + } + + long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L); + long upperBoundExclusiveTimestamp; + + long currentTime = getCurrentTime(); + + final TreeMap> orderedEntries = new TreeMap<>(); + try { + List entityList = performListing(context, lowerBoundInclusiveTimestamp); + + boolean targetSystemHasMilliseconds = false; + boolean targetSystemHasSeconds = false; + for (final T entity : entityList) { + final long entityTimestampMillis = entity.getTimestamp(); + if (!targetSystemHasMilliseconds) { + targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0; + } + if (!targetSystemHasSeconds) { + targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0; + } + } + + // Determine target system time precision. + String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue(); + if (StringUtils.isBlank(specifiedPrecision)) { + // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value. + specifiedPrecision = getDefaultTimePrecision(); + } + final TimeUnit targetSystemTimePrecision + = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) + ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES + : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS + : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES; + final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision); + + upperBoundExclusiveTimestamp = currentTime - listingLagMillis; + + if (getLogger().isTraceEnabled()) { + getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp); + getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", "))); + } + entityList + .stream() + .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp) + .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp) + .forEach(entity -> orderedEntries + .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>()) + .add(entity) + ); + if (getLogger().isTraceEnabled()) { + getLogger().trace("orderedEntries: " + + orderedEntries.values().stream() + .flatMap(List::stream) + .map(entity -> entity.getName() + "_" + entity.getTimestamp()) + .collect(Collectors.joining(", ")) + ); + } + } catch (final IOException e) { + getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e); + context.yield(); + return; + } + + if (orderedEntries.isEmpty()) { + getLogger().debug("There is no data to list. Yielding."); + context.yield(); + return; + } + + final boolean writerSet = context.getProperty(RECORD_WRITER).isSet(); + if (writerSet) { + try { + createRecordsForEntities(context, session, orderedEntries); + } catch (final IOException | SchemaNotFoundException e) { + getLogger().error("Failed to write listing to FlowFile", e); + context.yield(); + return; + } + } else { + createFlowFilesForEntities(context, session, orderedEntries); + } + + try { + if (getLogger().isTraceEnabled()) { + getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp); + } + this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp; + persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context)); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " + + "if another node begins executing this Processor, data duplication may occur.", ioe); + } + } + + protected long getCurrentTime() { + return System.currentTimeMillis(); + } + public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException { Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index 65afde4470..440a08d968 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -54,11 +54,11 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.List; import java.util.ArrayList; -import java.util.HashMap; import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java index f8282d4af4..e2e7b1ebed 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java @@ -72,7 +72,7 @@ public class ListFTP extends ListFileTransfer { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build(); final List properties = new ArrayList<>(); - properties.add(LISTING_STRATEGY); + properties.add(FILE_TRANSFER_LISTING_STRATEGY); properties.add(HOSTNAME); properties.add(port); properties.add(USERNAME); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index 3ee554ceb7..95d475c088 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -69,7 +69,10 @@ public abstract class ListFileTransfer extends AbstractListProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue(".") .build(); - + public static final PropertyDescriptor FILE_TRANSFER_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING, BY_TIME_WINDOW) + .build(); @Override protected Map createAttributes(final FileInfo fileInfo, final ProcessContext context) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 67c0d05521..c345b3b69a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -82,7 +82,7 @@ public class ListSFTP extends ListFileTransfer { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); final List properties = new ArrayList<>(); - properties.add(LISTING_STRATEGY); + properties.add(FILE_TRANSFER_LISTING_STRATEGY); properties.add(HOSTNAME); properties.add(port); properties.add(USERNAME); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index 0b832a3db5..83827011fc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -202,7 +202,7 @@ public class SFTPTransfer implements FileTransfer { return listing; } - private void getListing(final String path, final int depth, final int maxResults, final List listing) throws IOException { + protected void getListing(final String path, final int depth, final int maxResults, final List listing) throws IOException { if (maxResults < 1 || listing.size() >= maxResults) { return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java index 68a217d08e..f864e12db1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -19,11 +19,21 @@ package org.apache.nifi.processors.standard; import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processors.standard.util.FTPTransfer; +import org.apache.nifi.processors.standard.util.FileInfo; +import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -33,6 +43,10 @@ import org.junit.Before; import org.junit.Test; import org.junit.Rule; import java.security.SecureRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; public class TestListSFTP { @Rule @@ -63,6 +77,106 @@ public class TestListSFTP { sftpServer.deleteAllFilesAndDirectories(); } + @Test + public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception { + AtomicInteger fileCounter = new AtomicInteger(1); + + List createdFileNames = new ArrayList<>(); + + CountDownLatch finishScheduledRun = new CountDownLatch(1); + CountDownLatch reachScanningSubDir = new CountDownLatch(1); + CountDownLatch writeMoreFiles = new CountDownLatch(1); + + String baseDir = "/base/"; + String subDir = "/base/subdir/"; + + TestRunner runner = TestRunners.newTestRunner(new ListSFTP() { + @Override + protected FileTransfer getFileTransfer(ProcessContext context) { + return new SFTPTransfer(context, getLogger()){ + @Override + protected void getListing(String path, int depth, int maxResults, List listing) throws IOException { + if (path.contains("subdir")) { + reachScanningSubDir.countDown(); + try { + writeMoreFiles.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + super.getListing(path, depth, maxResults, listing); + } + }; + } + }); + + // This test fails with BY_TIMESTAMPS +// runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue()); + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue()); + runner.setProperty(ListSFTP.HOSTNAME, "localhost"); + runner.setProperty(ListSFTP.USERNAME, username); + runner.setProperty(SFTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); + runner.setProperty(ListSFTP.REMOTE_PATH, baseDir); + runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true"); + + runner.assertValid(); + + ExecutorService executorService = null; + try { + executorService = Executors.newFixedThreadPool(1); + sftpServer.createDirectory("/base"); + + uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames); + uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames); + + executorService.submit(() -> { + try { + runner.run(1, false); + } finally { + finishScheduledRun.countDown(); + } + }); + + reachScanningSubDir.await(); + + uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames); + Thread.sleep(1100); // Make sure the next file has greater timestamp + uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames); + + writeMoreFiles.countDown(); + + Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision. + finishScheduledRun.await(); + runner.run(); + + List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + + List successFileNames = successFiles.stream() + .map(MockFlowFile::getAttributes) + .map(attributes -> attributes.get("filename")) + .sorted() + .collect(Collectors.toList()); + + Collections.sort(createdFileNames); + + assertEquals(createdFileNames, successFileNames); + } finally { + if (executorService != null) { + executorService.shutdown(); + } + } + } + + private void uploadFile(String baseDir, Object fileSuffix, List createdFileNames) throws Exception { + String fileName = "file." + fileSuffix; + + sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8); + + createdFileNames.add(fileName); + } + @Test public void basicFileList() throws InterruptedException { TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);