[NIFI-784] Per review feedback, updating to use OnStopped insted of OnUnscheduled to avoid concurrency issue updating propertyMap. Also removed this from logger statements since it is already included

This commit is contained in:
Brian Ghigiarelli 2015-07-24 15:26:22 -04:00
parent 4a43e81343
commit 8507902096
1 changed files with 15 additions and 9 deletions

View File

@ -34,7 +34,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
@ -105,7 +105,12 @@ public class RouteOnAttribute extends AbstractProcessor {
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue(); private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
private volatile Set<String> dynamicPropertyNames = new HashSet<>(); private volatile Set<String> dynamicPropertyNames = new HashSet<>();
private final Map<Relationship, PropertyValue> propertyMap = new HashMap<>(); /**
* Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} and
* cleared during {@link #onStopped(ProcessContext)} for quick access in
* {@link #onTrigger(ProcessContext, ProcessSession)}
*/
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -171,8 +176,9 @@ public class RouteOnAttribute extends AbstractProcessor {
} }
/** /**
* When this processor is * When this processor is scheduled, update the dynamic properties into the map
* @param context * for quick access during each onTrigger call
* @param context ProcessContext used to retrieve dynamic properties
*/ */
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
@ -185,8 +191,8 @@ public class RouteOnAttribute extends AbstractProcessor {
} }
} }
@OnUnscheduled @OnStopped
public void onUnscheduled(final ProcessContext context) { public void onStopped() {
getLogger().debug("Clearing propertyMap"); getLogger().debug("Clearing propertyMap");
propertyMap.clear(); propertyMap.clear();
} }
@ -233,7 +239,7 @@ public class RouteOnAttribute extends AbstractProcessor {
} }
if (destinationRelationships.isEmpty()) { if (destinationRelationships.isEmpty()) {
logger.info("{} routing {} to unmatched", new Object[]{ this, flowFile }); logger.info("Routing {} to unmatched", new Object[]{ flowFile });
flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName()); flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
session.getProvenanceReporter().route(flowFile, REL_NO_MATCH); session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
session.transfer(flowFile, REL_NO_MATCH); session.transfer(flowFile, REL_NO_MATCH);
@ -253,7 +259,7 @@ public class RouteOnAttribute extends AbstractProcessor {
// now transfer any clones generated // now transfer any clones generated
for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) { for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) {
logger.info("{} cloned {} into {} and routing clone to relationship {}", new Object[]{ this, flowFile, entry.getValue(), entry.getKey() }); logger.info("Cloned {} into {} and routing clone to relationship {}", new Object[]{ flowFile, entry.getValue(), entry.getKey() });
FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName()); FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
session.getProvenanceReporter().route(updatedFlowFile, entry.getKey()); session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
session.transfer(updatedFlowFile, entry.getKey()); session.transfer(updatedFlowFile, entry.getKey());