mirror of https://github.com/apache/nifi.git
NIFI-8081 Added new Listing Strategy to ListFTP and ListSFTP: Time Window
NIFI-8081 Added new Listing Strategy to ListFTP and ListSFTP: Adjusted Time Window. User can specify the time zone or time difference (compared to where NiFi runs) of the system hosting the files and based on the calculates the current time there. Lists files modified before this adjusted current time (and after the last listing). NIFI-8081 'Time Adjustment' validated not to be set if listing strategy is not 'Adjusted Time Window'. Extracted validator to a separate class. Added more tests. Minor refactor. Typo fix. NIFI-8081 Improved validation. NIFI-8081 'Time Adjustment' is not necessary - in fact it can cause problems. SFTP (and usually FTP - which has a more general bug at the moment) returns a timestamp that doesn't really need adjustment. (SFTP in particular returns the an 'epoch' time.) Everything remains the same - the new listing strategy relies on a sliding time window, but without the unnecessary option to adjust for the modification time. NIFI-8081 Resolved conflicts after rebasing to main. NIFI-8081 Renamed 'AbstractListProcessor.listByAdjustedSlidingTimeWindow' to 'listByTimeWindow'. Post main rebase correction. NIFI-8081 Updated user doc for the BY_TIME_WINDOW strategy to warn user on it's reliance of accurate time. This closes #4721. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
c1f88ec740
commit
b55998afc1
|
@ -64,6 +64,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
@ -190,11 +191,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
" Since it only tracks few timestamps, it can manage listing state efficiently." +
|
||||
" However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +
|
||||
" For example, such situation can happen in a file system if a file with old timestamp" +
|
||||
" is copied or moved into the target directory without its last modified timestamp being updated.");
|
||||
" is copied or moved into the target directory without its last modified timestamp being updated." +
|
||||
" Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
|
||||
|
||||
public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
|
||||
"This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
|
||||
" This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." +
|
||||
" Works even when multiple subdirectories are being written at the same time while listing is running." +
|
||||
" However additional DistributedMapCache controller service is required and more JVM heap memory is used." +
|
||||
" See the description of 'Entity Tracking Time Window' property for further details on how it works.");
|
||||
|
||||
|
@ -203,7 +206,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
" on executing this processor. It is recommended to change the default run schedule value." +
|
||||
" Any property that related to the persisting state will be disregarded.");
|
||||
|
||||
public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
|
||||
public static final AllowableValue BY_TIME_WINDOW = new AllowableValue("time-window", "Time Window",
|
||||
"This strategy uses a sliding time window. The window starts where the previous window ended and ends with the 'current time'." +
|
||||
" One cycle will list files with modification time falling within the time window." +
|
||||
" Works even when multiple subdirectories are being written at the same time while listing is running." +
|
||||
" IMPORTANT: This strategy works properly only if the time on both the system hosting NiFi and the one hosting the files" +
|
||||
" are accurate.");
|
||||
|
||||
public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
|
||||
.name("listing-strategy")
|
||||
.displayName("Listing Strategy")
|
||||
.description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
|
||||
|
@ -449,6 +459,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
} else if (NO_TRACKING.equals(listingStrategy)) {
|
||||
listByNoTracking(context, session);
|
||||
|
||||
} else if (BY_TIME_WINDOW.equals(listingStrategy)) {
|
||||
listByTimeWindow(context, session);
|
||||
|
||||
} else {
|
||||
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
|
||||
}
|
||||
|
@ -502,6 +515,119 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
}
|
||||
}
|
||||
|
||||
public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
|
||||
try {
|
||||
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
|
||||
Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
|
||||
.map(Long::parseLong)
|
||||
.ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
|
||||
|
||||
justElectedPrimaryNode = false;
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
|
||||
long upperBoundExclusiveTimestamp;
|
||||
|
||||
long currentTime = getCurrentTime();
|
||||
|
||||
final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
|
||||
try {
|
||||
List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
|
||||
|
||||
boolean targetSystemHasMilliseconds = false;
|
||||
boolean targetSystemHasSeconds = false;
|
||||
for (final T entity : entityList) {
|
||||
final long entityTimestampMillis = entity.getTimestamp();
|
||||
if (!targetSystemHasMilliseconds) {
|
||||
targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
|
||||
}
|
||||
if (!targetSystemHasSeconds) {
|
||||
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Determine target system time precision.
|
||||
String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
|
||||
if (StringUtils.isBlank(specifiedPrecision)) {
|
||||
// If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
|
||||
specifiedPrecision = getDefaultTimePrecision();
|
||||
}
|
||||
final TimeUnit targetSystemTimePrecision
|
||||
= PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
|
||||
? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
|
||||
: PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
|
||||
: PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
|
||||
final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
|
||||
|
||||
upperBoundExclusiveTimestamp = currentTime - listingLagMillis;
|
||||
|
||||
if (getLogger().isTraceEnabled()) {
|
||||
getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp);
|
||||
getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
|
||||
}
|
||||
entityList
|
||||
.stream()
|
||||
.filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
|
||||
.filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
|
||||
.forEach(entity -> orderedEntries
|
||||
.computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
|
||||
.add(entity)
|
||||
);
|
||||
if (getLogger().isTraceEnabled()) {
|
||||
getLogger().trace("orderedEntries: " +
|
||||
orderedEntries.values().stream()
|
||||
.flatMap(List::stream)
|
||||
.map(entity -> entity.getName() + "_" + entity.getTimestamp())
|
||||
.collect(Collectors.joining(", "))
|
||||
);
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
if (orderedEntries.isEmpty()) {
|
||||
getLogger().debug("There is no data to list. Yielding.");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
|
||||
if (writerSet) {
|
||||
try {
|
||||
createRecordsForEntities(context, session, orderedEntries);
|
||||
} catch (final IOException | SchemaNotFoundException e) {
|
||||
getLogger().error("Failed to write listing to FlowFile", e);
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
createFlowFilesForEntities(context, session, orderedEntries);
|
||||
}
|
||||
|
||||
try {
|
||||
if (getLogger().isTraceEnabled()) {
|
||||
getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
|
||||
}
|
||||
this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
|
||||
persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context));
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
|
||||
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
protected long getCurrentTime() {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
|
||||
|
||||
|
|
|
@ -54,11 +54,11 @@ import java.io.File;
|
|||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
|
|
@ -72,7 +72,7 @@ public class ListFTP extends ListFileTransfer {
|
|||
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build();
|
||||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(LISTING_STRATEGY);
|
||||
properties.add(FILE_TRANSFER_LISTING_STRATEGY);
|
||||
properties.add(HOSTNAME);
|
||||
properties.add(port);
|
||||
properties.add(USERNAME);
|
||||
|
|
|
@ -69,7 +69,10 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.defaultValue(".")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor FILE_TRANSFER_LISTING_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(LISTING_STRATEGY)
|
||||
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING, BY_TIME_WINDOW)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
|
||||
|
|
|
@ -82,7 +82,7 @@ public class ListSFTP extends ListFileTransfer {
|
|||
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
|
||||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(LISTING_STRATEGY);
|
||||
properties.add(FILE_TRANSFER_LISTING_STRATEGY);
|
||||
properties.add(HOSTNAME);
|
||||
properties.add(port);
|
||||
properties.add(USERNAME);
|
||||
|
|
|
@ -202,7 +202,7 @@ public class SFTPTransfer implements FileTransfer {
|
|||
return listing;
|
||||
}
|
||||
|
||||
private void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing) throws IOException {
|
||||
protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing) throws IOException {
|
||||
if (maxResults < 1 || listing.size() >= maxResults) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -19,11 +19,21 @@ package org.apache.nifi.processors.standard;
|
|||
|
||||
import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processors.standard.util.FTPTransfer;
|
||||
import org.apache.nifi.processors.standard.util.FileInfo;
|
||||
import org.apache.nifi.processors.standard.util.FileTransfer;
|
||||
import org.apache.nifi.processors.standard.util.SFTPTransfer;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -33,6 +43,10 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.junit.Rule;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestListSFTP {
|
||||
@Rule
|
||||
|
@ -63,6 +77,106 @@ public class TestListSFTP {
|
|||
sftpServer.deleteAllFilesAndDirectories();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception {
|
||||
AtomicInteger fileCounter = new AtomicInteger(1);
|
||||
|
||||
List<String> createdFileNames = new ArrayList<>();
|
||||
|
||||
CountDownLatch finishScheduledRun = new CountDownLatch(1);
|
||||
CountDownLatch reachScanningSubDir = new CountDownLatch(1);
|
||||
CountDownLatch writeMoreFiles = new CountDownLatch(1);
|
||||
|
||||
String baseDir = "/base/";
|
||||
String subDir = "/base/subdir/";
|
||||
|
||||
TestRunner runner = TestRunners.newTestRunner(new ListSFTP() {
|
||||
@Override
|
||||
protected FileTransfer getFileTransfer(ProcessContext context) {
|
||||
return new SFTPTransfer(context, getLogger()){
|
||||
@Override
|
||||
protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing) throws IOException {
|
||||
if (path.contains("subdir")) {
|
||||
reachScanningSubDir.countDown();
|
||||
try {
|
||||
writeMoreFiles.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
super.getListing(path, depth, maxResults, listing);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
// This test fails with BY_TIMESTAMPS
|
||||
// runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue());
|
||||
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());
|
||||
runner.setProperty(ListSFTP.HOSTNAME, "localhost");
|
||||
runner.setProperty(ListSFTP.USERNAME, username);
|
||||
runner.setProperty(SFTPTransfer.PASSWORD, password);
|
||||
runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
|
||||
runner.setProperty(ListSFTP.REMOTE_PATH, baseDir);
|
||||
runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
ExecutorService executorService = null;
|
||||
try {
|
||||
executorService = Executors.newFixedThreadPool(1);
|
||||
sftpServer.createDirectory("/base");
|
||||
|
||||
uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
|
||||
uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
|
||||
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
runner.run(1, false);
|
||||
} finally {
|
||||
finishScheduledRun.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
reachScanningSubDir.await();
|
||||
|
||||
uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
|
||||
Thread.sleep(1100); // Make sure the next file has greater timestamp
|
||||
uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
|
||||
|
||||
writeMoreFiles.countDown();
|
||||
|
||||
Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision.
|
||||
finishScheduledRun.await();
|
||||
runner.run();
|
||||
|
||||
List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||
|
||||
List<String> successFileNames = successFiles.stream()
|
||||
.map(MockFlowFile::getAttributes)
|
||||
.map(attributes -> attributes.get("filename"))
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Collections.sort(createdFileNames);
|
||||
|
||||
assertEquals(createdFileNames, successFileNames);
|
||||
} finally {
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadFile(String baseDir, Object fileSuffix, List<String> createdFileNames) throws Exception {
|
||||
String fileName = "file." + fileSuffix;
|
||||
|
||||
sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8);
|
||||
|
||||
createdFileNames.add(fileName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void basicFileList() throws InterruptedException {
|
||||
TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
|
||||
|
|
Loading…
Reference in New Issue