NIFI-1587: Always poll state from the State Manager when running ListHDFS instead of favoring local state over the cluster state

Signed-off-by: joewitt <joewitt@apache.org>
Mark Payne 2016-03-04 16:06:42 -05:00 committed by joewitt
parent 5a8b2cf7f1
commit 8c488d7e8e
2 changed files with 10 additions and 38 deletions
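The gist of the change: instead of caching listing state locally and reconciling it only when this node is elected primary, ListHDFS now reads the cluster-scoped state on every onTrigger invocation. The following is a minimal sketch of that pattern, paraphrased from the new onTrigger body in the diff below (HDFSListing, latestPathsListed, and lastListingTime are the processor's own helpers and fields, not new API; the trailing return is implied by the surrounding method but not visible in the hunk shown here):

    // Always consult the cluster-wide state before listing; the local fields are only a mirror of it.
    Long minTimestamp = null;
    try {
        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
        if (stateMap.getVersion() == -1L) {
            // Nothing has ever been stored for this processor: start fresh on this node.
            latestPathsListed = new HashSet<>();
            lastListingTime = null;
        } else {
            // Whatever the cluster remembers wins over this node's local view.
            final HDFSListing stateListing = HDFSListing.fromMap(stateMap.toMap());
            if (stateListing != null) {
                latestPathsListed = stateListing.toPaths();
                lastListingTime = minTimestamp = stateListing.getLatestTimestamp().getTime();
            }
        }
    } catch (final IOException ioe) {
        getLogger().error("Failed to retrieve timestamp of last listing. Will not perform listing until this is accomplished.");
        context.yield();
        return; // bail out of onTrigger until the state can be read
    }

Because every node refreshes from the shared state on each trigger, a newly elected primary node automatically resumes from the last listed timestamp, which is why the @OnPrimaryNodeStateChange hook and the electedPrimaryNode flag below can be removed.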

ListHDFS.java

@@ -42,8 +42,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -123,7 +121,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private volatile Long lastListingTime = null;
     private volatile Set<Path> latestPathsListed = new HashSet<>();
-    private volatile boolean electedPrimaryNode = false;
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -158,12 +155,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return getIdentifier() + ".lastListingTime." + directory;
     }
-    @OnPrimaryNodeStateChange
-    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
-            electedPrimaryNode = true;
-        }
-    }
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
@@ -214,44 +205,27 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }
-    private Long getMinTimestamp(final String directory, final HDFSListing remoteListing) throws IOException {
-        // No cluster-wide state has been recovered. Just use whatever values we already have.
-        if (remoteListing == null) {
-            return lastListingTime;
-        }
-        // If our local timestamp is already later than the remote listing's timestamp, use our local info.
-        Long minTimestamp = lastListingTime;
-        if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) {
-            return minTimestamp;
-        }
-        // Use the remote listing's information.
-        if (minTimestamp == null || electedPrimaryNode) {
-            this.latestPathsListed = remoteListing.toPaths();
-            this.lastListingTime = remoteListing.getLatestTimestamp().getTime();
-        }
-        return minTimestamp;
-    }
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final String directory = context.getProperty(DIRECTORY).getValue();
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
-        final Long minTimestamp;
+        Long minTimestamp = null;
         try {
             final HDFSListing stateListing;
             final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
                 stateListing = null;
+                latestPathsListed = new HashSet<>();
+                lastListingTime = null;
             } else {
                 final Map<String, String> stateValues = stateMap.toMap();
                 stateListing = HDFSListing.fromMap(stateValues);
+                if (stateListing != null) {
+                    latestPathsListed = stateListing.toPaths();
+                    lastListingTime = minTimestamp = stateListing.getLatestTimestamp().getTime();
+                }
             }
-            minTimestamp = getMinTimestamp(directory, stateListing);
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
             context.yield();
@@ -260,6 +234,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
+        final String directory = context.getProperty(DIRECTORY).getValue();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);
@@ -339,6 +314,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
         final Set<FileStatus> statusSet = new HashSet<>();
+        getLogger().debug("Fetching listing for {}", new Object[] {path});
         final FileStatus[] statuses = hdfs.listStatus(path);
         for ( final FileStatus status : statuses ) {
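For completeness, the counterpart of the read shown above is writing the updated listing back into the same cluster scope once files have been emitted, so that every node (including a future primary) resumes from the same point. That persistence code falls outside the hunks shown here; the sketch below only illustrates the StateManager write API, and buildStateMap() is a hypothetical helper for the key/value encoding, not NiFi or ListHDFS API:

    // Hypothetical sketch: persist the newest timestamp and paths to the cluster-wide state.
    try {
        final Map<String, String> updatedState = buildStateMap(latestListingTimestamp, pathsJustListed);
        context.getStateManager().setState(updatedState, Scope.CLUSTER);
    } catch (final IOException ioe) {
        getLogger().warn("Failed to save cluster-wide state; files may be listed again after a restart", ioe);
    }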

TestListHDFS.java

@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -148,9 +147,6 @@ public class TestListHDFS {
         // add new file to pull
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
-        // trigger primary node change
-        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
         // cause calls to service to fail
         service.failOnCalls = true;