NIFI-259: Began updating processors to use new state management

Mark Payne 2016-01-12 10:21:47 -05:00
parent 7a3e3efce1
commit e559c68766
10 changed files with 491 additions and 272 deletions

View File: MockStateManager.java

@@ -17,6 +17,7 @@
package org.apache.nifi.state;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -33,8 +34,28 @@ public class MockStateManager implements StateManager {
private StateMap localStateMap = new MockStateMap(null, -1L);
private StateMap clusterStateMap = new MockStateMap(null, -1L);
private volatile boolean failToGetLocalState = false;
private volatile boolean failToSetLocalState = false;
private volatile boolean failToGetClusterState = false;
private volatile boolean failToSetClusterState = false;
private void verifyCanSet(final Scope scope) throws IOException {
final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState : failToSetClusterState;
if (failToSet) {
throw new IOException("Unit Test configured to throw IOException if " + scope + " State is set");
}
}
private void verifyCanGet(final Scope scope) throws IOException {
final boolean failToGet = (scope == Scope.LOCAL) ? failToGetLocalState : failToGetClusterState;
if (failToGet) {
throw new IOException("Unit Test configured to throw IOException if " + scope + " State is retrieved");
}
}
@Override
- public synchronized void setState(final Map<String, String> state, final Scope scope) {
public synchronized void setState(final Map<String, String> state, final Scope scope) throws IOException {
verifyCanSet(scope);
final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet());
if (scope == Scope.CLUSTER) {
@@ -45,7 +66,12 @@ public class MockStateManager implements StateManager {
}
@Override
- public synchronized StateMap getState(final Scope scope) {
public synchronized StateMap getState(final Scope scope) throws IOException {
verifyCanGet(scope);
return retrieveState(scope);
}
private synchronized StateMap retrieveState(final Scope scope) {
if (scope == Scope.CLUSTER) {
return clusterStateMap;
} else {
@@ -54,9 +80,10 @@ public class MockStateManager implements StateManager {
}
@Override
- public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) {
public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
if (scope == Scope.CLUSTER) {
if (oldValue == clusterStateMap) {
verifyCanSet(scope);
clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet());
return true;
}
@@ -64,6 +91,7 @@ public class MockStateManager implements StateManager {
return false;
} else {
if (oldValue == localStateMap) {
verifyCanSet(scope);
localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet());
return true;
}
@@ -73,13 +101,19 @@ public class MockStateManager implements StateManager {
}
@Override
- public synchronized void clear(final Scope scope) {
public synchronized void clear(final Scope scope) throws IOException {
setState(Collections.<String, String> emptyMap(), scope);
}
private String getValue(final String key, final Scope scope) {
- final StateMap stateMap = getState(scope);
final StateMap stateMap;
if (scope == Scope.CLUSTER) {
stateMap = clusterStateMap;
} else {
stateMap = localStateMap;
}
return stateMap.get(key);
}
@@ -104,7 +138,7 @@ public class MockStateManager implements StateManager {
* @param scope the scope to compare the stateValues against
*/
public void assertStateEquals(final Map<String, String> stateValues, final Scope scope) {
- final StateMap stateMap = getState(scope);
final StateMap stateMap = retrieveState(scope);
Assert.assertEquals(stateValues, stateMap.toMap());
}
@@ -115,7 +149,7 @@ public class MockStateManager implements StateManager {
* @param scope the scope to compare the stateValues against
*/
public void assertStateNotEquals(final Map<String, String> stateValues, final Scope scope) {
- final StateMap stateMap = getState(scope);
final StateMap stateMap = retrieveState(scope);
Assert.assertNotSame(stateValues, stateMap.toMap());
}
@@ -157,16 +191,49 @@ public class MockStateManager implements StateManager {
*/
public void assertStateSet(final Scope scope) {
final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap;
- Assert.assertEquals("Expected state to be set for Scope " + scope + ", but it was not set", -1L, stateMap.getVersion());
Assert.assertNotSame("Expected state to be set for Scope " + scope + ", but it was not set", -1L, stateMap.getVersion());
}
/**
* Ensures that the state was not set for the given scope
*
* @param scope the scope
*/
public void assertStateNotSet(final Scope scope) {
final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap;
- Assert.assertNotSame("Expected state not to be set for Scope " + scope + ", but it was set", -1L, stateMap.getVersion());
Assert.assertEquals("Expected state not to be set for Scope " + scope + ", but it was set", -1L, stateMap.getVersion());
}
/**
* Specifies whether or not the State Manager should throw an IOException when state is set for the given scope.
* Note that calls to {@link #replace(StateMap, Map, Scope)} will fail only if the state would be set (i.e., if
* we call replace and the StateMap does not match the old value, it will not fail).
*
* Also note that if setting state is set to fail, clearing will also fail, as clearing is thought of as setting the
* state to empty
*
* @param scope the scope that should (or should not) fail
* @param fail whether or not setting state should fail
*/
public void setFailOnStateSet(final Scope scope, final boolean fail) {
if (scope == Scope.LOCAL) {
failToSetLocalState = fail;
} else {
failToSetClusterState = fail;
}
}
/**
* Specifies whether or not the State Manager should throw an IOException when state is retrieved for the given scope.
*
* @param scope the scope that should (or should not) fail
* @param fail whether or not retrieving state should fail
*/
public void setFailOnStateGet(final Scope scope, final boolean fail) {
if (scope == Scope.LOCAL) {
failToGetLocalState = fail;
} else {
failToGetClusterState = fail;
}
}
}
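For context on how the new fail toggles are meant to be used: they let a processor test simulate a state-provider outage and then assert how the processor reacted. A rough sketch of that usage, assuming a hypothetical MyStatefulProcessor that catches the IOException and yields (runner.getStateManager() is the same accessor exercised by TestListHDFS later in this commit):

    final TestRunner runner = TestRunners.newTestRunner(new MyStatefulProcessor()); // hypothetical processor under test
    runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);

    // with state retrieval failing, the processor is expected to yield rather than lose its place
    runner.run();
    runner.getStateManager().assertStateNotSet(Scope.CLUSTER);

    // clear the simulated outage and verify that state is eventually stored
    runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
    runner.run();
    runner.getStateManager().assertStateSet(Scope.CLUSTER);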

View File: WriteAheadLocalStateProvider.java

@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,11 +53,9 @@ import org.wali.WriteAheadRepository;
public class WriteAheadLocalStateProvider extends AbstractStateProvider {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
- // TODO: CREATE BACKGROUND THREAD OR USE EXECUTOR (in StateProviderInitializationContext?) to schedule checkpointing.
- private static final long CHECKPOINT_NANOS = TimeUnit.MINUTES.toNanos(2);
private final StateMapSerDe serde;
private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory());
static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
.name("Directory")
@@ -115,6 +116,8 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
// expensive than just keeping track of a local 'long' variable. Since we won't actually increment this at any point until after
// the init() method completes, this is okay to do.
versionGenerator.set(maxRecordVersion);
executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, TimeUnit.MINUTES);
}
@Override
@@ -221,4 +224,29 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
wal.update(Collections.singleton(update), false);
}
}
private class CheckpointTask implements Runnable {
@Override
public void run() {
try {
logger.debug("Checkpointing Write-Ahead Log used to store components' state");
writeAheadLog.checkpoint();
} catch (final IOException e) {
logger.error("Failed to checkpoint Write-Ahead Log used to store components' state", e);
}
}
}
private static class NamedThreadFactory implements ThreadFactory {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultFactory.newThread(r);
t.setName("Write-Ahead Local State Provider Maintenance");
t.setDaemon(true);
return t;
}
}
}
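A note on the scheduling idiom that replaces the old CHECKPOINT_NANOS constant: a single-threaded ScheduledExecutorService built from a daemon ThreadFactory keeps the periodic checkpoint from ever blocking JVM shutdown, and scheduleWithFixedDelay (rather than scheduleAtFixedRate) guarantees that a slow checkpoint cannot overlap the next one. A self-contained sketch of the same pattern, independent of the provider (names here are illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class CheckpointSchedulerSketch {
        public static void main(final String[] args) throws InterruptedException {
            final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
                final Thread t = Executors.defaultThreadFactory().newThread(r);
                t.setName("Write-Ahead Maintenance (sketch)");
                t.setDaemon(true); // do not keep the JVM alive just for maintenance work
                return t;
            });

            // the delay is measured from the end of one run to the start of the next,
            // so a long-running checkpoint simply pushes the schedule back instead of piling up
            executor.scheduleWithFixedDelay(() -> System.out.println("checkpoint"), 2, 2, TimeUnit.SECONDS);

            Thread.sleep(7000L);
            executor.shutdown();
        }
    }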

View File: ListHDFS.java

@@ -17,18 +17,13 @@
package org.apache.nifi.processors.hadoop;
import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
- import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
@@ -45,9 +40,13 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -59,7 +58,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.HDFSListing;
import org.apache.nifi.processors.hadoop.util.StringSerDe;
- import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -93,7 +91,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
+ "begins pulling data, it won't duplicate all of the work that has been done.")
- .required(true)
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
@@ -176,70 +174,61 @@
return mapper.readValue(jsonNode, HDFSListing.class);
}
- private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
- // Determine the timestamp for the last file that we've listed.
@OnScheduled
public void moveStateToStateManager(final ProcessContext context) throws IOException {
final StateManager stateManager = context.getStateManager();
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
// Check if we have already stored state in the cluster state manager.
if (stateMap.getVersion() == -1L) {
final HDFSListing serviceListing = getListingFromService(context);
if (serviceListing != null) {
persistState(serviceListing, context.getStateManager());
}
}
}
private HDFSListing getListingFromService(final ProcessContext context) throws IOException {
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
if (client == null) {
return null;
}
final String directory = context.getProperty(DIRECTORY).getValue();
final String remoteValue = client.get(getKey(directory), new StringSerDe(), new StringSerDe());
if (remoteValue == null) {
return null;
}
try {
return deserialize(remoteValue);
} catch (final Exception e) {
getLogger().error("Failed to retrieve state from Distributed Map Cache because the content that was retrieved could not be understood", e);
return null;
}
}
private void persistState(final HDFSListing listing, final StateManager stateManager) throws IOException {
final Map<String, String> stateValues = listing.toMap();
stateManager.setState(stateValues, Scope.CLUSTER);
}
private Long getMinTimestamp(final String directory, final HDFSListing remoteListing) throws IOException {
// No cluster-wide state has been recovered. Just use whatever values we already have.
if (remoteListing == null) {
return lastListingTime;
}
// If our local timestamp is already later than the remote listing's timestamp, use our local info.
Long minTimestamp = lastListingTime;
- if ( minTimestamp == null || electedPrimaryNode ) {
- // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
- // we have performed a listing. In this case,
- // First, attempt to get timestamp from distributed cache service.
- try {
- final StringSerDe serde = new StringSerDe();
- final String serializedState = client.get(getKey(directory), serde, serde);
- if ( serializedState == null || serializedState.isEmpty() ) {
- minTimestamp = null;
- this.latestPathsListed = Collections.emptySet();
- } else {
- final HDFSListing listing = deserialize(serializedState);
- this.lastListingTime = listing.getLatestTimestamp().getTime();
- minTimestamp = listing.getLatestTimestamp().getTime();
- this.latestPathsListed = listing.toPaths();
- }
- this.lastListingTime = minTimestamp;
- electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
- } catch (final IOException ioe) {
- throw ioe;
- }
- // Check the persistence file. We want to use the latest timestamp that we have so that
- // we don't duplicate data.
- try {
- final File persistenceFile = getPersistenceFile();
- if ( persistenceFile.exists() ) {
- try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- final Properties props = new Properties();
- props.load(fis);
- // get the local timestamp for this directory, if it exists.
- final String locallyPersistedValue = props.getProperty(directory);
- if ( locallyPersistedValue != null ) {
- final HDFSListing listing = deserialize(locallyPersistedValue);
- final long localTimestamp = listing.getLatestTimestamp().getTime();
- // If distributed state doesn't have an entry or the local entry is later than the distributed state,
- // update the distributed state so that we are in sync.
- if (minTimestamp == null || localTimestamp > minTimestamp) {
- minTimestamp = localTimestamp;
- // Our local persistence file shows a later time than the Distributed service.
- // Update the distributed service to match our local state.
- try {
- final StringSerDe serde = new StringSerDe();
- client.put(getKey(directory), locallyPersistedValue, serde, serde);
- } catch (final IOException ioe) {
- getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
- + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
- new Object[] {directory, locallyPersistedValue, ioe});
- }
- }
- }
- }
- }
- } catch (final IOException ioe) {
- getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
- }
if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) {
return minTimestamp;
}
// Use the remote listing's information.
if (minTimestamp == null || electedPrimaryNode) {
this.latestPathsListed = remoteListing.toPaths();
this.lastListingTime = remoteListing.getLatestTimestamp().getTime();
}
return minTimestamp;
@@ -248,11 +237,20 @@
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final String directory = context.getProperty(DIRECTORY).getValue();
- final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
final Long minTimestamp;
try {
- minTimestamp = getMinTimestamp(directory, client);
final HDFSListing stateListing;
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L) {
stateListing = null;
} else {
final Map<String, String> stateValues = stateMap.toMap();
stateListing = HDFSListing.fromMap(stateValues);
}
minTimestamp = getMinTimestamp(directory, stateListing);
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
context.yield();
@@ -311,32 +309,19 @@
// However, we want to save the state both locally and remotely.
// We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
// previously Primary Node left off.
- // We also store the state locally so that if the node is restarted, and the node cannot contact
- // the distributed state cache, the node can continue to run (if it is primary node).
- String serializedState = null;
- try {
- serializedState = serializeState(latestListingModTime, statuses);
- } catch (final Exception e) {
- getLogger().error("Failed to serialize state due to {}", new Object[] {e});
- }
- if ( serializedState != null ) {
- // Save our state locally.
- try {
- persistLocalState(directory, serializedState);
- } catch (final IOException ioe) {
- getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
- }
- // Attempt to save state to remote server.
- try {
- client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
- } catch (final IOException ioe) {
- getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
- }
- }
final HDFSListing latestListing = createListing(latestListingModTime, statuses);
try {
context.getStateManager().setState(latestListing.toMap(), Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
}
lastListingTime = latestListingModTime;
latestPathsListed.clear();
for (final FileStatus status : statuses) {
latestPathsListed.add(status.getPath());
}
} else {
getLogger().debug("There is no data to list. Yielding.");
context.yield();
@@ -372,71 +357,18 @@
return statusSet;
}
- private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
- // we need to keep track of all files that we pulled in that had a modification time equal to
- // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
- // that have a mod time equal to that timestamp because more files may come in with the same timestamp
- // later in the same millisecond.
- if ( statuses.isEmpty() ) {
- return null;
- } else {
- final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
- Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
- @Override
- public int compare(final FileStatus o1, final FileStatus o2) {
- return Long.compare(o1.getModificationTime(), o2.getModificationTime());
- }
- });
- final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
- final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
- for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
- final FileStatus status = sortedStatuses.get(i);
- if (status.getModificationTime() == latestListingModTime) {
- pathsWithModTimeEqualToListingModTime.add(status.getPath());
- }
- }
- this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
- final HDFSListing listing = new HDFSListing();
- listing.setLatestTimestamp(new Date(latestListingModTime));
- final Set<String> paths = new HashSet<>();
- for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
- paths.add(path.toUri().toString());
- }
- listing.setMatchingPaths(paths);
- final ObjectMapper mapper = new ObjectMapper();
- final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
- return serializedState;
- }
- }
- protected void persistLocalState(final String directory, final String serializedState) throws IOException {
- // we need to keep track of all files that we pulled in that had a modification time equal to
- // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
- // that have a mod time equal to that timestamp because more files may come in with the same timestamp
- // later in the same millisecond.
- final File persistenceFile = getPersistenceFile();
- final File dir = persistenceFile.getParentFile();
- if ( !dir.exists() && !dir.mkdirs() ) {
- throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
- }
- final Properties props = new Properties();
- if ( persistenceFile.exists() ) {
- try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- props.load(fis);
- }
- }
- props.setProperty(directory, serializedState);
- try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
- props.store(fos, null);
- }
- }
private HDFSListing createListing(final long latestListingModTime, final Set<FileStatus> statuses) {
final Set<String> paths = new HashSet<>();
for (final FileStatus status : statuses) {
final String path = status.getPath().toUri().toString();
paths.add(path);
}
final HDFSListing listing = new HDFSListing();
listing.setLatestTimestamp(new Date(latestListingModTime));
listing.setMatchingPaths(paths);
return listing;
}
private String getAbsolutePath(final Path path) {

View File: HDFSListing.java

@@ -16,9 +16,12 @@
*/
package org.apache.nifi.processors.hadoop.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.xml.bind.annotation.XmlTransient;
@@ -35,6 +38,11 @@ public class HDFSListing {
private Date latestTimestamp;
private Collection<String> matchingPaths;
public static class StateKeys {
public static final String TIMESTAMP = "timestamp";
public static final String PATH_PREFIX = "path.";
}
/**
* @return the modification date of the newest file that was contained in the HDFS Listing
*/
@@ -80,4 +88,41 @@
this.matchingPaths = matchingPaths;
}
/**
* Converts this HDFSListing into a Map<String, String> so that it can be stored in a StateManager.
*
* @return a Map<String, String> that represents the same information as this HDFSListing
*/
public Map<String, String> toMap() {
final Map<String, String> map = new HashMap<>(1 + matchingPaths.size());
map.put(StateKeys.TIMESTAMP, String.valueOf(latestTimestamp.getTime()));
int counter = 0;
for (final String path : matchingPaths) {
map.put(StateKeys.PATH_PREFIX + String.valueOf(counter++), path);
}
return map;
}
public static HDFSListing fromMap(final Map<String, String> map) {
if (map == null || map.isEmpty()) {
return null;
}
final String timestampValue = map.get(StateKeys.TIMESTAMP);
final long timestamp = Long.parseLong(timestampValue);
final Collection<String> matchingPaths = new ArrayList<>(map.size() - 1);
for (final Map.Entry<String, String> entry : map.entrySet()) {
if (entry.getKey().startsWith(StateKeys.PATH_PREFIX)) {
matchingPaths.add(entry.getValue());
}
}
final HDFSListing listing = new HDFSListing();
listing.setLatestTimestamp(new Date(timestamp));
listing.setMatchingPaths(matchingPaths);
return listing;
}
}
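To make the new persistence format concrete, here is a small round-trip sketch of toMap() and fromMap() (values are illustrative, java.util imports are assumed, and the path.N numbering depends on map iteration order, so only the set of paths is significant):

    final HDFSListing original = new HDFSListing();
    original.setLatestTimestamp(new Date(1999L));
    original.setMatchingPaths(Arrays.asList("/test/testFile.txt", "/test/testFile2.txt"));

    // what StateManager.setState(...) receives, e.g.
    // {timestamp=1999, path.0=/test/testFile.txt, path.1=/test/testFile2.txt}
    final Map<String, String> stateValues = original.toMap();

    // what ListHDFS rebuilds from StateManager.getState(Scope.CLUSTER).toMap()
    final HDFSListing recovered = HDFSListing.fromMap(stateValues);
    assert recovered.getLatestTimestamp().getTime() == 1999L;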

View File: TestListHDFS.java

@@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.hadoop;
- import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -131,8 +132,8 @@
@Test
- public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
runner.run();
@@ -145,7 +146,7 @@
runner.clearTransferState();
// add new file to pull
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
// trigger primary node change
proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
@@ -153,29 +154,45 @@
// cause calls to service to fail
service.failOnCalls = true;
- runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
- runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
- final String key = proc.getKey("/test");
- // wait just to a bit to ensure that the timestamp changes when we update the service
- final Object curVal = service.values.get(key);
- try {
- Thread.sleep(10L);
- } catch (final InterruptedException ie) {
- }
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
// Should fail to perform @OnScheduled methods.
try {
runner.run();
Assert.fail("Processor ran successfully");
} catch (final AssertionError e) {
}
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
// Should fail to perform @OnScheduled methods.
try {
runner.run();
Assert.fail("Processor ran successfully");
} catch (final AssertionError e) {
}
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
service.failOnCalls = false;
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
- // ensure state saved both locally & remotely
- assertTrue(proc.localStateSaved);
- assertNotNull(service.values.get(key));
- assertNotSame(curVal, service.values.get(key));
// ensure state saved
runner.getStateManager().assertStateSet(Scope.CLUSTER);
final Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
assertEquals(3, newState.size());
final String path0 = newState.get("path.0");
final String path1 = newState.get("path.1");
assertTrue(path0.equals("/test/testFile.txt") || path0.equals("/test/testFile2.txt"));
assertTrue(path1.equals("/test/testFile.txt") || path1.equals("/test/testFile2.txt"));
assertNotSame(path0, path1);
final Long timestamp = Long.parseLong(newState.get("timestamp"));
assertEquals(1999L, timestamp.longValue());
}
@@ -186,7 +203,6 @@
private class ListHDFSWithMockedFileSystem extends ListHDFS {
private final MockFileSystem fileSystem = new MockFileSystem();
- private boolean localStateSaved = false;
@Override
protected FileSystem getFileSystem() {
@@ -202,12 +218,6 @@
protected FileSystem getFileSystem(final Configuration config) throws IOException {
return fileSystem;
}
- @Override
- protected void persistLocalState(final String directory, final String serializedState) throws IOException {
- super.persistLocalState(directory, serializedState);
- localStateSaved = true;
- }
}

View File: AbstractListProcessor.java

@@ -463,6 +463,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
*/
protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);
/**
* Returns a Scope that specifies where the state should be managed for this Processor
*
* @param context the ProcessContext to use in order to make a determination
* @return a Scope that specifies where the state should be managed for this Processor
*/
protected abstract Scope getStateScope(final ProcessContext context);
private static class StringSerDe implements Serializer<String>, Deserializer<String> {

View File: ListFile.java

@@ -53,7 +53,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -93,6 +95,9 @@ import org.apache.nifi.processors.standard.util.FileInfo;
})
@SeeAlso({GetFile.class, PutFile.class, FetchFile.class})
public class ListFile extends AbstractListProcessor<FileInfo> {
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Input Directory is located on a local disk. State will be stored locally on each node in the cluster.");
static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "Input Directory is located on a remote system. State will be stored across the cluster so that "
+ "the listing can be performed on Primary Node Only and another node can pick up where the last node left off, if the Primary Node changes");
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("Input Directory")
@@ -110,6 +115,14 @@
.defaultValue("true")
.build();
public static final PropertyDescriptor DIRECTORY_LOCATION = new PropertyDescriptor.Builder()
.name("Input Directory Location")
.description("Specifies where the Input Directory is located. This is used to determine whether state should be stored locally or across the cluster.")
.allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
.defaultValue(LOCATION_LOCAL.getValue())
.required(true)
.build();
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
.name("File Filter")
.description("Only files whose names match the given regular expression will be picked up")
@@ -182,6 +195,7 @@
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DIRECTORY);
properties.add(RECURSE);
properties.add(DIRECTORY_LOCATION);
properties.add(FILE_FILTER);
properties.add(PATH_FILTER);
properties.add(MIN_AGE);
@@ -274,6 +288,16 @@
return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
}
@Override
protected Scope getStateScope(final ProcessContext context) {
final String location = context.getProperty(DIRECTORY_LOCATION).getValue();
if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
return Scope.CLUSTER;
}
return Scope.LOCAL;
}
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
final File path = new File(getPath(context));

View File: ListSFTP.java

@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@@ -85,4 +86,11 @@
protected String getProtocolName() {
return "sftp";
}
@Override
protected Scope getStateScope(final ProcessContext context) {
// Use cluster scope so that component can be run on Primary Node Only and can still
// pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER;
}
}

View File: TailFile.java

@@ -19,11 +19,9 @@ package org.apache.nifi.processors.standard;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
- import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -55,6 +53,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
@@ -81,6 +81,10 @@ import org.apache.nifi.util.LongHolder;
+ "ingesting files that have been compressed when 'rolled over'.")
public class TailFile extends AbstractProcessor {
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "File is located on a local disk drive. Each node in a cluster will tail a different file.");
static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "File is located on a remote resource. This Processor will store state across the cluster so that "
+ "it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off.");
static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
"Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File",
@@ -104,6 +108,13 @@
.expressionLanguageSupported(false)
.required(false)
.build();
static final PropertyDescriptor FILE_LOCATION = new PropertyDescriptor.Builder()
.name("File Location")
.description("Specifies where the file is located, so that state can be stored appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi")
.required(true)
.allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
.defaultValue(LOCATION_LOCAL.getValue())
.build();
static final PropertyDescriptor STATE_FILE = new PropertyDescriptor.Builder()
.name("State File")
.description("Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off")
@@ -152,12 +163,44 @@
}
}
@OnScheduled
public void recoverState(final ProcessContext context) throws IOException {
- final String tailFilename = context.getProperty(FILENAME).getValue();
- final String stateFilename = context.getProperty(STATE_FILE).getValue();
// Before the State Manager existed, we had to store state in a local file. Now, we want to use the State Manager
// instead. So we will need to recover the state that is stored in the file (if any), and then store that in our
// State Manager. But we do this only if nothing has ever been stored in the State Manager.
final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope);
if (stateMap.getVersion() == -1L) {
// State has never been stored in the State Manager. Try to recover state from a file, if one exists.
final Map<String, String> stateFromFile = recoverStateValuesFromFile(context);
if (!stateFromFile.isEmpty()) {
persistState(stateFromFile, context);
recoverState(context, stateFromFile);
}
return;
}
recoverState(context, stateMap.toMap());
}
/**
* Recovers values for the State that was stored in a local file.
*
* @param context the ProcessContext that indicates where the state is stored
* @return a Map<String, String> that contains the keys defined in {@link TailFileState.StateKeys}
* @throws IOException if the state file exists but was unable to be read
*/
private Map<String, String> recoverStateValuesFromFile(final ProcessContext context) throws IOException {
final String stateFilename = context.getProperty(STATE_FILE).getValue();
if (stateFilename == null) {
return Collections.emptyMap();
}
final Map<String, String> stateValues = new HashMap<>(4);
final File stateFile = new File(stateFilename);
try (final FileInputStream fis = new FileInputStream(stateFile);
final DataInputStream dis = new DataInputStream(fis)) {
@@ -171,62 +214,109 @@
long position = dis.readLong();
final long timestamp = dis.readLong();
final boolean checksumPresent = dis.readBoolean();
- FileChannel reader = null;
- File tailFile = null;
- if (checksumPresent && tailFilename.equals(filename)) {
- expectedRecoveryChecksum = dis.readLong();
- // We have an expected checksum and the currently configured filename is the same as the state file.
- // We need to check if the existing file is the same as the one referred to in the state file based on
- // the checksum.
- final Checksum checksum = new CRC32();
- final File existingTailFile = new File(filename);
- if (existingTailFile.length() >= position) {
- try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
- final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
- StreamUtils.copy(in, new NullOutputStream(), state.getPosition());
- final long checksumResult = in.getChecksum().getValue();
- if (checksumResult == expectedRecoveryChecksum) {
- // Checksums match. This means that we want to resume reading from where we left off.
- // So we will populate the reader object so that it will be used in onTrigger. If the
- // checksums do not match, then we will leave the reader object null, so that the next
- // call to onTrigger will result in a new Reader being created and starting at the
- // beginning of the file.
- getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
- tailFile = existingTailFile;
- reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
- getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[] {reader, tailFile});
- reader.position(position);
- } else {
- // we don't seek the reader to the position, so our reader will start at beginning of file.
- getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
- }
- }
- } else {
- // fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0.
- getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; "
- + "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[] {existingTailFile.length(), position});
- }
- state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536));
- } else {
- // If filename changed or there is no checksum present, then we have no expected checksum to use for recovery.
- expectedRecoveryChecksum = null;
- // tailing a new file since the state file was written out. We will reset state.
- state = new TailFileState(tailFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
- }
- getLogger().debug("Recovered state {}", new Object[] {state});
final Long checksumValue;
if (checksumPresent) {
checksumValue = dis.readLong();
} else {
checksumValue = null;
}
stateValues.put(TailFileState.StateKeys.FILENAME, filename);
stateValues.put(TailFileState.StateKeys.POSITION, String.valueOf(position));
stateValues.put(TailFileState.StateKeys.TIMESTAMP, String.valueOf(timestamp));
stateValues.put(TailFileState.StateKeys.CHECKSUM, checksumValue == null ? null : String.valueOf(checksumValue));
} else {
// encoding Version == -1... no data in file. Just move on.
}
} catch (final FileNotFoundException fnfe) {
}
return stateValues;
}
/**
* Updates member variables to reflect the "expected recovery checksum" and seek to the appropriate location in the
* tailed file, updating our checksum, so that we are ready to proceed with the {@link #onTrigger(ProcessContext, ProcessSession)} call.
*
* @param context the ProcessContext
* @param stateValues the values that were recovered from state that was previously stored. This Map should be populated with the keys defined
* in {@link TailFileState.StateKeys}.
* @throws IOException if unable to seek to the appropriate location in the tailed file.
*/
private void recoverState(final ProcessContext context, final Map<String, String> stateValues) throws IOException {
if (stateValues == null) {
return;
}
if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) {
return;
}
if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) {
return;
}
if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) {
return;
}
final String currentFilename = context.getProperty(FILENAME).getValue();
final String checksumValue = stateValues.get(TailFileState.StateKeys.CHECKSUM);
final boolean checksumPresent = (checksumValue != null);
final String storedStateFilename = stateValues.get(TailFileState.StateKeys.FILENAME);
final long position = Long.parseLong(stateValues.get(TailFileState.StateKeys.POSITION));
final long timestamp = Long.parseLong(stateValues.get(TailFileState.StateKeys.TIMESTAMP));
FileChannel reader = null;
File tailFile = null;
if (checksumPresent && currentFilename.equals(storedStateFilename)) {
expectedRecoveryChecksum = Long.parseLong(checksumValue);
// We have an expected checksum and the currently configured filename is the same as the state file.
// We need to check if the existing file is the same as the one referred to in the state file based on
// the checksum.
final Checksum checksum = new CRC32();
final File existingTailFile = new File(storedStateFilename);
if (existingTailFile.length() >= position) {
try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
StreamUtils.copy(in, new NullOutputStream(), state.getPosition());
final long checksumResult = in.getChecksum().getValue();
if (checksumResult == expectedRecoveryChecksum) {
// Checksums match. This means that we want to resume reading from where we left off.
// So we will populate the reader object so that it will be used in onTrigger. If the
// checksums do not match, then we will leave the reader object null, so that the next
// call to onTrigger will result in a new Reader being created and starting at the
// beginning of the file.
getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
tailFile = existingTailFile;
reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[] {reader, tailFile});
reader.position(position);
} else {
// we don't seek the reader to the position, so our reader will start at beginning of file.
getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
}
}
} else {
// fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0.
getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; "
+ "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[] {existingTailFile.length(), position});
}
state = new TailFileState(currentFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536));
} else {
// If filename changed or there is no checksum present, then we have no expected checksum to use for recovery.
expectedRecoveryChecksum = null;
// tailing a new file since the state file was written out. We will reset state.
state = new TailFileState(currentFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
}
getLogger().debug("Recovered state {}", new Object[] {state});
} }
@@ -571,40 +661,27 @@
}
private Scope getStateScope(final ProcessContext context) {
final String location = context.getProperty(FILE_LOCATION).getValue();
if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
return Scope.CLUSTER;
}
return Scope.LOCAL;
}
private void persistState(final TailFileState state, final ProcessContext context) {
- final String stateFilename = context.getProperty(STATE_FILE).getValue();
persistState(state.toStateMap(), context);
}
private void persistState(final Map<String, String> state, final ProcessContext context) {
try {
- persistState(state, stateFilename);
context.getStateManager().setState(state, getStateScope(context));
} catch (final IOException e) {
- getLogger().warn("Failed to update state file {} due to {}; some data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[] {e});
}
}
- private void persistState(final TailFileState state, final String stateFilename) throws IOException {
- getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
- final File stateFile = new File(stateFilename);
- File directory = stateFile.getParentFile();
- if (directory != null && !directory.exists() && !directory.mkdirs()) {
- getLogger().warn("Failed to persist state to {} because the parent directory does not exist and could not be created. This may result in data being duplicated upon restart of NiFi");
- return;
- }
- try (final FileOutputStream fos = new FileOutputStream(stateFile);
- final DataOutputStream dos = new DataOutputStream(fos)) {
- dos.writeInt(0); // version
- dos.writeUTF(state.getFilename());
- dos.writeLong(state.getPosition());
- dos.writeLong(state.getTimestamp());
- if (state.getChecksum() == null) {
- dos.writeBoolean(false);
- } else {
- dos.writeBoolean(true);
- dos.writeLong(state.getChecksum().getValue());
- }
- }
- }
private FileChannel createReader(final File file, final long position) {
final FileChannel reader;
@@ -728,7 +805,7 @@
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
- persistState(state, context.getProperty(STATE_FILE).getValue());
persistState(state, context);
}
} else {
getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
@@ -800,6 +877,13 @@
private final Checksum checksum;
private final ByteBuffer buffer;
private static class StateKeys {
public static final String FILENAME = "filename";
public static final String POSITION = "position";
public static final String TIMESTAMP = "timestamp";
public static final String CHECKSUM = "checksum";
}
public TailFileState(final String filename, final File file, final FileChannel reader, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) {
this.filename = filename;
this.file = file;
@@ -842,5 +926,14 @@
public String toString() {
return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]";
}
public Map<String, String> toStateMap() {
final Map<String, String> map = new HashMap<>(4);
map.put(StateKeys.FILENAME, filename);
map.put(StateKeys.POSITION, String.valueOf(position));
map.put(StateKeys.TIMESTAMP, String.valueOf(timestamp));
map.put(StateKeys.CHECKSUM, checksum == null ? null : String.valueOf(checksum.getValue()));
return map;
}
}
}
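Pulling the TailFile changes together, the state life-cycle now runs through the StateManager instead of the hand-rolled state file. A condensed sketch of the persist/recover cycle implemented above (error handling and the checksum verification omitted; the string keys come from TailFileState.StateKeys):

    // persist, called after session.commit() so that data is never lost on failure
    context.getStateManager().setState(state.toStateMap(), getStateScope(context));

    // recover, on @OnScheduled; a version of -1 means nothing has ever been stored
    final StateMap stored = context.getStateManager().getState(getStateScope(context));
    if (stored.getVersion() == -1L) {
        // fall back to the legacy State File, if one exists, and migrate its values
    } else {
        final Map<String, String> values = stored.toMap();
        final long position = Long.parseLong(values.get("position"));
        // re-open the tailed file and seek to 'position' if the stored checksum still matches
    }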

View File: TestAbstractListProcessor.java

@@ -245,5 +245,10 @@
protected boolean isListingResetNecessary(PropertyDescriptor property) {
return false;
}
@Override
protected Scope getStateScope(final ProcessContext context) {
return Scope.CLUSTER;
}
}
}
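Taken together, the files above converge on a single shape for stateful processors: recover through the StateManager when scheduled, do the work, then write the new state back under an explicit Scope. A rough composite sketch of that shape (not code from the commit; the field, the performListing helper, and the single "timestamp" key are illustrative):

    @OnScheduled
    public void recoverState(final ProcessContext context) throws IOException {
        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
        if (stateMap.getVersion() != -1L) {
            lastTimestamp = Long.parseLong(stateMap.toMap().get("timestamp"));
        }
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final long newTimestamp = performListing(context, session); // illustrative helper
        try {
            context.getStateManager().setState(Collections.singletonMap("timestamp", String.valueOf(newTimestamp)), Scope.CLUSTER);
        } catch (final IOException ioe) {
            getLogger().warn("Failed to save state; data may be duplicated after a restart", ioe);
        }
    }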