NIFI-7263 This closes #4167. Add No Tracking Listing Strategy to ListFile/ListFTP processors

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Waleed Al Aibani 2020-03-28 04:19:14 +03:00 committed by Joe Witt
parent 4ff9cddf15
commit 813ac539b3
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 129 additions and 7 deletions

View File

@ -198,12 +198,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
" However additional DistributedMapCache controller service is required and more JVM heap memory is used." +
" See the description of 'Entity Tracking Time Window' property for further details on how it works.");
public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking",
"This strategy lists an entity without any tracking. The same entity will be listed each time" +
" on executing this processor. It is recommended to change the default run schedule value." +
" Any property that related to the persisting state will be disregarded.");
public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
.name("listing-strategy")
.displayName("Listing Strategy")
.description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
.required(true)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
@ -232,7 +237,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private volatile boolean resetState = false;
private volatile boolean resetEntityTrackingState = false;
private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();
private volatile ListedEntityTracker<T> listedEntityTracker;
/*
@ -442,11 +446,62 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
} else if (NO_TRACKING.equals(listingStrategy)) {
listByNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
}
public void listByNoTracking(final ProcessContext context, final ProcessSession session) {
final List<T> entityList;
try {
// Remove any previous state from the state manager before use a No Tracking Strategy.
context.getStateManager().clear(getStateScope(context));
} catch (final IOException re) {
getLogger().error("Failed to remove previous state from the State Manager.", new Object[]{re.getMessage()}, re);
context.yield();
return;
}
try {
// minTimestamp = 0L by default on this strategy to ignore any future
// comparision in lastModifiedMap to the same entity.
entityList = performListing(context, 0L);
} catch (final IOException pe) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, pe);
context.yield();
return;
}
if (entityList == null || entityList.isEmpty()) {
context.yield();
return;
}
final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
for (final T entity : entityList) {
List<T> entitiesForTimestamp = orderedEntries.computeIfAbsent(entity.getTimestamp(), k -> new ArrayList<T>());
entitiesForTimestamp.add(entity);
}
if (orderedEntries.size() > 0) {
for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
List<T> entities = timestampEntities.getValue();
for (T entity : entities) {
// Create the FlowFile for this path.
final Map<String, String> attributes = createAttributes(entity, context);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
}
}
}
public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;

View File

@ -20,6 +20,7 @@ package org.apache.nifi.processor.util.list;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
@ -28,6 +29,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
@ -52,11 +54,11 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -208,6 +210,58 @@ public class TestAbstractListProcessor {
runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0);
}
@Test
public void testNoTrackingEntityStrategy() throws IOException {
// Firstly, choose Timestamp Strategy lists 2 entities and set state.
// After that choose No Tracking Strategy to test if this strategy remove the state.
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessContext context = runner.getProcessContext();
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS);
// two entities listed
proc.addEntity("one","firstFile",1585344381476L);
proc.addEntity("two","secondFile",1585344381475L);
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
assertEquals(2, proc.entities.size());
final MockStateManager stateManager = runner.getStateManager();
final Map<String, String> expectedState = new HashMap<>();
final Map<String, String> realState = new HashMap<>();
realState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("firstFile").getTimestamp()));
realState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("secondFile").getTimestamp()));
realState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", proc.entities.get("firstFile").getIdentifier());
realState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", proc.entities.get("secondFile").getIdentifier());
stateManager.setState(realState, Scope.CLUSTER);
// Ensure timestamp and identifies are migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("firstFile").getTimestamp()));
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("secondFile").getTimestamp()));
expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", proc.entities.get("firstFile").getIdentifier());
expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", proc.entities.get("secondFile").getIdentifier());
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
// Change listing strategy
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
// Clear any listed entities after choose No Tracking Strategy
proc.entities.clear();
// Add new entity
proc.addEntity("one","firstFile",1585344381476L);
proc.listByNoTracking(context, session);
// Test if state cleared or not
runner.getStateManager().assertStateNotEquals(expectedState, Scope.CLUSTER);
assertEquals(1, proc.entities.size());
}
@Test
public void testEntityTrackingStrategy() throws InitializationException {
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
@ -487,5 +541,18 @@ public class TestAbstractListProcessor {
fields.add(new RecordField("size", RecordFieldType.LONG.getDataType()));
return new SimpleRecordSchema(fields);
}
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List<String> processedIdentifiesWithLatestTimestamp,
final StateManager stateManager, final Scope scope) throws IOException {
final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) {
updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i));
}
stateManager.setState(updatedState, scope);
}
}
}