From 8c488d7e8efcc2029079bb33aa3b859ba44b931f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 4 Mar 2016 16:06:42 -0500 Subject: [PATCH] NIFI-1587: Always poll state from State Manager when running ListHDFS instead of relying on local state over the cluster state Signed-off-by: joewitt --- .../nifi/processors/hadoop/ListHDFS.java | 44 +++++-------------- .../nifi/processors/hadoop/TestListHDFS.java | 4 -- 2 files changed, 10 insertions(+), 38 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index d624e6f591..b300b8858a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -42,8 +42,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; -import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; @@ -123,7 +121,6 @@ public class ListHDFS extends AbstractHadoopProcessor { private volatile Long lastListingTime = null; private volatile Set latestPathsListed = new HashSet<>(); - private volatile boolean electedPrimaryNode = false; @Override protected void init(final ProcessorInitializationContext context) { @@ -158,12 +155,6 @@ public class ListHDFS extends AbstractHadoopProcessor { return getIdentifier() + ".lastListingTime." + directory; } - @OnPrimaryNodeStateChange - public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { - electedPrimaryNode = true; - } - } @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { @@ -214,44 +205,27 @@ public class ListHDFS extends AbstractHadoopProcessor { } - private Long getMinTimestamp(final String directory, final HDFSListing remoteListing) throws IOException { - // No cluster-wide state has been recovered. Just use whatever values we already have. - if (remoteListing == null) { - return lastListingTime; - } - - // If our local timestamp is already later than the remote listing's timestamp, use our local info. - Long minTimestamp = lastListingTime; - if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) { - return minTimestamp; - } - - // Use the remote listing's information. - if (minTimestamp == null || electedPrimaryNode) { - this.latestPathsListed = remoteListing.toPaths(); - this.lastListingTime = remoteListing.getLatestTimestamp().getTime(); - } - - return minTimestamp; - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final String directory = context.getProperty(DIRECTORY).getValue(); - // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. - final Long minTimestamp; + Long minTimestamp = null; try { final HDFSListing stateListing; final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); if (stateMap.getVersion() == -1L) { stateListing = null; + latestPathsListed = new HashSet<>(); + lastListingTime = null; } else { final Map stateValues = stateMap.toMap(); stateListing = HDFSListing.fromMap(stateValues); + + if (stateListing != null) { + latestPathsListed = stateListing.toPaths(); + lastListingTime = minTimestamp = stateListing.getLatestTimestamp().getTime(); + } } - minTimestamp = getMinTimestamp(directory, stateListing); } catch (final IOException ioe) { getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); context.yield(); @@ -260,6 +234,7 @@ public class ListHDFS extends AbstractHadoopProcessor { // Pull in any file that is newer than the timestamp that we have. final FileSystem hdfs = getFileSystem(); + final String directory = context.getProperty(DIRECTORY).getValue(); final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); final Path rootPath = new Path(directory); @@ -339,6 +314,7 @@ public class ListHDFS extends AbstractHadoopProcessor { private Set getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { final Set statusSet = new HashSet<>(); + getLogger().debug("Fetching listing for {}", new Object[] {path}); final FileStatus[] statuses = hdfs.listStatus(path); for ( final FileStatus status : statuses ) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index add89e8d68..6c944c822b 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; -import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; @@ -148,9 +147,6 @@ public class TestListHDFS { // add new file to pull proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt"))); - // trigger primary node change - proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); - // cause calls to service to fail service.failOnCalls = true;