NIFI-54096: Consolidating new model into existing List processors.

Adding followings:

- Use separate DistributedMapCache for tracking entities to avoid
conflict with existing code
- Added more validation
- Delete listed entities from cache if reset is needed
- Support Local scope
- Added Initial Listing Target

This closes #2876.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Koji Kawamura 2018-07-10 17:00:38 +09:00 committed by Mark Payne
parent 0a493bf7fd
commit 8b9d446118
11 changed files with 705 additions and 41 deletions

View File

@ -40,10 +40,12 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
@ -92,12 +94,17 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
LISTING_STRATEGY,
AzureStorageUtils.CONTAINER,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
PROP_PREFIX,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
ListedEntityTracker.TRACKING_STATE_CACHE,
ListedEntityTracker.TRACKING_TIME_WINDOW,
ListedEntityTracker.INITIAL_LISTING_TARGET
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -105,10 +112,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = AzureStorageUtils.validateCredentialProperties(validationContext);
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
AzureStorageUtils.validateProxySpec(validationContext, results);
return results;
}
@Override
@ -144,7 +149,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
}
@Override
protected Scope getStateScope(final ProcessContext context) {
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;
}

View File

@ -209,4 +209,9 @@ public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEnt
public long getTimestamp() {
return getLastModifiedTime();
}
@Override
public long getSize() {
return length;
}
}

View File

@ -27,9 +27,12 @@ import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
@ -49,6 +52,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -62,8 +66,8 @@ import java.util.stream.Collectors;
/**
* <p>
* An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
* Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
* An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote or local resources.
* Those resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
* we identity the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
* </p>
* <p>
@ -83,6 +87,9 @@ import java.util.stream.Collectors;
* than the last timestamp pulled, then the entity is considered new.
* </li>
* <li>
* With 'Tracking Entities' strategy, the size of entity content is also used to determine if an entity is "new". If the size changes the entity is considered "new".
* </li>
* <li>
* Entity must have a user-readable name that can be used for logging purposes.
* </li>
* </ul>
@ -96,9 +103,10 @@ import java.util.stream.Collectors;
* NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache
* Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
* </p>
*
* <p>
* For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
* of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
* of attributes (defined by the concrete implementation) that can be used to fetch those resources or interact with them in whatever way makes sense for
* the configured dataflow.
* </p>
* <p>
@ -106,8 +114,8 @@ import java.util.stream.Collectors;
* </p>
* <ul>
* <li>
* Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
* entities on the target system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
* entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
* to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
* </li>
@ -138,9 +146,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
+ "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+ "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
.description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. "
+ "The stored value in the cache service will be migrated into the state when this processor is started at the first time. "
+ "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node "
+ "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. "
+ "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.")
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
@ -169,6 +179,28 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
.description("All FlowFiles that are received are routed to success")
.build();
public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
"This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
" Since it only tracks few timestamps, it can manage listing state efficiently." +
" However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +
" For example, such situation can happen in a file system if a file with old timestamp" +
" is copied or moved into the target directory without its last modified timestamp being updated.");
public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
"This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
" This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." +
" However additional DistributedMapCache controller service is required and more JVM heap memory is used." +
" See the description of 'Entity Tracking Time Window' property for further details on how it works.");
public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
.name("listing-strategy")
.displayName("Listing Strategy")
.description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
.required(true)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
/**
* Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
* It does not necessary mean it has been processed as well.
@ -185,6 +217,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private volatile boolean resetState = false;
private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();
private volatile ListedEntityTracker<T> listedEntityTracker;
/*
* A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
* files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
@ -206,14 +240,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return new File("conf/state/" + getIdentifier());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
@ -230,6 +256,32 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return relationships;
}
/**
* In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method.
*/
@Override
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
if (BY_ENTITIES.equals(listingStrategy)) {
ListedEntityTracker.validateProperties(context, results, getStateScope(context));
}
customValidate(context, results);
return results;
}
/**
* Sub-classes can add custom validation by implementing this method.
* @param validationContext the validation context
* @param validationResults add custom validation result to this collection
*/
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> validationResults) {
}
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
@ -260,7 +312,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
if (resetState) {
context.getStateManager().clear(getStateScope(context));
resetState = false;
}
}
@ -352,9 +403,24 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return mapper.readValue(serializedState, EntityListing.class);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
resetState = false;
final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
if (BY_TIMESTAMPS.equals(listingStrategy)) {
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
}
public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
@ -624,7 +690,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* @param context the ProcessContext to use in order to make a determination
* @return a Scope that specifies where the state should be managed for this Processor
*/
protected abstract Scope getStateScope(final ProcessContext context);
protected abstract Scope getStateScope(final PropertyContext context);
private static class StringSerDe implements Serializer<String>, Deserializer<String> {
@ -642,4 +708,41 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
@OnScheduled
public void initListedEntityTracker(ProcessContext context) {
final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
if (listedEntityTracker != null && (resetState || !isTrackingEntityStrategy)) {
try {
listedEntityTracker.clearListedEntities();
} catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
}
}
if (isTrackingEntityStrategy) {
if (listedEntityTracker == null) {
listedEntityTracker = createListedEntityTracker();
}
} else {
listedEntityTracker = null;
}
}
protected ListedEntityTracker<T> createListedEntityTracker() {
return new ListedEntityTracker<>(getIdentifier(), getLogger());
}
private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, getStateScope(context), minTimestampToList -> {
try {
return performListing(context, minTimestampToList);
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", e);
return Collections.emptyList();
}
}, entity -> createAttributes(entity, context));
justElectedPrimaryNode = false;
}
}

View File

@ -37,4 +37,9 @@ public interface ListableEntity {
*/
long getTimestamp();
/**
* @return the size of the entity content.
*/
long getSize();
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.list;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class ListedEntity {
/**
* Milliseconds.
*/
private final long timestamp;
/**
* Bytes.
*/
private final long size;
@JsonCreator
public ListedEntity(@JsonProperty("timestamp") long timestamp, @JsonProperty("size") long size) {
this.timestamp = timestamp;
this.size = size;
}
public long getTimestamp() {
return timestamp;
}
public long getSize() {
return size;
}
}

View File

@ -0,0 +1,342 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.list;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static java.lang.String.format;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
public class ListedEntityTracker<T extends ListableEntity> {
private final ObjectMapper objectMapper = new ObjectMapper();
private volatile Map<String, ListedEntity> alreadyListedEntities;
private static final String NOTE = "Used by 'Tracking Entities' strategy.";
public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
.name("et-state-cache")
.displayName("Entity Tracking State Cache")
.description(format("Listed entities are stored in the specified cache storage" +
" so that this processor can resume listing across NiFi restart or in case of primary node change." +
" 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'." +
" To support large number of entities, the strategy uses DistributedMapCache instead of managed state." +
" Cache key format is 'ListedEntities::{processorId}(::{nodeId})'." +
" If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately." +
" E.g. cluster wide cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
" per node cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
" The stored cache content is Gzipped JSON string." +
" The cache key will be deleted when target listing configuration is changed." +
" %s", NOTE))
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
.name("et-time-window")
.displayName("Entity Tracking Time Window")
.description(format("Specify how long this processor should track already-listed entities." +
" 'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window." +
" For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs." +
" A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets:" +
" 1. does not exist in the already-listed entities," +
" 2. has newer timestamp than the cached entity," +
" 3. has different size than the cached entity." +
" If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities." +
" %s", NOTE))
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("3 hours")
.build();
private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available",
"Regardless of entities timestamp, all existing entities will be listed at the initial listing activity.");
private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window",
"Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity.");
public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
.name("et-initial-listing-target")
.displayName("Entity Tracking Initial Listing Target")
.description(format("Specify how initial listing should be handled." +
" %s", NOTE))
.allowableValues(INITIAL_LISTING_TARGET_WINDOW, INITIAL_LISTING_TARGET_ALL)
.defaultValue(INITIAL_LISTING_TARGET_ALL.getValue())
.build();
public static final PropertyDescriptor NODE_IDENTIFIER = new PropertyDescriptor.Builder()
.name("et-node-identifier")
.displayName("Entity Tracking Node Identifier")
.description(format("The configured value will be appended to the cache key" +
" so that listing state can be tracked per NiFi node rather than cluster wide" +
" when tracking state is scoped to LOCAL. %s", NOTE))
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname()}")
.build();
static final Supplier<Long> DEFAULT_CURRENT_TIMESTAMP_SUPPLIER = System::currentTimeMillis;
private final Supplier<Long> currentTimestampSupplier;
private final Serializer<String> stringSerializer = (v, o) -> o.write(v.getBytes(StandardCharsets.UTF_8));
private final Serializer<Map<String, ListedEntity>> listedEntitiesSerializer = (v, o) -> {
final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(o);
objectMapper.writeValue(gzipOutputStream, v);
// Finish writing gzip data without closing the underlying stream.
gzipOutputStream.finish();
};
private final Deserializer<Map<String, ListedEntity>> listedEntitiesDeserializer = v -> {
if (v == null || v.length == 0) {
return null;
}
try (final GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(v))) {
return objectMapper.readValue(in, new TypeReference<Map<String, ListedEntity>>() {});
}
};
private final String componentId;
private final ComponentLog logger;
/*
* The scope, nodeId and mapCacheClient being used at the previous trackEntities method execution is captured,
* so that it can be used when resetListedEntities is called.
*/
private Scope scope;
private String nodeId;
private DistributedMapCacheClient mapCacheClient;
ListedEntityTracker(String componentId, ComponentLog logger) {
this(componentId, logger, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER);
}
/**
* This constructor is used by unit test code so that it can produce the consistent result by controlling current timestamp.
* @param currentTimestampSupplier a function to return current timestamp.
*/
ListedEntityTracker(String componentId, ComponentLog logger, Supplier<Long> currentTimestampSupplier) {
this.componentId = componentId;
this.logger = logger;
this.currentTimestampSupplier = currentTimestampSupplier;
}
static void validateProperties(ValidationContext context, Collection<ValidationResult> results, Scope scope) {
validateRequiredProperty(context, results, ListedEntityTracker.TRACKING_STATE_CACHE);
validateRequiredProperty(context, results, ListedEntityTracker.TRACKING_TIME_WINDOW);
if (Scope.LOCAL.equals(scope)
&& StringUtils.isEmpty(context.getProperty(NODE_IDENTIFIER).evaluateAttributeExpressions().getValue())) {
results.add(new ValidationResult.Builder()
.subject(NODE_IDENTIFIER.getDisplayName())
.explanation(format("'%s' is required to use local scope with '%s' listing strategy",
NODE_IDENTIFIER.getDisplayName(), AbstractListProcessor.BY_ENTITIES.getDisplayName()))
.build());
}
}
private static void validateRequiredProperty(ValidationContext context, Collection<ValidationResult> results, PropertyDescriptor property) {
if (!context.getProperty(property).isSet()) {
final String displayName = property.getDisplayName();
results.add(new ValidationResult.Builder()
.subject(displayName)
.explanation(format("'%s' is required to use '%s' listing strategy", displayName, AbstractListProcessor.BY_ENTITIES.getDisplayName()))
.valid(false)
.build());
}
}
private static final String CACHE_KEY_PREFIX = "ListedEntities";
private String getCacheKey() {
switch (scope) {
case LOCAL:
return format("%s::%s::%s", CACHE_KEY_PREFIX, componentId, nodeId);
case CLUSTER:
return format("%s::%s", CACHE_KEY_PREFIX, componentId);
}
throw new IllegalArgumentException("Unknown scope: " + scope);
}
private void persistListedEntities(Map<String, ListedEntity> listedEntities) throws IOException {
final String cacheKey = getCacheKey();
logger.debug("Persisting listed entities: {}={}", new Object[]{cacheKey, listedEntities});
mapCacheClient.put(cacheKey, listedEntities, stringSerializer, listedEntitiesSerializer);
}
private Map<String, ListedEntity> fetchListedEntities() throws IOException {
final String cacheKey = getCacheKey();
final Map<String, ListedEntity> listedEntities = mapCacheClient.get(cacheKey, stringSerializer, listedEntitiesDeserializer);
logger.debug("Fetched listed entities: {}={}", new Object[]{cacheKey, listedEntities});
return listedEntities;
}
void clearListedEntities() throws IOException {
alreadyListedEntities = null;
if (mapCacheClient != null) {
final String cacheKey = getCacheKey();
logger.debug("Removing listed entities from cache storage: {}", new Object[]{cacheKey});
mapCacheClient.remove(cacheKey, stringSerializer);
}
}
public void trackEntities(ProcessContext context, ProcessSession session,
boolean justElectedPrimaryNode,
Scope scope,
Function<Long, Collection<T>> listEntities,
Function<T, Map<String, String>> createAttributes) throws ProcessException {
boolean initialListing = false;
mapCacheClient = context.getProperty(TRACKING_STATE_CACHE).asControllerService(DistributedMapCacheClient.class);
this.scope = scope;
if (Scope.LOCAL.equals(scope)) {
nodeId = context.getProperty(ListedEntityTracker.NODE_IDENTIFIER).evaluateAttributeExpressions().getValue();
} else {
nodeId = null;
}
if (alreadyListedEntities == null || justElectedPrimaryNode) {
logger.info(justElectedPrimaryNode
? "Just elected as Primary node, restoring already-listed entities."
: "At the first onTrigger, restoring already-listed entities.");
try {
final Map<String, ListedEntity> fetchedListedEntities = fetchListedEntities();
if (fetchedListedEntities == null) {
this.alreadyListedEntities = new ConcurrentHashMap<>();
initialListing = true;
} else {
this.alreadyListedEntities = new ConcurrentHashMap<>(fetchedListedEntities);
}
} catch (IOException e) {
throw new ProcessException("Failed to restore already-listed entities due to " + e, e);
}
}
final long currentTimeMillis = currentTimestampSupplier.get();
final long watchWindowMillis = context.getProperty(TRACKING_TIME_WINDOW).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final String initialListingTarget = context.getProperty(INITIAL_LISTING_TARGET).getValue();
final long minTimestampToList = (initialListing && INITIAL_LISTING_TARGET_ALL.getValue().equals(initialListingTarget))
? -1 : currentTimeMillis - watchWindowMillis;
final Collection<T> listedEntities = listEntities.apply(minTimestampToList);
if (listedEntities.size() == 0) {
logger.debug("No entity is listed. Yielding.");
context.yield();
return;
}
final List<T> updatedEntities = listedEntities.stream().filter(entity -> {
final String identifier = entity.getIdentifier();
if (entity.getTimestamp() < minTimestampToList) {
logger.trace("Skipped {} having older timestamp than the minTimestampToList {}.", new Object[]{identifier, entity.getTimestamp(), minTimestampToList});
return false;
}
final ListedEntity alreadyListedEntity = alreadyListedEntities.get(identifier);
if (alreadyListedEntity == null) {
logger.trace("Picked {} being newly found.", new Object[]{identifier});
return true;
}
if (entity.getTimestamp() > alreadyListedEntity.getTimestamp()) {
logger.trace("Picked {} having newer timestamp {} than {}.",
new Object[]{identifier, entity.getTimestamp(), alreadyListedEntity.getTimestamp()});
return true;
}
if (entity.getSize() != alreadyListedEntity.getSize()) {
logger.trace("Picked {} having different size {} than {}.",
new Object[]{identifier, entity.getSize(), alreadyListedEntity.getSize()});
return true;
}
logger.trace("Skipped {}, not changed.", new Object[]{identifier, entity.getTimestamp(), minTimestampToList});
return false;
}).collect(Collectors.toList());
// Find old enough entries.
final List<String> oldEntityIds = alreadyListedEntities.entrySet().stream()
.filter(entry -> entry.getValue().getTimestamp() < minTimestampToList).map(Map.Entry::getKey)
.collect(Collectors.toList());
if (updatedEntities.isEmpty() && oldEntityIds.isEmpty()) {
logger.debug("None of updated or old entity was found. Yielding.");
context.yield();
return;
}
// Remove old entries.
oldEntityIds.forEach(oldEntityId -> alreadyListedEntities.remove(oldEntityId));
// Emit updated entities.
for (T updatedEntity : updatedEntities) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, createAttributes.apply(updatedEntity));
session.transfer(flowFile, REL_SUCCESS);
// In order to reduce object size, discard meta data captured at the sub-classes.
final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
}
// Commit ProcessSession before persisting listed entities.
// In case persisting listed entities failure, same entities may be listed again, but better than not listing.
session.commit();
try {
logger.debug("Removed old entities count: {}, Updated entities count: {}",
new Object[]{oldEntityIds.size(), updatedEntities.size()});
if (logger.isTraceEnabled()) {
logger.trace("Removed old entities: {}, Updated entities: {}",
new Object[]{oldEntityIds, updatedEntities});
}
persistListedEntities(alreadyListedEntities);
} catch (IOException e) {
throw new ProcessException("Failed to persist already-listed entities due to " + e, e);
}
}
}

View File

@ -72,7 +72,7 @@ public class ITAbstractListProcessor {
throw new RuntimeException("Failed to retrieve state", e);
}
},
() -> proc.entities,
() -> proc.getEntityList(),
() -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
);

View File

@ -18,7 +18,9 @@
package org.apache.nifi.processor.util.list;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@ -42,12 +44,13 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -82,7 +85,7 @@ public class TestAbstractListProcessor {
throw new RuntimeException("Failed to retrieve state", e);
}
},
() -> proc.entities,
() -> proc.getEntityList(),
() -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
);
@ -171,6 +174,109 @@ public class TestAbstractListProcessor {
assertEquals(1, cache.fetchCount);
}
@Test
public void testEntityTrackingStrategy() throws InitializationException {
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
// Require a cache service.
runner.assertNotValid();
final DistributedCache trackingCache = new DistributedCache();
runner.addControllerService("tracking-cache", trackingCache);
runner.enableControllerService(trackingCache);
runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "tracking-cache");
runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "10ms");
runner.assertValid();
proc.currentTimestamp.set(0L);
runner.run();
assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
// Should list one entity.
proc.addEntity("one", "one", 1, 1);
proc.currentTimestamp.set(1L);
runner.clearTransferState();
runner.run();
assertEquals(1, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "one");
// Should not list any entity.
proc.currentTimestamp.set(2L);
runner.clearTransferState();
runner.run();
assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
// Should list added entities.
proc.currentTimestamp.set(10L);
proc.addEntity("five", "five", 5, 5);
proc.addEntity("six", "six", 6, 6);
runner.clearTransferState();
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "five");
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "six");
// Should be able to list entities having older timestamp than the previously listed entity.
// But if its timestamp is out of tracking window, then it won't be picked.
// Current timestamp = 13, and window = 10ms, meaning it can pick entities having timestamp 3 to 13.
proc.currentTimestamp.set(13L);
proc.addEntity("two", "two", 2, 2);
proc.addEntity("three", "three", 3, 3);
proc.addEntity("four", "four", 4, 4);
runner.clearTransferState();
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "three");
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "four");
// Can pick entity that has newer timestamp.
// Can pick entity that has different size.
proc.currentTimestamp.set(14L);
proc.addEntity("five", "five", 7, 5);
proc.addEntity("six", "six", 6, 16);
runner.clearTransferState();
runner.run();
assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "six");
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "five");
// Reset state.
// Current timestamp = 15, and window = 11ms, meaning it can pick entities having timestamp 4 to 15.
proc.currentTimestamp.set(15L);
// ConcreteListProcessor can reset state with any property.
runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "11ms");
runner.setProperty(ConcreteListProcessor.RESET_STATE, "1");
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
runner.clearTransferState();
runner.run();
assertEquals(3, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "four");
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "six");
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(2)
.assertAttributeEquals(CoreAttributes.FILENAME.key(), "five");
// Reset state again.
proc.currentTimestamp.set(20L);
// ConcreteListProcessor can reset state with any property.
runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "all");
runner.setProperty(ConcreteListProcessor.RESET_STATE, "2");
runner.clearTransferState();
runner.run();
// All entities should be picked, one to six.
assertEquals(6, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
}
static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
private final Map<Object, Object> stored = new HashMap<>();
private int fetchCount = 0;
@ -230,18 +336,47 @@ public class TestAbstractListProcessor {
}
static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
final List<ListableEntity> entities = new ArrayList<>();
final Map<String, ListableEntity> entities = new HashMap<>();
final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json";
String persistenceFolder = "target/";
File persistenceFile = new File(persistenceFolder + persistenceFilename);
private static PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder()
.name("reset-state")
.addValidator(Validator.VALID)
.build();
final AtomicReference<Long> currentTimestamp = new AtomicReference<>();
@Override
protected ListedEntityTracker<ListableEntity> createListedEntityTracker() {
return new ListedEntityTracker<>(getIdentifier(), getLogger(), () -> currentTimestamp.get());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LISTING_STRATEGY);
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
properties.add(ListedEntityTracker.TRACKING_STATE_CACHE);
properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
properties.add(RESET_STATE);
return properties;
}
@Override
public File getPersistenceFile() {
return persistenceFile;
}
public void addEntity(final String name, final String identifier, final long timestamp) {
addEntity(name, identifier, timestamp, 0);
}
public void addEntity(final String name, final String identifier, final long timestamp, long size) {
final ListableEntity entity = new ListableEntity() {
@Override
public String getName() {
@ -257,9 +392,14 @@ public class TestAbstractListProcessor {
public long getTimestamp() {
return timestamp;
}
@Override
public long getSize() {
return size;
}
};
entities.add(entity);
entities.put(entity.getIdentifier(), entity);
}
@Override
@ -276,16 +416,20 @@ public class TestAbstractListProcessor {
@Override
protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
return Collections.unmodifiableList(entities);
return getEntityList();
}
List<ListableEntity> getEntityList() {
return entities.values().stream().sorted(Comparator.comparing(ListableEntity::getTimestamp)).collect(Collectors.toList());
}
@Override
protected boolean isListingResetNecessary(PropertyDescriptor property) {
return false;
return RESET_STATE.equals(property);
}
@Override
protected Scope getStateScope(final ProcessContext context) {
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;
}
}

View File

@ -34,7 +34,9 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
@ -69,6 +71,7 @@ public class ListFTP extends ListFileTransfer {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build();
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LISTING_STRATEGY);
properties.add(HOSTNAME);
properties.add(port);
properties.add(USERNAME);
@ -92,6 +95,9 @@ public class ListFTP extends ListFileTransfer {
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
properties.add(ListedEntityTracker.TRACKING_STATE_CACHE);
properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
return properties;
}
@ -106,16 +112,14 @@ public class ListFTP extends ListFileTransfer {
}
@Override
protected Scope getStateScope(final ProcessContext context) {
protected Scope getStateScope(final PropertyContext context) {
// Use cluster scope so that component can be run on Primary Node Only and can still
// pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
FTPTransfer.validateProxySpec(validationContext, results);
return results;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
@ -38,6 +39,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.standard.util.FileInfo;
import java.io.File;
@ -217,6 +219,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LISTING_STRATEGY);
properties.add(DIRECTORY);
properties.add(RECURSE);
properties.add(DIRECTORY_LOCATION);
@ -229,6 +232,10 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
properties.add(MAX_SIZE);
properties.add(IGNORE_HIDDEN_FILES);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
properties.add(ListedEntityTracker.TRACKING_STATE_CACHE);
properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
properties.add(ListedEntityTracker.NODE_IDENTIFIER);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@ -320,7 +327,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}
@Override
protected Scope getStateScope(final ProcessContext context) {
protected Scope getStateScope(final PropertyContext context) {
final String location = context.getProperty(DIRECTORY_LOCATION).getValue();
if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
return Scope.CLUSTER;

View File

@ -35,7 +35,9 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@ -71,6 +73,7 @@ public class ListSFTP extends ListFileTransfer {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LISTING_STRATEGY);
properties.add(HOSTNAME);
properties.add(port);
properties.add(USERNAME);
@ -95,6 +98,9 @@ public class ListSFTP extends ListFileTransfer {
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(ListedEntityTracker.TRACKING_STATE_CACHE);
properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
return properties;
}
@ -109,16 +115,14 @@ public class ListSFTP extends ListFileTransfer {
}
@Override
protected Scope getStateScope(final ProcessContext context) {
protected Scope getStateScope(final PropertyContext context) {
// Use cluster scope so that component can be run on Primary Node Only and can still
// pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
SFTPTransfer.validateProxySpec(validationContext, results);
return results;
}
}