mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-784' of https://github.com/brianghig/nifi into develop
This commit is contained in:
commit
bbacc3d8ca
|
@ -27,12 +27,14 @@ import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
@ -103,6 +105,13 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
|
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
|
||||||
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} and
|
||||||
|
* cleared during {@link #onStopped(ProcessContext)} for quick access in
|
||||||
|
* {@link #onTrigger(ProcessContext, ProcessSession)}
|
||||||
|
*/
|
||||||
|
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
final Set<Relationship> set = new HashSet<>();
|
final Set<Relationship> set = new HashSet<>();
|
||||||
|
@ -166,6 +175,28 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
this.relationships.set(newRelationships);
|
this.relationships.set(newRelationships);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When this processor is scheduled, update the dynamic properties into the map
|
||||||
|
* for quick access during each onTrigger call
|
||||||
|
* @param context ProcessContext used to retrieve dynamic properties
|
||||||
|
*/
|
||||||
|
@OnScheduled
|
||||||
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
||||||
|
if (!descriptor.isDynamic()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
getLogger().debug("Adding new dynamic property: {}", new Object[]{descriptor});
|
||||||
|
propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnStopped
|
||||||
|
public void onStopped() {
|
||||||
|
getLogger().debug("Clearing propertyMap");
|
||||||
|
propertyMap.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
FlowFile flowFile = session.get();
|
FlowFile flowFile = session.get();
|
||||||
|
@ -174,14 +205,6 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
final Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
|
||||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
|
||||||
if (!descriptor.isDynamic()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
|
|
||||||
}
|
|
||||||
|
|
||||||
final Set<Relationship> matchingRelationships = new HashSet<>();
|
final Set<Relationship> matchingRelationships = new HashSet<>();
|
||||||
for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
|
for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
|
||||||
|
@ -216,7 +239,7 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (destinationRelationships.isEmpty()) {
|
if (destinationRelationships.isEmpty()) {
|
||||||
logger.info(this + " routing " + flowFile + " to unmatched");
|
logger.info("Routing {} to unmatched", new Object[]{ flowFile });
|
||||||
flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
|
flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
|
||||||
session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
|
session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
|
||||||
session.transfer(flowFile, REL_NO_MATCH);
|
session.transfer(flowFile, REL_NO_MATCH);
|
||||||
|
@ -236,7 +259,7 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
|
|
||||||
// now transfer any clones generated
|
// now transfer any clones generated
|
||||||
for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) {
|
for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) {
|
||||||
logger.info(this + " cloned " + flowFile + " into " + entry.getValue() + " and routing clone to relationship " + entry.getKey());
|
logger.info("Cloned {} into {} and routing clone to relationship {}", new Object[]{ flowFile, entry.getValue(), entry.getKey() });
|
||||||
FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
|
FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
|
||||||
session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
|
session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
|
||||||
session.transfer(updatedFlowFile, entry.getKey());
|
session.transfer(updatedFlowFile, entry.getKey());
|
||||||
|
|
Loading…
Reference in New Issue