diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index b300b8858a..2100f488d4 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -19,12 +19,14 @@ package org.apache.nifi.processors.hadoop;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Date;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +59,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.HDFSListing;
 import org.apache.nifi.processors.hadoop.util.StringSerDe;
+import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -119,8 +122,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .description("All FlowFiles are transferred to this relationship")
         .build();

-    private volatile Long lastListingTime = null;
-    private volatile Set<Path> latestPathsListed = new HashSet<>();
+    private volatile long latestTimestampListed = -1L;
+    private volatile long latestTimestampEmitted = -1L;
+    private volatile boolean electedPrimaryNodeSinceLastIteration = false;
+    private volatile long lastRunTimestamp = -1L;
+
+    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
+    static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
+
+    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);

     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -155,12 +165,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return getIdentifier() + ".lastListingTime." + directory;
     }

-    @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
-            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
-            latestPathsListed = new HashSet<>();
+            latestTimestampEmitted = -1L;
+            latestTimestampListed = -1L;
         }
     }

@@ -170,6 +179,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return mapper.readValue(jsonNode, HDFSListing.class);
     }

+    /**
+     * Transitions state from the distributed map cache service to the state manager. This will be
+     * removed in NiFi 1.x.
+     *
+     * @param context the ProcessContext
+     * @throws IOException if unable to communicate with the state manager or controller service
+     */
+    @Deprecated
     @OnScheduled
     public void moveStateToStateManager(final ProcessContext context) throws IOException {
         final StateManager stateManager = context.getStateManager();
@@ -184,6 +201,7 @@
     }

+    @Deprecated
     private HDFSListing getListingFromService(final ProcessContext context) throws IOException {
         final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
         if (client == null) {
@@ -204,28 +222,149 @@
         }
     }

+    /**
+     * Restores state information from the 'old' style of storing state. This is deprecated and will no longer be supported
+     * in the 1.x NiFi baseline.
+     *
+     * @param directory the directory that the listing was performed against
+     * @param remoteListing the remote listing
+     * @return the minimum timestamp that should be used for new entries
+     */
+    @Deprecated
+    private Long restoreTimestampFromOldStateFormat(final String directory, final HDFSListing remoteListing) {
+        // No cluster-wide state has been recovered. Just use whatever values we already have.
+        if (remoteListing == null) {
+            return latestTimestampListed;
+        }
+
+        // If our local timestamp is already later than the remote listing's timestamp, use our local info.
+        // Note: latestTimestampListed is a primitive long, so we map its "unset" sentinel of -1 to null here;
+        // otherwise the null checks below could never match.
+        Long minTimestamp = latestTimestampListed == -1L ? null : latestTimestampListed;
+        if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) {
+            return minTimestamp;
+        }
+
+        // Use the remote listing's information.
+        if (minTimestamp == null || electedPrimaryNodeSinceLastIteration) {
+            this.latestTimestampListed = remoteListing.getLatestTimestamp().getTime();
+            this.latestTimestampEmitted = this.latestTimestampListed;
+        }
+
+        return minTimestamp;
+    }
+
+
+    /**
+     * Determines which of the given FileStatus objects describe files that should be listed.
+     *
+     * @param statuses the eligible FileStatus objects that we could potentially list
+     * @return a Set containing only those FileStatus objects that we want to list
+     */
+    Set<FileStatus> determineListable(final Set<FileStatus> statuses) {
+        final long minTimestamp = this.latestTimestampListed;
+        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
+
+        // Build a map of entries, sorted by timestamp, so that the newest entries can be identified and held back
+        for (final FileStatus status : statuses) {
+            if (status.getPath().getName().endsWith("_COPYING_")) {
+                continue;
+            }
+
+            final long entityTimestamp = status.getModificationTime();
+
+            if (entityTimestamp > latestTimestampListed) {
+                latestTimestampListed = entityTimestamp;
+            }
+
+            // New entries are those that occur at or after the last listed timestamp and have not yet been emitted
+            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
+
+            if (newEntry) {
+                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
+                if (entitiesForTimestamp == null) {
+                    entitiesForTimestamp = new ArrayList<>();
+                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
+                }
+                entitiesForTimestamp.add(status);
+            }
+        }
+
+        final Set<FileStatus> toList = new HashSet<>();
+
+        if (orderedEntries.size() > 0) {
+            long latestListingTimestamp = orderedEntries.lastKey();
+
+            // If the latest listing timestamp is equal to the newest timestamp previously seen,
+            // another iteration has occurred without new files, and special handling is needed to avoid starvation
+            if (latestListingTimestamp == minTimestamp) {
+                // We are done if the latest listing timestamp is equal to the last emitted timestamp,
+                // meaning we have already handled the items that were originally passed over
+                if (latestListingTimestamp == latestTimestampEmitted) {
+                    return Collections.emptySet();
+                }
+            } else {
+                // Otherwise, hold back the newest entries for one cycle, so that writes occurring while
+                // the listing is being performed cannot cause data to be missed. (A standalone sketch of
+                // this hold-back strategy follows the diff.)
+                orderedEntries.remove(latestListingTimestamp);
+            }
+
+            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
+                for (FileStatus status : timestampEntities) {
+                    toList.add(status);
+                }
+            }
+        }
+
+        return toList;
+    }
+
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        // We have to ensure that we don't continually perform listings, because if we perform two listings within
+        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
+        // not let that happen. (A standalone sketch of this guard also follows the diff.)
+        final long now = System.nanoTime();
+        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
+            lastRunTimestamp = now;
+            context.yield();
+            return;
+        }
+        lastRunTimestamp = now;
+
+        final String directory = context.getProperty(DIRECTORY).getValue();
+        // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
-        Long minTimestamp = null;
         try {
-            final HDFSListing stateListing;
             final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
-                stateListing = null;
-                latestPathsListed = new HashSet<>();
-                lastListingTime = null;
+                latestTimestampEmitted = -1L;
+                latestTimestampListed = -1L;
+                getLogger().debug("Found no state stored");
             } else {
-                final Map<String, String> stateValues = stateMap.toMap();
-                stateListing = HDFSListing.fromMap(stateValues);
+                // Determine if state is stored in the 'new' format or the 'old' format
+                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
+                if (emittedString == null && stateMap.get(StateKeys.TIMESTAMP) != null) {
+                    // state is stored in the old format with XML
+                    final Map<String, String> stateValues = stateMap.toMap();
+                    final HDFSListing stateListing = HDFSListing.fromMap(stateValues);
+                    getLogger().debug("Found old-style state stored");
+                    restoreTimestampFromOldStateFormat(directory, stateListing);
+                } else if (emittedString == null) {
+                    latestTimestampEmitted = -1L;
+                    latestTimestampListed = -1L;
+                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
+                } else {
+                    // state is stored in the new format, using just two timestamps
+                    latestTimestampEmitted = Long.parseLong(emittedString);
+                    final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
+                    if (listingTimestampString != null) {
+                        latestTimestampListed = Long.parseLong(listingTimestampString);
+                    }

-                if (stateListing != null) {
-                    latestPathsListed = stateListing.toPaths();
-                    lastListingTime = minTimestamp = stateListing.getLatestTimestamp().getTime();
+                    getLogger().debug("Found new-style state stored, latest timestamp emitted = {}, latest listed = {}",
+                        new Object[] {latestTimestampEmitted, latestTimestampListed});
                 }
             }
-
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
             context.yield();
@@ -234,80 +373,51 @@

         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
-        final String directory = context.getProperty(DIRECTORY).getValue();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);

-        int listCount = 0;
-        Long latestListingModTime = null;
         final Set<FileStatus> statuses;
         try {
             statuses = getStatuses(rootPath, recursive, hdfs);
-            for ( final FileStatus status : statuses ) {
-                // don't get anything where the last modified timestamp is equal to our current timestamp.
-                // if we do, then we run the risk of multiple files having the same last mod date but us only
-                // seeing a portion of them.
-                // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
-                // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
-                // modified date not before the current time.
-                final long fileModTime = status.getModificationTime();
-
-                // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
-                // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
-                boolean fetch = !status.getPath().getName().endsWith("_COPYING_")
-                        && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
-
-                // Create the FlowFile for this path.
-                if ( fetch ) {
-                    final Map<String, String> attributes = createAttributes(status);
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    session.transfer(flowFile, REL_SUCCESS);
-                    listCount++;
-
-                    if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
-                        latestListingModTime = fileModTime;
-                    }
-                }
-            }
+            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException ioe) {
             getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
             return;
         }

+        final Set<FileStatus> listable = determineListable(statuses);
+        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
+
+        for (final FileStatus status : listable) {
+            final Map<String, String> attributes = createAttributes(status);
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_SUCCESS);
+
+            final long fileModTime = status.getModificationTime();
+            if (fileModTime > latestTimestampEmitted) {
+                latestTimestampEmitted = fileModTime;
+            }
+        }
+
+        final int listCount = listable.size();
         if ( listCount > 0 ) {
             getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
             session.commit();
-
-            // We have performed a listing and pushed the FlowFiles out.
-            // Now, we need to persist state about the Last Modified timestamp of the newest file
-            // that we pulled in. We do this in order to avoid pulling in the same file twice.
-            // However, we want to save the state both locally and remotely.
-            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
-            // previously Primary Node left off.
-            final HDFSListing latestListing = createListing(latestListingModTime, statuses);
-
-            try {
-                context.getStateManager().setState(latestListing.toMap(), Scope.CLUSTER);
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
-            }
-
-            lastListingTime = latestListingModTime;
-            latestPathsListed.clear();
-            for (final FileStatus status : statuses) {
-                latestPathsListed.add(status.getPath());
-            }
         } else {
             getLogger().debug("There is no data to list. Yielding.");
             context.yield();
+        }

-            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
-            if ( lastListingTime == null ) {
-                lastListingTime = 0L;
-            }
+        final Map<String, String> updatedState = new HashMap<>(1);
+        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
+        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
+        getLogger().debug("New state map: {}", new Object[] {updatedState});

-            return;
+        try {
+            context.getStateManager().setState(updatedState, Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
+        }
     }

@@ -334,20 +444,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return statusSet;
     }

-    private HDFSListing createListing(final long latestListingModTime, final Set<FileStatus> statuses) {
-        final Set<String> paths = new HashSet<>();
-        for (final FileStatus status : statuses) {
-            final String path = status.getPath().toUri().toString();
-            paths.add(path);
-        }
-
-        final HDFSListing listing = new HDFSListing();
-        listing.setLatestTimestamp(new Date(latestListingModTime));
-        listing.setMatchingPaths(paths);
-
-        return listing;
-    }
-
     private String getAbsolutePath(final Path path) {
         final Path parent = path.getParent();
         final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
@@ -373,19 +469,19 @@

     private String getPerms(final FsAction action) {
         final StringBuilder sb = new StringBuilder();
-        if ( action.implies(FsAction.READ) ) {
+        if (action.implies(FsAction.READ)) {
             sb.append("r");
         } else {
             sb.append("-");
         }

-        if ( action.implies(FsAction.WRITE) ) {
+        if (action.implies(FsAction.WRITE)) {
             sb.append("w");
         } else {
             sb.append("-");
         }

-        if ( action.implies(FsAction.EXECUTE) ) {
+        if (action.implies(FsAction.EXECUTE)) {
             sb.append("x");
         } else {
             sb.append("-");
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 6c944c822b..7a77f068aa 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -17,8 +17,6 @@ package org.apache.nifi.processors.hadoop;

 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;

 import java.io.File;
 import java.io.FileNotFoundException;
@@ -31,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -74,9 +73,14 @@ public class TestListHDFS {
     }

     @Test
-    public void testListingHasCorrectAttributes() {
+    public void testListingHasCorrectAttributes() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

+        // The first iteration will not pick up any files, because the newest entries are held back
+        // for one listing cycle. We must then wait longer than the listing lag before running the
+        // Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
         runner.run();

         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -87,12 +91,17 @@ public class TestListHDFS {
     }

     @Test
-    public void testRecursive() {
+    public void testRecursive() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
         proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));

+        // The first iteration will not pick up any files, because the newest entries are held back
+        // for one listing cycle. We must then wait longer than the listing lag before running the
+        // Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
         runner.run();

         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -113,13 +122,18 @@ public class TestListHDFS {
     }

     @Test
-    public void testNotRecursive() {
+    public void testNotRecursive() throws InterruptedException {
         runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
         proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));

+        // The first iteration will not pick up any files, because the newest entries are held back
+        // for one listing cycle. We must then wait longer than the listing lag before running the
+        // Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
         runner.run();

         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -131,9 +145,14 @@ public class TestListHDFS {
     }

     @Test
-    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException {
+    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException, InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

+        // The first iteration will not pick up any files, because the newest entries are held back
+        // for one listing cycle. We must then wait longer than the listing lag before running the
+        // Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
         runner.run();

         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -145,7 +164,7 @@ public class TestListHDFS {
         runner.clearTransferState();

         // add new file to pull
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));

         // cause calls to service to fail
         service.failOnCalls = true;
@@ -172,23 +191,55 @@ public class TestListHDFS {
         service.failOnCalls = false;
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);

+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
         runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+        Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
+        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
+        assertEquals("1999", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
+        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
+        assertEquals("2000", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
+
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+    }

-        // ensure state saved
-        runner.getStateManager().assertStateSet(Scope.CLUSTER);
-        final Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
-        assertEquals(3, newState.size());
+    @Test
+    public void testOnlyNewestEntriesHeldBack() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));

-        final String path0 = newState.get("path.0");
-        final String path1 = newState.get("path.1");
-        assertTrue(path0.equals("/test/testFile.txt") || path0.equals("/test/testFile2.txt"));
-        assertTrue(path1.equals("/test/testFile.txt") || path1.equals("/test/testFile2.txt"));
-        assertNotSame(path0, path1);
+        // this is a directory, so it won't be counted toward the entries
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/2.txt")));

-        final Long timestamp = Long.parseLong(newState.get("timestamp"));
-        assertEquals(1999L, timestamp.longValue());
+        // The first iteration should pick up the 2 files with the smaller timestamps; the newest
+        // entries are held back for one cycle.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        // The next iteration should pick up the other 2 files, since nothing newer was added.
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
+
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 110L, 0L, create777(), "owner", "group", new Path("/test/testDir/3.txt")));
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        // The newly added file has the newest timestamp, so it is held back for one cycle.
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
     }

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index a85ff2ad71..68ed22fb91 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -53,6 +53,30 @@ public class TestAbstractListProcessor {
     @Rule
     public final TemporaryFolder testFolder = new TemporaryFolder();

+    @Test
+    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+
+        // On the first run, the entries listed above are skipped to avoid write-synchronization issues
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // Ensure that we cover the listing lag, so that the processor does not simply yield because
+        // it was scheduled to run again immediately
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Run again without introducing any new entries
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+    }
+
     @Test
     public void testOnlyNewEntriesEmitted() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
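
The following is a minimal, self-contained sketch of the hold-back strategy that the new determineListable() implements. All names here are hypothetical and the code is illustrative only, not NiFi code. Because modification times have millisecond resolution, more files can still be written "within" the newest timestamp after a listing is taken; entries carrying that newest timestamp are therefore withheld for one cycle, and the minTimestamp/latestEmitted comparisons ensure the withheld bucket is emitted exactly once on a later cycle rather than being starved or duplicated.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.TreeMap;

    // Hypothetical stand-in for FileStatus: just a name and a modification timestamp.
    record Entry(String name, long timestamp) {}

    class HoldBackListing {
        private long latestListed = -1L;   // newest timestamp ever seen in a listing
        private long latestEmitted = -1L;  // newest timestamp ever emitted downstream

        List<Entry> determineListable(final List<Entry> entries) {
            final long minTimestamp = latestListed;
            final TreeMap<Long, List<Entry>> ordered = new TreeMap<>();
            for (final Entry e : entries) {
                latestListed = Math.max(latestListed, e.timestamp());
                // Candidates: at or after the last listed timestamp, and not yet emitted.
                if (e.timestamp() >= minTimestamp && e.timestamp() > latestEmitted) {
                    ordered.computeIfAbsent(e.timestamp(), t -> new ArrayList<>()).add(e);
                }
            }
            if (ordered.isEmpty()) {
                return Collections.emptyList();
            }
            final long newest = ordered.lastKey();
            if (newest == minTimestamp) {
                // No newer files since the previous cycle: emit the previously withheld
                // bucket, unless it has already been emitted (prevents double emission).
                if (newest == latestEmitted) {
                    return Collections.emptyList();
                }
            } else {
                ordered.remove(newest); // withhold the newest bucket for one cycle
            }
            final List<Entry> result = new ArrayList<>();
            ordered.values().forEach(result::addAll);
            result.forEach(e -> latestEmitted = Math.max(latestEmitted, e.timestamp()));
            return result;
        }

        public static void main(String[] args) {
            final HoldBackListing listing = new HoldBackListing();
            final List<Entry> files = List.of(new Entry("a", 0), new Entry("b", 8));
            System.out.println(listing.determineListable(files)); // [a]  (b is withheld)
            System.out.println(listing.determineListable(files)); // [b]  (nothing newer arrived)
            System.out.println(listing.determineListable(files)); // []   (b already emitted)
        }
    }

This mirrors the shape of testOnlyNewestEntriesHeldBack above: the first cycle emits everything except the newest bucket, the next cycle emits the withheld bucket, and a further cycle with no new files emits nothing.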
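A similarly hedged sketch of the run-lag guard at the top of the new onTrigger() (again with hypothetical names): since the comparison logic depends on millisecond-resolution timestamps, two listings performed within the same millisecond could split a timestamp bucket across iterations, so consecutive runs are forced to be at least LISTING_LAG_NANOS apart. This is also why the tests sleep for twice the lag between runs.

    import java.util.concurrent.TimeUnit;

    class RunLagGuard {
        static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
        private long lastRunTimestamp = -1L;

        /** Returns true only when at least LISTING_LAG_NANOS have elapsed since the last attempt. */
        boolean tryRun() {
            final long now = System.nanoTime();
            if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
                lastRunTimestamp = now;
                return false; // the caller should yield, as ListHDFS.onTrigger() does
            }
            lastRunTimestamp = now;
            return true;
        }
    }

Note that, like the patch, the guard advances lastRunTimestamp even when it rejects a run, so rapid repeated scheduling keeps pushing the window forward rather than accumulating permission.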