mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-843' into develop
This commit is contained in:
commit
a4f0afce7b
|
@ -34,7 +34,6 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
@ -106,8 +105,7 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} and
|
* Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} for quick access in
|
||||||
* cleared during {@link #onStopped(ProcessContext)} for quick access in
|
|
||||||
* {@link #onTrigger(ProcessContext, ProcessSession)}
|
* {@link #onTrigger(ProcessContext, ProcessSession)}
|
||||||
*/
|
*/
|
||||||
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
||||||
|
@ -182,20 +180,18 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
*/
|
*/
|
||||||
@OnScheduled
|
@OnScheduled
|
||||||
public void onScheduled(final ProcessContext context) {
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>();
|
||||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
||||||
if (!descriptor.isDynamic()) {
|
if (!descriptor.isDynamic()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
getLogger().debug("Adding new dynamic property: {}", new Object[]{descriptor});
|
getLogger().debug("Adding new dynamic property: {}", new Object[]{descriptor});
|
||||||
propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
|
newPropertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.propertyMap = newPropertyMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnStopped
|
|
||||||
public void onStopped() {
|
|
||||||
getLogger().debug("Clearing propertyMap");
|
|
||||||
propertyMap.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
@ -206,8 +202,9 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
|
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
|
final Map<Relationship, PropertyValue> propMap = this.propertyMap;
|
||||||
final Set<Relationship> matchingRelationships = new HashSet<>();
|
final Set<Relationship> matchingRelationships = new HashSet<>();
|
||||||
for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
|
for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) {
|
||||||
final PropertyValue value = entry.getValue();
|
final PropertyValue value = entry.getValue();
|
||||||
|
|
||||||
final boolean matches = value.evaluateAttributeExpressions(flowFile).asBoolean();
|
final boolean matches = value.evaluateAttributeExpressions(flowFile).asBoolean();
|
||||||
|
@ -219,7 +216,7 @@ public class RouteOnAttribute extends AbstractProcessor {
|
||||||
final Set<Relationship> destinationRelationships = new HashSet<>();
|
final Set<Relationship> destinationRelationships = new HashSet<>();
|
||||||
switch (context.getProperty(ROUTE_STRATEGY).getValue()) {
|
switch (context.getProperty(ROUTE_STRATEGY).getValue()) {
|
||||||
case routeAllMatchValue:
|
case routeAllMatchValue:
|
||||||
if (matchingRelationships.size() == propertyMap.size()) {
|
if (matchingRelationships.size() == propMap.size()) {
|
||||||
destinationRelationships.add(REL_MATCH);
|
destinationRelationships.add(REL_MATCH);
|
||||||
} else {
|
} else {
|
||||||
destinationRelationships.add(REL_NO_MATCH);
|
destinationRelationships.add(REL_NO_MATCH);
|
||||||
|
|
Loading…
Reference in New Issue