From e559c68766b3e7377f783f76ea1a980ecd18f35b Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 12 Jan 2016 10:21:47 -0500 Subject: [PATCH] NIFI-259: Began updating processors to use new state management --- .../apache/nifi/state/MockStateManager.java | 87 +++++- .../local/WriteAheadLocalStateProvider.java | 34 ++- .../nifi/processors/hadoop/ListHDFS.java | 240 ++++++----------- .../processors/hadoop/util/HDFSListing.java | 45 ++++ .../nifi/processors/hadoop/TestListHDFS.java | 62 +++-- .../standard/AbstractListProcessor.java | 7 + .../nifi/processors/standard/ListFile.java | 24 ++ .../nifi/processors/standard/ListSFTP.java | 8 + .../nifi/processors/standard/TailFile.java | 251 ++++++++++++------ .../standard/TestAbstractListProcessor.java | 5 + 10 files changed, 491 insertions(+), 272 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java index c014ed069c..cf9d50ccd8 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java +++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java @@ -17,6 +17,7 @@ package org.apache.nifi.state; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -33,8 +34,28 @@ public class MockStateManager implements StateManager { private StateMap localStateMap = new MockStateMap(null, -1L); private StateMap clusterStateMap = new MockStateMap(null, -1L); + private volatile boolean failToGetLocalState = false; + private volatile boolean failToSetLocalState = false; + private volatile boolean failToGetClusterState = false; + private volatile boolean failToSetClusterState = false; + + private void verifyCanSet(final Scope scope) throws IOException { + final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState : failToSetClusterState; + if (failToSet) { + throw new IOException("Unit Test configured to throw IOException if " + scope + " State is set"); + } + } + + private void verifyCanGet(final Scope scope) throws IOException { + final boolean failToGet = (scope == Scope.LOCAL) ? 
failToGetLocalState : failToGetClusterState; + if (failToGet) { + throw new IOException("Unit Test configured to throw IOException if " + scope + " State is retrieved"); + } + } + @Override - public synchronized void setState(final Map state, final Scope scope) { + public synchronized void setState(final Map state, final Scope scope) throws IOException { + verifyCanSet(scope); final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet()); if (scope == Scope.CLUSTER) { @@ -45,7 +66,12 @@ public class MockStateManager implements StateManager { } @Override - public synchronized StateMap getState(final Scope scope) { + public synchronized StateMap getState(final Scope scope) throws IOException { + verifyCanGet(scope); + return retrieveState(scope); + } + + private synchronized StateMap retrieveState(final Scope scope) { if (scope == Scope.CLUSTER) { return clusterStateMap; } else { @@ -54,9 +80,10 @@ public class MockStateManager implements StateManager { } @Override - public synchronized boolean replace(final StateMap oldValue, final Map newValue, final Scope scope) { + public synchronized boolean replace(final StateMap oldValue, final Map newValue, final Scope scope) throws IOException { if (scope == Scope.CLUSTER) { if (oldValue == clusterStateMap) { + verifyCanSet(scope); clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); return true; } @@ -64,6 +91,7 @@ public class MockStateManager implements StateManager { return false; } else { if (oldValue == localStateMap) { + verifyCanSet(scope); localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); return true; } @@ -73,13 +101,19 @@ public class MockStateManager implements StateManager { } @Override - public synchronized void clear(final Scope scope) { + public synchronized void clear(final Scope scope) throws IOException { setState(Collections. emptyMap(), scope); } private String getValue(final String key, final Scope scope) { - final StateMap stateMap = getState(scope); + final StateMap stateMap; + if (scope == Scope.CLUSTER) { + stateMap = clusterStateMap; + } else { + stateMap = localStateMap; + } + return stateMap.get(key); } @@ -104,7 +138,7 @@ public class MockStateManager implements StateManager { * @param scope the scope to compare the stateValues against */ public void assertStateEquals(final Map stateValues, final Scope scope) { - final StateMap stateMap = getState(scope); + final StateMap stateMap = retrieveState(scope); Assert.assertEquals(stateValues, stateMap.toMap()); } @@ -115,7 +149,7 @@ public class MockStateManager implements StateManager { * @param scope the scope to compare the stateValues against */ public void assertStateNotEquals(final Map stateValues, final Scope scope) { - final StateMap stateMap = getState(scope); + final StateMap stateMap = retrieveState(scope); Assert.assertNotSame(stateValues, stateMap.toMap()); } @@ -157,16 +191,49 @@ public class MockStateManager implements StateManager { */ public void assertStateSet(final Scope scope) { final StateMap stateMap = (scope == Scope.CLUSTER) ? 
clusterStateMap : localStateMap; - Assert.assertEquals("Expected state to be set for Scope " + scope + ", but it was not set", -1L, stateMap.getVersion()); + Assert.assertNotSame("Expected state to be set for Scope " + scope + ", but it was not set", -1L, stateMap.getVersion()); } /** * Ensures that the state was not set for the given scope - * + * * @param scope the scope */ public void assertStateNotSet(final Scope scope) { final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap; - Assert.assertNotSame("Expected state not to be set for Scope " + scope + ", but it was set", -1L, stateMap.getVersion()); + Assert.assertEquals("Expected state not to be set for Scope " + scope + ", but it was set", -1L, stateMap.getVersion()); + } + + /** + * Specifies whether or not the State Manager should throw an IOException when state is set for the given scope. + * Note that calls to {@link #replace(StateMap, Map, Scope)} will fail only if the state would be set (i.e., if + * we call replace and the StateMap does not match the old value, it will not fail). + * + * Also note that if setting state is set to fail, clearing will also fail, as clearing is thought of as setting the + * state to empty + * + * @param scope the scope that should (or should not) fail + * @param fail whether or not setting state should fail + */ + public void setFailOnStateSet(final Scope scope, final boolean fail) { + if (scope == Scope.LOCAL) { + failToSetLocalState = fail; + } else { + failToSetClusterState = fail; + } + } + + /** + * Specifies whether or not the State Manager should throw an IOException when state is retrieved for the given scope. + * + * @param scope the scope that should (or should not) fail + * @param fail whether or not retrieving state should fail + */ + public void setFailOnStateGet(final Scope scope, final boolean fail) { + if (scope == Scope.LOCAL) { + failToGetLocalState = fail; + } else { + failToGetClusterState = fail; + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java index c23e517e72..ba8e4d9da9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java @@ -27,6 +27,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -50,11 +53,9 @@ import org.wali.WriteAheadRepository; public class WriteAheadLocalStateProvider extends AbstractStateProvider { private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class); - // TODO: CREATE BACKGROUND THREAD OR USE EXECUTOR (in StateProviderInitializationContext?) to schedule checkpointing. 
- private static final long CHECKPOINT_NANOS = TimeUnit.MINUTES.toNanos(2); - private final StateMapSerDe serde; private final ConcurrentMap componentProviders = new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory()); static final PropertyDescriptor PATH = new PropertyDescriptor.Builder() .name("Directory") @@ -115,6 +116,8 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { // expensive than just keeping track of a local 'long' variable. Since we won't actually increment this at any point until this after // the init() method completes, this is okay to do. versionGenerator.set(maxRecordVersion); + + executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, TimeUnit.MINUTES); } @Override @@ -221,4 +224,29 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { wal.update(Collections.singleton(update), false); } } + + private class CheckpointTask implements Runnable { + @Override + public void run() { + try { + logger.debug("Checkpointing Write-Ahead Log used to store components' state"); + + writeAheadLog.checkpoint(); + } catch (final IOException e) { + logger.error("Failed to checkpoint Write-Ahead Log used to store components' state", e); + } + } + } + + private static class NamedThreadFactory implements ThreadFactory { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread t = defaultFactory.newThread(r); + t.setName("Write-Ahead Local State Provider Maintenance"); + t.setDaemon(true); + return t; + } + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index ea2d39776e..44e4a036e2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -17,18 +17,13 @@ package org.apache.nifi.processors.hadoop; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import org.apache.hadoop.fs.FileStatus; @@ -45,9 +40,13 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -59,7 +58,6 
@@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.util.HDFSListing; import org.apache.nifi.processors.hadoop.util.StringSerDe; -import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; @@ -93,7 +91,7 @@ public class ListHDFS extends AbstractHadoopProcessor { .name("Distributed Cache Service") .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node " + "begins pulling data, it won't duplicate all of the work that has been done.") - .required(true) + .required(false) .identifiesControllerService(DistributedMapCacheClient.class) .build(); @@ -176,70 +174,61 @@ public class ListHDFS extends AbstractHadoopProcessor { return mapper.readValue(jsonNode, HDFSListing.class); } + @OnScheduled + public void moveStateToStateManager(final ProcessContext context) throws IOException { + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap = stateManager.getState(Scope.CLUSTER); - private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException { - // Determine the timestamp for the last file that we've listed. + // Check if we have already stored state in the cluster state manager. + if (stateMap.getVersion() == -1L) { + final HDFSListing serviceListing = getListingFromService(context); + if (serviceListing != null) { + persistState(serviceListing, context.getStateManager()); + } + } + } + + private HDFSListing getListingFromService(final ProcessContext context) throws IOException { + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + if (client == null) { + return null; + } + + final String directory = context.getProperty(DIRECTORY).getValue(); + final String remoteValue = client.get(getKey(directory), new StringSerDe(), new StringSerDe()); + if (remoteValue == null) { + return null; + } + + try { + return deserialize(remoteValue); + } catch (final Exception e) { + getLogger().error("Failed to retrieve state from Distributed Map Cache because the content that was retrieved could not be understood", e); + return null; + } + } + + private void persistState(final HDFSListing listing, final StateManager stateManager) throws IOException { + final Map stateValues = listing.toMap(); + stateManager.setState(stateValues, Scope.CLUSTER); + } + + private Long getMinTimestamp(final String directory, final HDFSListing remoteListing) throws IOException { + // No cluster-wide state has been recovered. Just use whatever values we already have. + if (remoteListing == null) { + return lastListingTime; + } + + // If our local timestamp is already later than the remote listing's timestamp, use our local info. Long minTimestamp = lastListingTime; - if ( minTimestamp == null || electedPrimaryNode ) { - // We haven't yet restored any state from local or distributed state - or it's been at least a minute since - // we have performed a listing. In this case, - // First, attempt to get timestamp from distributed cache service. 
- try { - final StringSerDe serde = new StringSerDe(); - final String serializedState = client.get(getKey(directory), serde, serde); - if ( serializedState == null || serializedState.isEmpty() ) { - minTimestamp = null; - this.latestPathsListed = Collections.emptySet(); - } else { - final HDFSListing listing = deserialize(serializedState); - this.lastListingTime = listing.getLatestTimestamp().getTime(); - minTimestamp = listing.getLatestTimestamp().getTime(); - this.latestPathsListed = listing.toPaths(); - } + if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) { + return minTimestamp; + } - this.lastListingTime = minTimestamp; - electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. - } catch (final IOException ioe) { - throw ioe; - } - - // Check the persistence file. We want to use the latest timestamp that we have so that - // we don't duplicate data. - try { - final File persistenceFile = getPersistenceFile(); - if ( persistenceFile.exists() ) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - final Properties props = new Properties(); - props.load(fis); - - // get the local timestamp for this directory, if it exists. - final String locallyPersistedValue = props.getProperty(directory); - if ( locallyPersistedValue != null ) { - final HDFSListing listing = deserialize(locallyPersistedValue); - final long localTimestamp = listing.getLatestTimestamp().getTime(); - - // If distributed state doesn't have an entry or the local entry is later than the distributed state, - // update the distributed state so that we are in sync. - if (minTimestamp == null || localTimestamp > minTimestamp) { - minTimestamp = localTimestamp; - - // Our local persistence file shows a later time than the Distributed service. - // Update the distributed service to match our local state. - try { - final StringSerDe serde = new StringSerDe(); - client.put(getKey(directory), locallyPersistedValue, serde, serde); - } catch (final IOException ioe) { - getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " - + "state due to {}. If a new node performs HDFS Listing, data duplication may occur", - new Object[] {directory, locallyPersistedValue, ioe}); - } - } - } - } - } - } catch (final IOException ioe) { - getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); - } + // Use the remote listing's information. + if (minTimestamp == null || electedPrimaryNode) { + this.latestPathsListed = remoteListing.toPaths(); + this.lastListingTime = remoteListing.getLatestTimestamp().getTime(); } return minTimestamp; @@ -248,11 +237,20 @@ public class ListHDFS extends AbstractHadoopProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final String directory = context.getProperty(DIRECTORY).getValue(); - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. 
final Long minTimestamp; try { - minTimestamp = getMinTimestamp(directory, client); + final HDFSListing stateListing; + final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); + if (stateMap.getVersion() == -1L) { + stateListing = null; + } else { + final Map stateValues = stateMap.toMap(); + stateListing = HDFSListing.fromMap(stateValues); + } + + minTimestamp = getMinTimestamp(directory, stateListing); } catch (final IOException ioe) { getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); context.yield(); @@ -311,32 +309,19 @@ public class ListHDFS extends AbstractHadoopProcessor { // However, we want to save the state both locally and remotely. // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the // previously Primary Node left off. - // We also store the state locally so that if the node is restarted, and the node cannot contact - // the distributed state cache, the node can continue to run (if it is primary node). - String serializedState = null; + final HDFSListing latestListing = createListing(latestListingModTime, statuses); + try { - serializedState = serializeState(latestListingModTime, statuses); - } catch (final Exception e) { - getLogger().error("Failed to serialize state due to {}", new Object[] {e}); - } - - if ( serializedState != null ) { - // Save our state locally. - try { - persistLocalState(directory, serializedState); - } catch (final IOException ioe) { - getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); - } - - // Attempt to save state to remote server. - try { - client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); - } catch (final IOException ioe) { - getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); - } + context.getStateManager().setState(latestListing.toMap(), Scope.CLUSTER); + } catch (final IOException ioe) { + getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe); } lastListingTime = latestListingModTime; + latestPathsListed.clear(); + for (final FileStatus status : statuses) { + latestPathsListed.add(status.getPath()); + } } else { getLogger().debug("There is no data to list. Yielding."); context.yield(); @@ -372,71 +357,18 @@ public class ListHDFS extends AbstractHadoopProcessor { return statusSet; } - - private String serializeState(final long latestListingTime, final Set statuses) throws JsonGenerationException, JsonMappingException, IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. 
- if ( statuses.isEmpty() ) { - return null; - } else { - final List sortedStatuses = new ArrayList<>(statuses); - Collections.sort(sortedStatuses, new Comparator() { - @Override - public int compare(final FileStatus o1, final FileStatus o2) { - return Long.compare(o1.getModificationTime(), o2.getModificationTime()); - } - }); - - final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); - final Set pathsWithModTimeEqualToListingModTime = new HashSet<>(); - for (int i=sortedStatuses.size() - 1; i >= 0; i--) { - final FileStatus status = sortedStatuses.get(i); - if (status.getModificationTime() == latestListingModTime) { - pathsWithModTimeEqualToListingModTime.add(status.getPath()); - } - } - - this.latestPathsListed = pathsWithModTimeEqualToListingModTime; - - final HDFSListing listing = new HDFSListing(); - listing.setLatestTimestamp(new Date(latestListingModTime)); - final Set paths = new HashSet<>(); - for ( final Path path : pathsWithModTimeEqualToListingModTime ) { - paths.add(path.toUri().toString()); - } - listing.setMatchingPaths(paths); - - final ObjectMapper mapper = new ObjectMapper(); - final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); - return serializedState; - } - } - - protected void persistLocalState(final String directory, final String serializedState) throws IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. - final File persistenceFile = getPersistenceFile(); - final File dir = persistenceFile.getParentFile(); - if ( !dir.exists() && !dir.mkdirs() ) { - throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); + private HDFSListing createListing(final long latestListingModTime, final Set statuses) { + final Set paths = new HashSet<>(); + for (final FileStatus status : statuses) { + final String path = status.getPath().toUri().toString(); + paths.add(path); } - final Properties props = new Properties(); - if ( persistenceFile.exists() ) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - props.load(fis); - } - } + final HDFSListing listing = new HDFSListing(); + listing.setLatestTimestamp(new Date(latestListingModTime)); + listing.setMatchingPaths(paths); - props.setProperty(directory, serializedState); - - try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { - props.store(fos, null); - } + return listing; } private String getAbsolutePath(final Path path) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java index a4d957a1a7..27040bf010 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@ -16,9 +16,12 @@ */ package org.apache.nifi.processors.hadoop.util; +import java.util.ArrayList; import java.util.Collection; import java.util.Date; +import 
java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import javax.xml.bind.annotation.XmlTransient; @@ -35,6 +38,11 @@ public class HDFSListing { private Date latestTimestamp; private Collection matchingPaths; + public static class StateKeys { + public static final String TIMESTAMP = "timestamp"; + public static final String PATH_PREFIX = "path."; + } + /** * @return the modification date of the newest file that was contained in the HDFS Listing */ @@ -80,4 +88,41 @@ public class HDFSListing { this.matchingPaths = matchingPaths; } + /** + * Converts this HDFSListing into a Map so that it can be stored in a StateManager. + * + * @return a Map that represents the same information as this HDFSListing + */ + public Map toMap() { + final Map map = new HashMap<>(1 + matchingPaths.size()); + map.put(StateKeys.TIMESTAMP, String.valueOf(latestTimestamp.getTime())); + + int counter = 0; + for (final String path : matchingPaths) { + map.put(StateKeys.PATH_PREFIX + String.valueOf(counter++), path); + } + + return map; + } + + public static HDFSListing fromMap(final Map map) { + if (map == null || map.isEmpty()) { + return null; + } + + final String timestampValue = map.get(StateKeys.TIMESTAMP); + final long timestamp = Long.parseLong(timestampValue); + + final Collection matchingPaths = new ArrayList<>(map.size() - 1); + for (final Map.Entry entry : map.entrySet()) { + if (entry.getKey().startsWith(StateKeys.PATH_PREFIX)) { + matchingPaths.add(entry.getValue()); + } + } + + final HDFSListing listing = new HDFSListing(); + listing.setLatestTimestamp(new Date(timestamp)); + listing.setMatchingPaths(matchingPaths); + return listing; + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index c301bf3022..add89e8d68 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -131,8 +132,8 @@ public class TestListHDFS { @Test - public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() { - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); runner.run(); @@ -145,7 +146,7 @@ public class TestListHDFS { 
runner.clearTransferState(); // add new file to pull - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt"))); // trigger primary node change proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); @@ -153,29 +154,45 @@ public class TestListHDFS { // cause calls to service to fail service.failOnCalls = true; - runner.run(); - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true); - runner.run(); - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); - - final String key = proc.getKey("/test"); - - // wait just to a bit to ensure that the timestamp changes when we update the service - final Object curVal = service.values.get(key); + // Should fail to perform @OnScheduled methods. try { - Thread.sleep(10L); - } catch (final InterruptedException ie) { + runner.run(); + Assert.fail("Processor ran successfully"); + } catch (final AssertionError e) { } + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + // Should fail to perform @OnScheduled methods. + try { + runner.run(); + Assert.fail("Processor ran successfully"); + } catch (final AssertionError e) { + } + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + service.failOnCalls = false; + runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false); + runner.run(); runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); - // ensure state saved both locally & remotely - assertTrue(proc.localStateSaved); - assertNotNull(service.values.get(key)); - assertNotSame(curVal, service.values.get(key)); + // ensure state saved + runner.getStateManager().assertStateSet(Scope.CLUSTER); + final Map newState = runner.getStateManager().getState(Scope.CLUSTER).toMap(); + assertEquals(3, newState.size()); + + final String path0 = newState.get("path.0"); + final String path1 = newState.get("path.1"); + assertTrue(path0.equals("/test/testFile.txt") || path0.equals("/test/testFile2.txt")); + assertTrue(path1.equals("/test/testFile.txt") || path1.equals("/test/testFile2.txt")); + assertNotSame(path0, path1); + + final Long timestamp = Long.parseLong(newState.get("timestamp")); + assertEquals(1999L, timestamp.longValue()); } @@ -186,7 +203,6 @@ public class TestListHDFS { private class ListHDFSWithMockedFileSystem extends ListHDFS { private final MockFileSystem fileSystem = new MockFileSystem(); - private boolean localStateSaved = false; @Override protected FileSystem getFileSystem() { @@ -202,12 +218,6 @@ public class TestListHDFS { protected FileSystem getFileSystem(final Configuration config) throws IOException { return fileSystem; } - - @Override - protected void persistLocalState(final String directory, final String serializedState) throws IOException { - super.persistLocalState(directory, serializedState); - localStateSaved = true; - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 20969c9c7a..e27e5feb1a 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -463,6 +463,13 @@ public abstract class AbstractListProcessor extends Ab */ protected abstract boolean isListingResetNecessary(final PropertyDescriptor property); + /** + * Returns a Scope that specifies where the state should be managed for this Processor + * + * @param context the ProcessContext to use in order to make a determination + * @return a Scope that specifies where the state should be managed for this Processor + */ + protected abstract Scope getStateScope(final ProcessContext context); private static class StringSerDe implements Serializer, Deserializer { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index c9f4482a99..ed661ae92a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -53,7 +53,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -93,6 +95,9 @@ import org.apache.nifi.processors.standard.util.FileInfo; }) @SeeAlso({GetFile.class, PutFile.class, FetchFile.class}) public class ListFile extends AbstractListProcessor { + static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Input Directory is located on a local disk. State will be stored locally on each node in the cluster."); + static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "Input Directory is located on a remote system. State will be stored across the cluster so that " + + "the listing can be performed on Primary Node Only and another node can pick up where the last node left off, if the Primary Node changes"); public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() .name("Input Directory") @@ -110,6 +115,14 @@ public class ListFile extends AbstractListProcessor { .defaultValue("true") .build(); + public static final PropertyDescriptor DIRECTORY_LOCATION = new PropertyDescriptor.Builder() + .name("Input Directory Location") + .description("Specifies where the Input Directory is located. 
This is used to determine whether state should be stored locally or across the cluster.") + .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE) + .defaultValue(LOCATION_LOCAL.getValue()) + .required(true) + .build(); + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() .name("File Filter") .description("Only files whose names match the given regular expression will be picked up") @@ -182,6 +195,7 @@ public class ListFile extends AbstractListProcessor { final List properties = new ArrayList<>(); properties.add(DIRECTORY); properties.add(RECURSE); + properties.add(DIRECTORY_LOCATION); properties.add(FILE_FILTER); properties.add(PATH_FILTER); properties.add(MIN_AGE); @@ -274,6 +288,16 @@ public class ListFile extends AbstractListProcessor { return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); } + @Override + protected Scope getStateScope(final ProcessContext context) { + final String location = context.getProperty(DIRECTORY_LOCATION).getValue(); + if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) { + return Scope.CLUSTER; + } + + return Scope.LOCAL; + } + @Override protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { final File path = new File(getPath(context)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 609b69368d..e1cd4172d7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; @@ -85,4 +86,11 @@ public class ListSFTP extends ListFileTransfer { protected String getProtocolName() { return "sftp"; } + + @Override + protected Scope getStateScope(final ProcessContext context) { + // Use cluster scope so that component can be run on Primary Node Only and can still + // pick up where it left off, even if the Primary Node changes. 
+ return Scope.CLUSTER; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 3d6d3a05af..7343eea58d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -19,11 +19,9 @@ package org.apache.nifi.processors.standard; import java.io.BufferedOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -55,6 +53,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; @@ -81,6 +81,10 @@ import org.apache.nifi.util.LongHolder; + "ingesting files that have been compressed when 'rolled over'.") public class TailFile extends AbstractProcessor { + static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "File is located on a local disk drive. Each node in a cluster will tail a different file."); + static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "File is located on a remote resource. 
This Processor will store state across the cluster so that " + + "it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off."); + static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time", "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail"); static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File", @@ -104,6 +108,13 @@ public class TailFile extends AbstractProcessor { .expressionLanguageSupported(false) .required(false) .build(); + static final PropertyDescriptor FILE_LOCATION = new PropertyDescriptor.Builder() + .name("File Location") + .description("Specifies where the file is located, so that state can be stored appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi") + .required(true) + .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE) + .defaultValue(LOCATION_LOCAL.getValue()) + .build(); static final PropertyDescriptor STATE_FILE = new PropertyDescriptor.Builder() .name("State File") .description("Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off") @@ -152,12 +163,44 @@ public class TailFile extends AbstractProcessor { } } + @OnScheduled public void recoverState(final ProcessContext context) throws IOException { - final String tailFilename = context.getProperty(FILENAME).getValue(); - final String stateFilename = context.getProperty(STATE_FILE).getValue(); + // Before the State Manager existed, we had to store state in a local file. Now, we want to use the State Manager + // instead. So we will need to recover the state that is stored in the file (if any), and then store that in our + // State Manager. But we do this only if nothing has ever been stored in the State Manager. + final Scope scope = getStateScope(context); + final StateMap stateMap = context.getStateManager().getState(scope); + if (stateMap.getVersion() == -1L) { + // State has never been stored in the State Manager. Try to recover state from a file, if one exists. + final Map stateFromFile = recoverStateValuesFromFile(context); + if (!stateFromFile.isEmpty()) { + persistState(stateFromFile, context); + recoverState(context, stateFromFile); + } + return; + } + + recoverState(context, stateMap.toMap()); + } + + /** + * Recovers values for the State that was stored in a local file. 
+ * + * @param context the ProcessContext that indicates where the state is stored + * @return a Map that contains the keys defined in {@link TailFileState.StateKeys} + * @throws IOException if the state file exists but was unable to be read + */ + private Map recoverStateValuesFromFile(final ProcessContext context) throws IOException { + final String stateFilename = context.getProperty(STATE_FILE).getValue(); + if (stateFilename == null) { + return Collections.emptyMap(); + } + + final Map stateValues = new HashMap<>(4); final File stateFile = new File(stateFilename); + try (final FileInputStream fis = new FileInputStream(stateFile); final DataInputStream dis = new DataInputStream(fis)) { @@ -171,62 +214,109 @@ public class TailFile extends AbstractProcessor { long position = dis.readLong(); final long timestamp = dis.readLong(); final boolean checksumPresent = dis.readBoolean(); + final Long checksumValue; - FileChannel reader = null; - File tailFile = null; - - if (checksumPresent && tailFilename.equals(filename)) { - expectedRecoveryChecksum = dis.readLong(); - - // We have an expected checksum and the currently configured filename is the same as the state file. - // We need to check if the existing file is the same as the one referred to in the state file based on - // the checksum. - final Checksum checksum = new CRC32(); - final File existingTailFile = new File(filename); - if (existingTailFile.length() >= position) { - try (final InputStream tailFileIs = new FileInputStream(existingTailFile); - final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { - StreamUtils.copy(in, new NullOutputStream(), state.getPosition()); - - final long checksumResult = in.getChecksum().getValue(); - if (checksumResult == expectedRecoveryChecksum) { - // Checksums match. This means that we want to resume reading from where we left off. - // So we will populate the reader object so that it will be used in onTrigger. If the - // checksums do not match, then we will leave the reader object null, so that the next - // call to onTrigger will result in a new Reader being created and starting at the - // beginning of the file. - getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off."); - tailFile = existingTailFile; - reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ); - getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[] {reader, tailFile}); - - reader.position(position); - } else { - // we don't seek the reader to the position, so our reader will start at beginning of file. - getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning."); - } - } - } else { - // fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0. - getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; " - + "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[] {existingTailFile.length(), position}); - } - - state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536)); + if (checksumPresent) { + checksumValue = dis.readLong(); } else { - // If filename changed or there is no checksum present, then we have no expected checksum to use for recovery. 
- expectedRecoveryChecksum = null; - - // tailing a new file since the state file was written out. We will reset state. - state = new TailFileState(tailFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); + checksumValue = null; } - getLogger().debug("Recovered state {}", new Object[] {state}); + stateValues.put(TailFileState.StateKeys.FILENAME, filename); + stateValues.put(TailFileState.StateKeys.POSITION, String.valueOf(position)); + stateValues.put(TailFileState.StateKeys.TIMESTAMP, String.valueOf(timestamp)); + stateValues.put(TailFileState.StateKeys.CHECKSUM, checksumValue == null ? null : String.valueOf(checksumValue)); } else { // encoding Version == -1... no data in file. Just move on. } } catch (final FileNotFoundException fnfe) { } + + return stateValues; + } + + + /** + * Updates member variables to reflect the "expected recovery checksum" and seek to the appropriate location in the + * tailed file, updating our checksum, so that we are ready to proceed with the {@link #onTrigger(ProcessContext, ProcessSession)} call. + * + * @param context the ProcessContext + * @param stateValues the values that were recovered from state that was previously stored. This Map should be populated with the keys defined + * in {@link TailFileState.StateKeys}. + * @throws IOException if unable to seek to the appropriate location in the tailed file. + */ + private void recoverState(final ProcessContext context, final Map stateValues) throws IOException { + if (stateValues == null) { + return; + } + + if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) { + return; + } + if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) { + return; + } + if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) { + return; + } + + final String currentFilename = context.getProperty(FILENAME).getValue(); + final String checksumValue = stateValues.get(TailFileState.StateKeys.CHECKSUM); + final boolean checksumPresent = (checksumValue != null); + final String storedStateFilename = stateValues.get(TailFileState.StateKeys.FILENAME); + final long position = Long.parseLong(stateValues.get(TailFileState.StateKeys.POSITION)); + final long timestamp = Long.parseLong(stateValues.get(TailFileState.StateKeys.TIMESTAMP)); + + FileChannel reader = null; + File tailFile = null; + + if (checksumPresent && currentFilename.equals(storedStateFilename)) { + expectedRecoveryChecksum = Long.parseLong(checksumValue); + + // We have an expected checksum and the currently configured filename is the same as the state file. + // We need to check if the existing file is the same as the one referred to in the state file based on + // the checksum. + final Checksum checksum = new CRC32(); + final File existingTailFile = new File(storedStateFilename); + if (existingTailFile.length() >= position) { + try (final InputStream tailFileIs = new FileInputStream(existingTailFile); + final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { + StreamUtils.copy(in, new NullOutputStream(), state.getPosition()); + + final long checksumResult = in.getChecksum().getValue(); + if (checksumResult == expectedRecoveryChecksum) { + // Checksums match. This means that we want to resume reading from where we left off. + // So we will populate the reader object so that it will be used in onTrigger. If the + // checksums do not match, then we will leave the reader object null, so that the next + // call to onTrigger will result in a new Reader being created and starting at the + // beginning of the file. 
+ getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off."); + tailFile = existingTailFile; + reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ); + getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[] {reader, tailFile}); + + reader.position(position); + } else { + // we don't seek the reader to the position, so our reader will start at beginning of file. + getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning."); + } + } + } else { + // fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0. + getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; " + + "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[] {existingTailFile.length(), position}); + } + + state = new TailFileState(currentFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536)); + } else { + // If filename changed or there is no checksum present, then we have no expected checksum to use for recovery. + expectedRecoveryChecksum = null; + + // tailing a new file since the state file was written out. We will reset state. + state = new TailFileState(currentFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); + } + + getLogger().debug("Recovered state {}", new Object[] {state}); } @@ -571,40 +661,27 @@ public class TailFile extends AbstractProcessor { } + private Scope getStateScope(final ProcessContext context) { + final String location = context.getProperty(FILE_LOCATION).getValue(); + if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) { + return Scope.CLUSTER; + } + + return Scope.LOCAL; + } private void persistState(final TailFileState state, final ProcessContext context) { - final String stateFilename = context.getProperty(STATE_FILE).getValue(); + persistState(state.toStateMap(), context); + } + + private void persistState(final Map state, final ProcessContext context) { try { - persistState(state, stateFilename); + context.getStateManager().setState(state, getStateScope(context)); } catch (final IOException e) { - getLogger().warn("Failed to update state file {} due to {}; some data may be duplicated on restart of NiFi", new Object[] {stateFilename, e}); + getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[] {e}); } } - private void persistState(final TailFileState state, final String stateFilename) throws IOException { - getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename}); - - final File stateFile = new File(stateFilename); - File directory = stateFile.getParentFile(); - if (directory != null && !directory.exists() && !directory.mkdirs()) { - getLogger().warn("Failed to persist state to {} because the parent directory does not exist and could not be created. 
This may result in data being duplicated upon restart of NiFi"); - return; - } - try (final FileOutputStream fos = new FileOutputStream(stateFile); - final DataOutputStream dos = new DataOutputStream(fos)) { - - dos.writeInt(0); // version - dos.writeUTF(state.getFilename()); - dos.writeLong(state.getPosition()); - dos.writeLong(state.getTimestamp()); - if (state.getChecksum() == null) { - dos.writeBoolean(false); - } else { - dos.writeBoolean(true); - dos.writeLong(state.getChecksum().getValue()); - } - } - } private FileChannel createReader(final File file, final long position) { final FileChannel reader; @@ -728,7 +805,7 @@ public class TailFile extends AbstractProcessor { // must ensure that we do session.commit() before persisting state in order to avoid data loss. session.commit(); - persistState(state, context.getProperty(STATE_FILE).getValue()); + persistState(state, context); } } else { getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file", @@ -800,6 +877,13 @@ public class TailFile extends AbstractProcessor { private final Checksum checksum; private final ByteBuffer buffer; + private static class StateKeys { + public static final String FILENAME = "filename"; + public static final String POSITION = "position"; + public static final String TIMESTAMP = "timestamp"; + public static final String CHECKSUM = "checksum"; + } + public TailFileState(final String filename, final File file, final FileChannel reader, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) { this.filename = filename; this.file = file; @@ -842,5 +926,14 @@ public class TailFile extends AbstractProcessor { public String toString() { return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]"; } + + public Map toStateMap() { + final Map map = new HashMap<>(4); + map.put(StateKeys.FILENAME, filename); + map.put(StateKeys.POSITION, String.valueOf(position)); + map.put(StateKeys.TIMESTAMP, String.valueOf(timestamp)); + map.put(StateKeys.CHECKSUM, checksum == null ? null : String.valueOf(checksum.getValue())); + return map; + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 37316b699d..1da2b4d1b6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -245,5 +245,10 @@ public class TestAbstractListProcessor { protected boolean isListingResetNecessary(PropertyDescriptor property) { return false; } + + @Override + protected Scope getStateScope(final ProcessContext context) { + return Scope.CLUSTER; + } } }