mirror of https://github.com/apache/nifi.git
NIFI-1588: Reworked how ListHDFS stores state so that only 2 timestamps are stored and no paths
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent 8c488d7e8e
commit 56f79e1e85
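For orientation before the diff: the rework drops the stored set of paths and keeps only two cluster-scoped values, the newest modification time seen during a listing (listing.timestamp) and the newest modification time actually emitted as FlowFiles (emitted.timestamp). Below is a minimal sketch of that state round trip, using the key names from the diff and NiFi's StateManager API; the persistTimestamps and restoreEmittedTimestamp helpers are illustrative only and are not part of the commit.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.nifi.components.state.Scope;
    import org.apache.nifi.components.state.StateManager;
    import org.apache.nifi.components.state.StateMap;

    // Persist only the two timestamps; no per-path entries are written any more.
    static void persistTimestamps(final StateManager stateManager, final long listed, final long emitted) throws IOException {
        final Map<String, String> state = new HashMap<>(2);
        state.put("listing.timestamp", String.valueOf(listed));   // newest modification time observed
        state.put("emitted.timestamp", String.valueOf(emitted));  // newest modification time actually sent out
        stateManager.setState(state, Scope.CLUSTER);
    }

    // Restore the emitted timestamp; a missing key simply means there is no usable prior state.
    static long restoreEmittedTimestamp(final StateManager stateManager) throws IOException {
        final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
        final String emitted = stateMap.get("emitted.timestamp");
        return emitted == null ? -1L : Long.parseLong(emitted);
    }

ListHDFS itself performs this inline in onTrigger, as the hunks below show.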
@@ -19,12 +19,14 @@ package org.apache.nifi.processors.hadoop;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +59,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.HDFSListing;
import org.apache.nifi.processors.hadoop.util.StringSerDe;
import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -119,8 +122,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
        .description("All FlowFiles are transferred to this relationship")
        .build();

    private volatile Long lastListingTime = null;
    private volatile Set<Path> latestPathsListed = new HashSet<>();
    private volatile long latestTimestampListed = -1L;
    private volatile long latestTimestampEmitted = -1L;
    private volatile boolean electedPrimaryNodeSinceLastIteration = false;
    private volatile long lastRunTimestamp = -1L;

    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
    static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";

    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);

    @Override
    protected void init(final ProcessorInitializationContext context) {
@@ -155,12 +165,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
        return getIdentifier() + ".lastListingTime." + directory;
    }


    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
            latestPathsListed = new HashSet<>();
            latestTimestampEmitted = -1L;
            latestTimestampListed = -1L;
        }
    }
@@ -170,6 +179,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
        return mapper.readValue(jsonNode, HDFSListing.class);
    }

    /**
     * Transitions state from the Distributed cache service to the state manager. This will be
     * removed in NiFi 1.x
     *
     * @param context the ProcessContext
     * @throws IOException if unable to communicate with state manager or controller service
     */
    @Deprecated
    @OnScheduled
    public void moveStateToStateManager(final ProcessContext context) throws IOException {
        final StateManager stateManager = context.getStateManager();
@@ -184,6 +201,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
        }
    }

    @Deprecated
    private HDFSListing getListingFromService(final ProcessContext context) throws IOException {
        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        if (client == null) {
@@ -204,28 +222,149 @@ public class ListHDFS extends AbstractHadoopProcessor {
        }
    }

    /**
     * Restores state information from the 'old' style of storing state. This is deprecated and will no longer be supported
     * in the 1.x NiFi baseline
     *
     * @param directory the directory that the listing was performed against
     * @param remoteListing the remote listing
     * @return the minimum timestamp that should be used for new entries
     */
    @Deprecated
    private Long restoreTimestampFromOldStateFormat(final String directory, final HDFSListing remoteListing) {
        // No cluster-wide state has been recovered. Just use whatever values we already have.
        if (remoteListing == null) {
            return latestTimestampListed;
        }

        // If our local timestamp is already later than the remote listing's timestamp, use our local info.
        Long minTimestamp = latestTimestampListed;
        if (minTimestamp != null && minTimestamp > remoteListing.getLatestTimestamp().getTime()) {
            return minTimestamp;
        }

        // Use the remote listing's information.
        if (minTimestamp == null || electedPrimaryNodeSinceLastIteration) {
            this.latestTimestampListed = remoteListing.getLatestTimestamp().getTime();
            this.latestTimestampEmitted = this.latestTimestampListed;
        }

        return minTimestamp;
    }

    /**
     * Determines which of the given FileStatus objects describe files that should be listed.
     *
     * @param statuses the eligible FileStatus objects that we could potentially list
     * @return a Set containing only those FileStatus objects that we want to list
     */
    Set<FileStatus> determineListable(final Set<FileStatus> statuses) {
        final long minTimestamp = this.latestTimestampListed;
        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();

        // Build a sorted map to determine the latest possible entries
        for (final FileStatus status : statuses) {
            if (status.getPath().getName().endsWith("_COPYING_")) {
                continue;
            }

            final long entityTimestamp = status.getModificationTime();

            if (entityTimestamp > latestTimestampListed) {
                latestTimestampListed = entityTimestamp;
            }

            // New entries are all those that occur at or after the associated timestamp
            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;

            if (newEntry) {
                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
                if (entitiesForTimestamp == null) {
                    entitiesForTimestamp = new ArrayList<FileStatus>();
                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
                }
                entitiesForTimestamp.add(status);
            }
        }

        final Set<FileStatus> toList = new HashSet<>();

        if (orderedEntries.size() > 0) {
            long latestListingTimestamp = orderedEntries.lastKey();

            // If the last listing time is equal to the newest entries previously seen,
            // another iteration has occurred without new files and special handling is needed to avoid starvation
            if (latestListingTimestamp == minTimestamp) {
                // We are done if the latest listing timestamp is equal to the last processed time,
                // meaning we handled those items originally passed over
                if (latestListingTimestamp == latestTimestampEmitted) {
                    return Collections.emptySet();
                }
            } else {
                // Otherwise, hold the newest entries back for one cycle so that files written while the listing is being performed are not missed.
                orderedEntries.remove(latestListingTimestamp);
            }

            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
                for (FileStatus status : timestampEntities) {
                    toList.add(status);
                }
            }
        }

        return toList;
    }
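As a stripped-down illustration of the hold-back rule implemented above (an illustrative sketch, not the processor's own code): entries are bucketed by modification time, and the newest bucket is withheld for one cycle unless that timestamp has already been seen on a previous pass, in which case everything is released so the processor cannot starve. The selectEmittable helper and the String entries below are stand-ins.

    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // byTimestamp: entries bucketed by modification time; minTimestamp / lastEmitted play the same
    // conceptual role as latestTimestampListed / latestTimestampEmitted in the diff above.
    static SortedMap<Long, List<String>> selectEmittable(final TreeMap<Long, List<String>> byTimestamp,
                                                         final long minTimestamp, final long lastEmitted) {
        if (byTimestamp.isEmpty()) {
            return byTimestamp;
        }
        final long newest = byTimestamp.lastKey();
        if (newest == minTimestamp) {
            // No newer timestamp arrived since last time: emit everything, unless it was already emitted.
            return newest == lastEmitted ? new TreeMap<Long, List<String>>() : byTimestamp;
        }
        // Newer entries exist: hold the newest bucket back for one more cycle.
        return byTimestamp.headMap(newest);
    }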

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // We have to ensure that we don't continually perform listings, because if we perform two listings within
        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
        // not let that happen.
        final long now = System.nanoTime();
        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
            lastRunTimestamp = now;
            context.yield();
            return;
        }
        lastRunTimestamp = now;

        final String directory = context.getProperty(DIRECTORY).getValue();

        // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
        Long minTimestamp = null;
        try {
            final HDFSListing stateListing;
            final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
            if (stateMap.getVersion() == -1L) {
                stateListing = null;
                latestPathsListed = new HashSet<>();
                lastListingTime = null;
                latestTimestampEmitted = -1L;
                latestTimestampListed = -1L;
                getLogger().debug("Found no state stored");
            } else {
                final Map<String, String> stateValues = stateMap.toMap();
                stateListing = HDFSListing.fromMap(stateValues);
                // Determine if state is stored in the 'new' format or the 'old' format
                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
                if (emittedString == null && stateMap.get(StateKeys.TIMESTAMP) != null) {
                    // state is stored in the old format with XML
                    final Map<String, String> stateValues = stateMap.toMap();
                    final HDFSListing stateListing = HDFSListing.fromMap(stateValues);
                    getLogger().debug("Found old-style state stored");
                    restoreTimestampFromOldStateFormat(directory, stateListing);
                } else if (emittedString == null) {
                    latestTimestampEmitted = -1L;
                    latestTimestampListed = -1L;
                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
                } else {
                    // state is stored in the new format, using just two timestamps
                    latestTimestampEmitted = Long.parseLong(emittedString);
                    final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
                    if (listingTimestampString != null) {
                        latestTimestampListed = Long.parseLong(listingTimestampString);
                    }

                    if (stateListing != null) {
                        latestPathsListed = stateListing.toPaths();
                        lastListingTime = minTimestamp = stateListing.getLatestTimestamp().getTime();
getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
                            new Object[] {latestTimestampEmitted, latestTimestampListed});
                    }
                }

        } catch (final IOException ioe) {
            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
            context.yield();

@@ -234,80 +373,51 @@ public class ListHDFS extends AbstractHadoopProcessor {

        // Pull in any file that is newer than the timestamp that we have.
        final FileSystem hdfs = getFileSystem();
        final String directory = context.getProperty(DIRECTORY).getValue();
        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
        final Path rootPath = new Path(directory);

        int listCount = 0;
        Long latestListingModTime = null;
        final Set<FileStatus> statuses;
        try {
            statuses = getStatuses(rootPath, recursive, hdfs);
            for ( final FileStatus status : statuses ) {
                // don't get anything where the last modified timestamp is equal to our current timestamp.
                // if we do, then we run the risk of multiple files having the same last mod date but us only
                // seeing a portion of them.
                // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
                // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
                // modified date not before the current time.
                final long fileModTime = status.getModificationTime();
                // We only want the file if its timestamp is later than the minTimestamp, or equal to it and we didn't pull the file last time.
                // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
                boolean fetch = !status.getPath().getName().endsWith("_COPYING_")
                        && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));

                // Create the FlowFile for this path.
                if ( fetch ) {
                    final Map<String, String> attributes = createAttributes(status);
                    FlowFile flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, attributes);
                    session.transfer(flowFile, REL_SUCCESS);
                    listCount++;

                    if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
                        latestListingModTime = fileModTime;
                    }
                }
            }
            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
        } catch (final IOException ioe) {
            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
            return;
        }

        final Set<FileStatus> listable = determineListable(statuses);
        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});

        for (final FileStatus status : listable) {
            final Map<String, String> attributes = createAttributes(status);
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);

            final long fileModTime = status.getModificationTime();
            if (fileModTime > latestTimestampEmitted) {
                latestTimestampEmitted = fileModTime;
            }
        }

        final int listCount = listable.size();
        if ( listCount > 0 ) {
            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
            session.commit();

            // We have performed a listing and pushed the FlowFiles out.
            // Now, we need to persist state about the Last Modified timestamp of the newest file
            // that we pulled in. We do this in order to avoid pulling in the same file twice.
            // However, we want to save the state both locally and remotely.
            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
            // previously Primary Node left off.
            final HDFSListing latestListing = createListing(latestListingModTime, statuses);

            try {
                context.getStateManager().setState(latestListing.toMap(), Scope.CLUSTER);
            } catch (final IOException ioe) {
                getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
            }

            lastListingTime = latestListingModTime;
            latestPathsListed.clear();
            for (final FileStatus status : statuses) {
                latestPathsListed.add(status.getPath());
            }
        } else {
            getLogger().debug("There is no data to list. Yielding.");
            context.yield();
        }

        // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
        if ( lastListingTime == null ) {
            lastListingTime = 0L;
        }
        final Map<String, String> updatedState = new HashMap<>(1);
        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
        getLogger().debug("New state map: {}", new Object[] {updatedState});

        return;
        try {
            context.getStateManager().setState(updatedState, Scope.CLUSTER);
        } catch (final IOException ioe) {
            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
        }
    }
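One small pattern worth calling out from the method above is the lag guard at its top. A self-contained sketch of the same idea follows; System.nanoTime() is monotonic, which is why it is preferred here over wall-clock time, and the field and method names are illustrative rather than part of the commit.

    private volatile long lastRunNanos = -1L;
    private static final long MIN_GAP_NANOS = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(100L);

    // Returns true when the caller should yield because the previous run was too recent,
    // mirroring the check against LISTING_LAG_NANOS in the diff.
    boolean tooSoonToRunAgain() {
        final long now = System.nanoTime();
        final boolean tooSoon = lastRunNanos > -1L && now - lastRunNanos < MIN_GAP_NANOS;
        lastRunNanos = now;
        return tooSoon;
    }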

@@ -334,20 +444,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
        return statusSet;
    }

    private HDFSListing createListing(final long latestListingModTime, final Set<FileStatus> statuses) {
        final Set<String> paths = new HashSet<>();
        for (final FileStatus status : statuses) {
            final String path = status.getPath().toUri().toString();
            paths.add(path);
        }

        final HDFSListing listing = new HDFSListing();
        listing.setLatestTimestamp(new Date(latestListingModTime));
        listing.setMatchingPaths(paths);

        return listing;
    }

    private String getAbsolutePath(final Path path) {
        final Path parent = path.getParent();
        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
@@ -373,19 +469,19 @@ public class ListHDFS extends AbstractHadoopProcessor {

    private String getPerms(final FsAction action) {
        final StringBuilder sb = new StringBuilder();
        if ( action.implies(FsAction.READ) ) {
        if (action.implies(FsAction.READ)) {
            sb.append("r");
        } else {
            sb.append("-");
        }

        if ( action.implies(FsAction.WRITE) ) {
        if (action.implies(FsAction.WRITE)) {
            sb.append("w");
        } else {
            sb.append("-");
        }

        if ( action.implies(FsAction.EXECUTE) ) {
        if (action.implies(FsAction.EXECUTE)) {
            sb.append("x");
        } else {
            sb.append("-");

@@ -17,8 +17,6 @@
package org.apache.nifi.processors.hadoop;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileNotFoundException;

@@ -31,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -74,9 +73,14 @@ public class TestListHDFS {
    }

    @Test
    public void testListingHasCorrectAttributes() {
    public void testListingHasCorrectAttributes() throws InterruptedException {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        // first iteration will not pick up files because it has to instead check timestamps.
        // We must then wait long enough to ensure that the listing can be performed safely and
        // run the Processor again.
        runner.run();
        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -87,12 +91,17 @@ public class TestListHDFS {

    @Test
    public void testRecursive() {
    public void testRecursive() throws InterruptedException {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));

        // first iteration will not pick up files because it has to instead check timestamps.
        // We must then wait long enough to ensure that the listing can be performed safely and
        // run the Processor again.
        runner.run();
        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -113,13 +122,18 @@ public class TestListHDFS {
    }

    @Test
    public void testNotRecursive() {
    public void testNotRecursive() throws InterruptedException {
        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));

        // first iteration will not pick up files because it has to instead check timestamps.
        // We must then wait long enough to ensure that the listing can be performed safely and
        // run the Processor again.
        runner.run();
        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -131,9 +145,14 @@ public class TestListHDFS {

    @Test
    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException {
    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException, InterruptedException {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        // first iteration will not pick up files because it has to instead check timestamps.
        // We must then wait long enough to ensure that the listing can be performed safely and
        // run the Processor again.
        runner.run();
        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -145,7 +164,7 @@ public class TestListHDFS {
        runner.clearTransferState();

        // add new file to pull
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));

        // cause calls to service to fail
        service.failOnCalls = true;
@@ -172,23 +191,55 @@ public class TestListHDFS {

        service.failOnCalls = false;
        runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));

        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
        Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
        assertEquals("1999", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));

        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
        assertEquals("2000", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
    }

        // ensure state saved
        runner.getStateManager().assertStateSet(Scope.CLUSTER);
        final Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
        assertEquals(3, newState.size());
    @Test
    public void testOnlyNewestEntriesHeldBack() throws InterruptedException {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));

        final String path0 = newState.get("path.0");
        final String path1 = newState.get("path.1");
        assertTrue(path0.equals("/test/testFile.txt") || path0.equals("/test/testFile2.txt"));
        assertTrue(path1.equals("/test/testFile.txt") || path1.equals("/test/testFile2.txt"));
        assertNotSame(path0, path1);
        // this is a directory, so it won't be counted toward the entries
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/2.txt")));

        final Long timestamp = Long.parseLong(newState.get("timestamp"));
        assertEquals(1999L, timestamp.longValue());
        // The first iteration should pick up 2 files with the smaller timestamps.
        runner.run();
        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);

        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        // Next iteration should pick up the other 2 files, since nothing else was added.
        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);

        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 110L, 0L, create777(), "owner", "group", new Path("/test/testDir/3.txt")));
        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);

        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
    }

@@ -53,6 +53,30 @@ public class TestAbstractListProcessor {
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();

    @Test
    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
        final ConcreteListProcessor proc = new ConcreteListProcessor();
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.run();

        final long initialTimestamp = System.currentTimeMillis();

        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
        proc.addEntity("name", "id", initialTimestamp);
        proc.addEntity("name", "id2", initialTimestamp);
        runner.run();
        // On the first run, the entries listed above are skipped to avoid write synchronization issues
        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);

        // Run again without introducing any new entries
        runner.run();
        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    }

    @Test
    public void testOnlyNewEntriesEmitted() throws Exception {
        final ConcreteListProcessor proc = new ConcreteListProcessor();