[NIFI-784] Moving dynamic propertyMap to onScheduled method and updating logger statements to use lazy evaluation with String formatter

This commit is contained in:
Brian Ghigiarelli 2015-07-23 22:52:35 -04:00
parent a1af29eca0
commit 4a43e81343
1 changed files with 28 additions and 11 deletions

View File

@ -27,12 +27,14 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
@ -103,6 +105,8 @@ public class RouteOnAttribute extends AbstractProcessor {
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue(); private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
private volatile Set<String> dynamicPropertyNames = new HashSet<>(); private volatile Set<String> dynamicPropertyNames = new HashSet<>();
private final Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> set = new HashSet<>(); final Set<Relationship> set = new HashSet<>();
@ -166,6 +170,27 @@ public class RouteOnAttribute extends AbstractProcessor {
this.relationships.set(newRelationships); this.relationships.set(newRelationships);
} }
/**
* When this processor is
* @param context
*/
@OnScheduled
public void onScheduled(final ProcessContext context) {
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (!descriptor.isDynamic()) {
continue;
}
getLogger().debug("Adding new dynamic property: {}", new Object[]{descriptor});
propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
}
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) {
getLogger().debug("Clearing propertyMap");
propertyMap.clear();
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) { public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get(); FlowFile flowFile = session.get();
@ -174,14 +199,6 @@ public class RouteOnAttribute extends AbstractProcessor {
} }
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
final Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (!descriptor.isDynamic()) {
continue;
}
propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
}
final Set<Relationship> matchingRelationships = new HashSet<>(); final Set<Relationship> matchingRelationships = new HashSet<>();
for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) { for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
@ -216,7 +233,7 @@ public class RouteOnAttribute extends AbstractProcessor {
} }
if (destinationRelationships.isEmpty()) { if (destinationRelationships.isEmpty()) {
logger.info(this + " routing " + flowFile + " to unmatched"); logger.info("{} routing {} to unmatched", new Object[]{ this, flowFile });
flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName()); flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
session.getProvenanceReporter().route(flowFile, REL_NO_MATCH); session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
session.transfer(flowFile, REL_NO_MATCH); session.transfer(flowFile, REL_NO_MATCH);
@ -236,7 +253,7 @@ public class RouteOnAttribute extends AbstractProcessor {
// now transfer any clones generated // now transfer any clones generated
for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) { for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) {
logger.info(this + " cloned " + flowFile + " into " + entry.getValue() + " and routing clone to relationship " + entry.getKey()); logger.info("{} cloned {} into {} and routing clone to relationship {}", new Object[]{ this, flowFile, entry.getValue(), entry.getKey() });
FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName()); FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
session.getProvenanceReporter().route(updatedFlowFile, entry.getKey()); session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
session.transfer(updatedFlowFile, entry.getKey()); session.transfer(updatedFlowFile, entry.getKey());