Merge branch 'NIFI-1249'

This commit is contained in:
Mark Payne 2015-12-04 16:38:49 -05:00
commit ee14d8f9dd
29 changed files with 659 additions and 327 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.components; package org.apache.nifi.components;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
@ -120,7 +121,7 @@ public interface PropertyValue {
/** /**
* <p> * <p>
* Replaces values in the Property Value using the Attribute Expression * Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting * Language; a PropertyValue with the new value is then returned, supporting
* call chaining. * call chaining.
* </p> * </p>
@ -128,14 +129,49 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is returned, supporting call * @return a PropertyValue with the new value is returned, supporting call
* chaining * chaining
* *
* @throws ProcessException if the Query cannot be compiled or evaluating * @throws ProcessException if the Expression cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown * the Expression against the given attributes causes an Exception to be thrown
*/ */
public PropertyValue evaluateAttributeExpressions() throws ProcessException; public PropertyValue evaluateAttributeExpressions() throws ProcessException;
/** /**
* <p> * <p>
* Replaces values in the Property Value using the Attribute Expression * Replaces values in the Property Value using the NiFi Expression Language;
* a PropertyValue with the new value is then returned, supporting call chaining.
* </p>
*
* @param attributes a Map of attributes that the Expression can reference, in addition
* to JVM System Properties and Environmental Properties.
*
* @return a PropertyValue with the new value
*
* @throws ProcessException if the Expression cannot be compiled or evaluating the Expression against
* the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression Language.
* The supplied decorator is then given a chance to decorate the
* value, and a PropertyValue with the new value is then returned,
* supporting call chaining.
* </p>
*
* @param attributes a Map of attributes that the Expression can reference, in addition
* to JVM System Properties and Environmental Properties.
* @param decorator the decorator to use in order to update the values returned by the Expression Language
*
* @return a PropertyValue with the new value
*
* @throws ProcessException if the Expression cannot be compiled or evaluating the Expression against
* the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting * Language; a PropertyValue with the new value is then returned, supporting
* call chaining. * call chaining.
* </p> * </p>
@ -144,14 +180,53 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is returned, supporting call * @return a PropertyValue with the new value is returned, supporting call
* chaining * chaining
* *
* @throws ProcessException if the Query cannot be compiled or evaluating * @throws ProcessException if the Expression cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown * the Expression against the given attributes causes an Exception to be thrown
*/ */
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile) throws ProcessException; public PropertyValue evaluateAttributeExpressions(FlowFile flowFile) throws ProcessException;
/** /**
* <p> * <p>
* Replaces values in the Property Value using the Attribute Expression * Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
*
* @param flowFile to evaluate attributes of
* @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in
* this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority.
*
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
*
* @param flowFile to evaluate attributes of
* @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in
* this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority.
* @param decorator the decorator to use in order to update the values returned by the Expression Language
*
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language. The supplied decorator is then given a chance to decorate the * Language. The supplied decorator is then given a chance to decorate the
* value, and a PropertyValue with the new value is then returned, * value, and a PropertyValue with the new value is then returned,
* supporting call chaining. * supporting call chaining.
@ -162,14 +237,14 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is then returned, supporting * @return a PropertyValue with the new value is then returned, supporting
* call chaining * call chaining
* *
* @throws ProcessException if the Query cannot be compiled or evaluating * @throws ProcessException if the Expression cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown * the Expression against the given attributes causes an Exception to be thrown
*/ */
public PropertyValue evaluateAttributeExpressions(AttributeValueDecorator decorator) throws ProcessException; public PropertyValue evaluateAttributeExpressions(AttributeValueDecorator decorator) throws ProcessException;
/** /**
* <p> * <p>
* Replaces values in the Property Value using the Attribute Expression * Replaces values in the Property Value using the NiFi Expression
* Language. The supplied decorator is then given a chance to decorate the * Language. The supplied decorator is then given a chance to decorate the
* value, and a PropertyValue with the new value is then returned, * value, and a PropertyValue with the new value is then returned,
* supporting call chaining. * supporting call chaining.
@ -182,8 +257,8 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is then returned, supporting * @return a PropertyValue with the new value is then returned, supporting
* call chaining * call chaining
* *
* @throws ProcessException if the Query cannot be compiled or evaluating * @throws ProcessException if the Expression cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown * the Expression against the given attributes causes an Exception to be thrown
*/ */
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException; public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException;
} }

View File

@ -147,4 +147,11 @@ public interface ProcessContext {
*/ */
boolean hasConnection(Relationship relationship); boolean hasConnection(Relationship relationship);
/**
* @param property the Property whose value should be inspected to determined if it contains an Expression Language Expression
* @return <code>true</code> if the value of the given Property contains a NiFi Expression
* Language Expression, <code>false</code> if it does not. Note that <code>false</code> will be returned if the Property Descriptor
* does not allow the Expression Language, even if a seemingly valid Expression is present in the value.
*/
boolean isExpressionLanguagePresent(PropertyDescriptor property);
} }

View File

@ -36,6 +36,7 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.bootstrap.notification.NotificationContext; import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext; import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
import org.apache.nifi.bootstrap.notification.NotificationService; import org.apache.nifi.bootstrap.notification.NotificationService;
@ -246,7 +247,7 @@ public class NotificationServiceManager {
configuredValue = fullPropDescriptor.getDefaultValue(); configuredValue = fullPropDescriptor.getDefaultValue();
} }
return new NotificationServicePropertyValue(configuredValue); return new StandardPropertyValue(configuredValue, null);
} }
@Override @Override
@ -363,7 +364,7 @@ public class NotificationServiceManager {
value = descriptor.getDefaultValue(); value = descriptor.getDefaultValue();
} }
return new NotificationServicePropertyValue(value); return new StandardPropertyValue(value, null);
} }
@Override @Override

View File

@ -1,120 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
public class NotificationServicePropertyValue implements PropertyValue {
private final String rawValue;
private final PreparedQuery preparedQuery;
public NotificationServicePropertyValue(final String rawValue) {
this.rawValue = rawValue;
this.preparedQuery = Query.prepare(rawValue);
}
@Override
public String getValue() {
return rawValue;
}
@Override
public Integer asInteger() {
return (rawValue == null) ? null : Integer.parseInt(rawValue.trim());
}
@Override
public Long asLong() {
return (rawValue == null) ? null : Long.parseLong(rawValue.trim());
}
@Override
public Boolean asBoolean() {
return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim());
}
@Override
public Float asFloat() {
return (rawValue == null) ? null : Float.parseFloat(rawValue.trim());
}
@Override
public Double asDouble() {
return (rawValue == null) ? null : Double.parseDouble(rawValue.trim());
}
@Override
public Long asTimePeriod(final TimeUnit timeUnit) {
return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit);
}
@Override
public Double asDataSize(final DataUnit dataUnit) {
return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit);
}
@Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException {
return evaluateAttributeExpressions((AttributeValueDecorator) null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
return new NotificationServicePropertyValue(preparedQuery.evaluateExpressions((FlowFile) null, decorator));
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
throw new UnsupportedOperationException();
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
return rawValue;
}
@Override
public ControllerService asControllerService() {
throw new UnsupportedOperationException();
}
@Override
public <T extends ControllerService> T asControllerService(final Class<T> serviceType) throws IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public boolean isSet() {
return rawValue != null;
}
}

View File

@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler; import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
import org.apache.nifi.bootstrap.NotificationServicePropertyValue; import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
@ -48,7 +48,7 @@ public class NotificationValidationContext implements ValidationContext {
@Override @Override
public PropertyValue newPropertyValue(final String rawValue) { public PropertyValue newPropertyValue(final String rawValue) {
return new NotificationServicePropertyValue(rawValue); return new StandardPropertyValue(rawValue, null);
} }
@Override @Override

View File

@ -52,5 +52,9 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -59,4 +59,9 @@ public class EmptyPreparedQuery implements PreparedQuery {
public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException { public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException {
return value; return value;
} }
@Override
public String evaluateExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
return value;
}
} }

View File

@ -69,4 +69,8 @@ public class InvalidPreparedQuery implements PreparedQuery {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
} }
@Override
public String evaluateExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
}
} }

View File

@ -36,4 +36,5 @@ public interface PreparedQuery {
String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException; String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException;
String evaluateExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException;
} }

View File

@ -19,6 +19,7 @@ package org.apache.nifi.attribute.expression.language;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -416,7 +417,12 @@ public class Query {
} }
static Map<String, String> createExpressionMap(final FlowFile flowFile) { static Map<String, String> createExpressionMap(final FlowFile flowFile) {
final Map<String, String> attributeMap = flowFile == null ? new HashMap<String, String>() : flowFile.getAttributes(); return createExpressionMap(flowFile, null);
}
static Map<String, String> createExpressionMap(final FlowFile flowFile, final Map<String, String> additionalAttributes) {
final Map<String, String> attributeMap = flowFile == null ? Collections.<String, String> emptyMap() : flowFile.getAttributes();
final Map<String, String> additionalOrEmpty = additionalAttributes == null ? Collections.<String, String> emptyMap() : additionalAttributes;
final Map<String, String> envMap = System.getenv(); final Map<String, String> envMap = System.getenv();
final Map<?, ?> sysProps = System.getProperties(); final Map<?, ?> sysProps = System.getProperties();
@ -428,13 +434,13 @@ public class Query {
flowFileProps.put("lineageStartDate", String.valueOf(flowFile.getLineageStartDate())); flowFileProps.put("lineageStartDate", String.valueOf(flowFile.getLineageStartDate()));
} }
return wrap(attributeMap, flowFileProps, envMap, sysProps); return wrap(additionalOrEmpty, attributeMap, flowFileProps, envMap, sysProps);
} }
private static Map<String, String> wrap(final Map<String, String> attributes, final Map<String, String> flowFileProps, private static Map<String, String> wrap(final Map<String, String> additional, final Map<String, String> attributes, final Map<String, String> flowFileProps,
final Map<String, String> env, final Map<?, ?> sysProps) { final Map<String, String> env, final Map<?, ?> sysProps) {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
final Map[] maps = new Map[]{attributes, flowFileProps, env, sysProps}; final Map[] maps = new Map[] {additional, attributes, flowFileProps, env, sysProps};
return new Map<String, String>() { return new Map<String, String>() {
@Override @Override

View File

@ -59,6 +59,12 @@ public class StandardPreparedQuery implements PreparedQuery {
return sb.toString(); return sb.toString();
} }
@Override
public String evaluateExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
final Map<String, String> expressionMap = Query.createExpressionMap(flowFile, additionalAttributes);
return evaluateExpressions(expressionMap, decorator);
}
@Override @Override
public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
final Map<String, String> expressionMap = Query.createExpressionMap(flowFile); final Map<String, String> expressionMap = Query.createExpressionMap(flowFile);

View File

@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processor; package org.apache.nifi.attribute.expression.language;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
public final class StandardPropertyValue implements PropertyValue { public class StandardPropertyValue implements PropertyValue {
private final String rawValue; private final String rawValue;
private final ControllerServiceLookup serviceLookup; private final ControllerServiceLookup serviceLookup;
@ -95,26 +95,46 @@ public final class StandardPropertyValue implements PropertyValue {
@Override @Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException { public PropertyValue evaluateAttributeExpressions() throws ProcessException {
return evaluateAttributeExpressions(null, null); return evaluateAttributeExpressions(null, null, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, decorator);
} }
@Override @Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException { public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, decorator); return evaluateAttributeExpressions(null, null, decorator);
} }
@Override @Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException { public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
return evaluateAttributeExpressions(flowFile, null); return evaluateAttributeExpressions(flowFile, null, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, null);
} }
@Override @Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
if (rawValue == null || preparedQuery == null) { if (rawValue == null || preparedQuery == null) {
return this; return this;
} }
return new StandardPropertyValue(preparedQuery.evaluateExpressions(flowFile, decorator), serviceLookup, null); return new StandardPropertyValue(preparedQuery.evaluateExpressions(flowFile, additionalAttributes, decorator), serviceLookup, null);
} }
@Override @Override
@ -124,7 +144,7 @@ public final class StandardPropertyValue implements PropertyValue {
@Override @Override
public ControllerService asControllerService() { public ControllerService asControllerService() {
if (rawValue == null || rawValue.equals("")) { if (rawValue == null || rawValue.equals("") || serviceLookup == null) {
return null; return null;
} }
@ -136,7 +156,7 @@ public final class StandardPropertyValue implements PropertyValue {
if (!serviceType.isInterface()) { if (!serviceType.isInterface()) {
throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface"); throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
} }
if (rawValue == null || rawValue.equals("")) { if (rawValue == null || rawValue.equals("") || serviceLookup == null) {
return null; return null;
} }

View File

@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
@ -331,4 +333,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
} }
} }
@Override
public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
if (property == null || !property.isExpressionLanguageSupported()) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
}
} }

View File

@ -16,9 +16,10 @@
*/ */
package org.apache.nifi.util; package org.apache.nifi.util;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
@ -29,11 +30,11 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
public class MockPropertyValue implements PropertyValue { public class MockPropertyValue implements PropertyValue {
private final String rawValue; private final String rawValue;
private final Boolean expectExpressions; private final Boolean expectExpressions;
private final ControllerServiceLookup serviceLookup; private final ControllerServiceLookup serviceLookup;
private final PropertyDescriptor propertyDescriptor; private final PropertyDescriptor propertyDescriptor;
private final PropertyValue stdPropValue;
private boolean expressionsEvaluated = false; private boolean expressionsEvaluated = false;
public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup) { public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup) {
@ -41,12 +42,20 @@ public class MockPropertyValue implements PropertyValue {
} }
public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor) { public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor) {
this(rawValue, serviceLookup, propertyDescriptor, false);
}
private MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor, final boolean alreadyEvaluated) {
this.stdPropValue = new StandardPropertyValue(rawValue, serviceLookup);
this.rawValue = rawValue; this.rawValue = rawValue;
this.serviceLookup = serviceLookup; this.serviceLookup = serviceLookup;
this.expectExpressions = propertyDescriptor == null ? null : propertyDescriptor.isExpressionLanguageSupported(); this.expectExpressions = propertyDescriptor == null ? null : propertyDescriptor.isExpressionLanguageSupported();
this.propertyDescriptor = propertyDescriptor; this.propertyDescriptor = propertyDescriptor;
this.expressionsEvaluated = alreadyEvaluated;
} }
private void ensureExpressionsEvaluated() { private void ensureExpressionsEvaluated() {
if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) { if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) {
throw new IllegalStateException("Attempting to retrieve value of " + propertyDescriptor throw new IllegalStateException("Attempting to retrieve value of " + propertyDescriptor
@ -59,49 +68,49 @@ public class MockPropertyValue implements PropertyValue {
@Override @Override
public String getValue() { public String getValue() {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return rawValue; return stdPropValue.getValue();
} }
@Override @Override
public Integer asInteger() { public Integer asInteger() {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return (rawValue == null) ? null : Integer.parseInt(rawValue.trim()); return stdPropValue.asInteger();
} }
@Override @Override
public Long asLong() { public Long asLong() {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return (rawValue == null) ? null : Long.parseLong(rawValue.trim()); return stdPropValue.asLong();
} }
@Override @Override
public Boolean asBoolean() { public Boolean asBoolean() {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim()); return stdPropValue.asBoolean();
} }
@Override @Override
public Float asFloat() { public Float asFloat() {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return (rawValue == null) ? null : Float.parseFloat(rawValue.trim()); return stdPropValue.asFloat();
} }
@Override @Override
public Double asDouble() { public Double asDouble() {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return (rawValue == null) ? null : Double.parseDouble(rawValue.trim()); return stdPropValue.asDouble();
} }
@Override @Override
public Long asTimePeriod(final TimeUnit timeUnit) { public Long asTimePeriod(final TimeUnit timeUnit) {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit); return stdPropValue.asTimePeriod(timeUnit);
} }
@Override @Override
public Double asDataSize(final DataUnit dataUnit) { public Double asDataSize(final DataUnit dataUnit) {
ensureExpressionsEvaluated(); ensureExpressionsEvaluated();
return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit); return stdPropValue.asDataSize(dataUnit);
} }
private void markEvaluated() { private void markEvaluated() {
@ -115,38 +124,48 @@ public class MockPropertyValue implements PropertyValue {
@Override @Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException { public PropertyValue evaluateAttributeExpressions() throws ProcessException {
markEvaluated(); return evaluateAttributeExpressions(null, null, null);
if (rawValue == null) {
return this;
}
return evaluateAttributeExpressions(null, null);
} }
@Override @Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException { public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
markEvaluated(); return evaluateAttributeExpressions(null, null, decorator);
if (rawValue == null) {
return this;
}
return evaluateAttributeExpressions(null, decorator);
} }
@Override @Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException { public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
markEvaluated(); return evaluateAttributeExpressions(flowFile, null, null);
if (rawValue == null) {
return this;
}
return evaluateAttributeExpressions(flowFile, null);
} }
@Override @Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
markEvaluated(); markEvaluated();
if (rawValue == null) { if (rawValue == null) {
return this; return this;
} }
return new MockPropertyValue(Query.evaluateExpressions(rawValue, flowFile, decorator), serviceLookup);
final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator);
return new MockPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true);
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
@ -384,11 +385,22 @@ public class StandardProcessorTestRunner implements TestRunner {
enqueue(data, new HashMap<String, String>()); enqueue(data, new HashMap<String, String>());
} }
@Override
public void enqueue(final String data) {
enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
}
@Override @Override
public void enqueue(final byte[] data, final Map<String, String> attributes) { public void enqueue(final byte[] data, final Map<String, String> attributes) {
enqueue(new ByteArrayInputStream(data), attributes); enqueue(new ByteArrayInputStream(data), attributes);
} }
@Override
public void enqueue(final String data, final Map<String, String> attributes) {
enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
}
@Override @Override
public void enqueue(final InputStream data) { public void enqueue(final InputStream data) {
enqueue(data, new HashMap<String, String>()); enqueue(data, new HashMap<String, String>());

View File

@ -363,16 +363,33 @@ public interface TestRunner {
*/ */
void enqueue(byte[] data); void enqueue(byte[] data);
/**
* Creates a FlowFile with the content set to the given string (in UTF-8 format), with no attributes,
* and adds this FlowFile to the Processor's Input Queue
*
* @param data to enqueue
*/
void enqueue(String data);
/** /**
* Copies the content from the given byte array into memory and creates a * Copies the content from the given byte array into memory and creates a
* FlowFile from this content with the given attributes and adds this * FlowFile from this content with the given attributes and adds this
* FlowFile to the Processor's Input Queue * FlowFile to the Processor's Input Queue
* *
* @param data to enqueue * @param data to enqueue
* @param attributes to use for enqueued items * @param attributes to use for enqueued item
*/ */
void enqueue(byte[] data, Map<String, String> attributes); void enqueue(byte[] data, Map<String, String> attributes);
/**
* Creates a FlowFile with the content set to the given string (in UTF-8 format), with the given attributes,
* and adds this FlowFile to the Processor's Input Queue
*
* @param data to enqueue
* @param attributes to use for enqueued item
*/
void enqueue(String data, Map<String, String> attributes);
/** /**
* Reads the content from the given {@link InputStream} into memory and * Reads the content from the given {@link InputStream} into memory and
* creates a FlowFile from this content with no attributes and adds this * creates a FlowFile from this content with no attributes and adds this

View File

@ -98,5 +98,8 @@ public class MockProcessContext implements ProcessContext {
return false; return false;
} }
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
return false;
}
} }

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
@ -32,7 +33,6 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.ComponentType;

View File

@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
@ -32,7 +33,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.EventAccess;

View File

@ -59,6 +59,9 @@ public class ConnectableProcessContext implements ProcessContext {
@Override @Override
public PropertyValue getProperty(final String propertyName) { public PropertyValue getProperty(final String propertyName) {
// None of the connectable components other than Processor's will ever need to evaluate these.
// Since Processors use a different implementation of ProcessContext all together, we will just
// return null for all values
return new PropertyValue() { return new PropertyValue() {
@Override @Override
public String getValue() { public String getValue() {
@ -134,6 +137,26 @@ public class ConnectableProcessContext implements ProcessContext {
public boolean isSet() { public boolean isSet() {
return false; return false;
} }
@Override
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes) throws ProcessException {
return null;
}
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes) throws ProcessException {
return null;
}
@Override
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException {
return null;
}
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
return null;
}
}; };
} }
@ -208,5 +231,8 @@ public class ConnectableProcessContext implements ProcessContext {
return connections != null && !connections.isEmpty(); return connections != null && !connections.isEmpty();
} }
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
return false;
}
} }

View File

@ -22,12 +22,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
public class StandardConfigurationContext implements ConfigurationContext { public class StandardConfigurationContext implements ConfigurationContext {

View File

@ -19,11 +19,14 @@ package org.apache.nifi.processor;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
@ -196,4 +199,13 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return connections != null && !connections.isEmpty(); return connections != null && !connections.isEmpty();
} }
@Override
public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
if (property == null || !property.isExpressionLanguageSupported()) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
}
} }

View File

@ -16,9 +16,12 @@
*/ */
package org.apache.nifi.processor; package org.apache.nifi.processor;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
@ -126,4 +129,14 @@ public class StandardSchedulingContext implements SchedulingContext {
public boolean hasConnection(Relationship relationship) { public boolean hasConnection(Relationship relationship) {
return processContext.hasConnection(relationship); return processContext.hasConnection(relationship);
} }
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
if (property == null || !property.isExpressionLanguageSupported()) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
}
} }

View File

@ -26,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler; import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.nifi.processor; package org.apache.nifi.processor;
import org.apache.nifi.processor.StandardPropertyValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.Calendar; import java.util.Calendar;
@ -25,6 +23,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;

View File

@ -18,11 +18,11 @@ package org.apache.nifi.web.controller;
import java.util.Map; import java.util.Map;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.search.SearchContext; import org.apache.nifi.search.SearchContext;
/** /**

View File

@ -25,17 +25,19 @@ import java.io.OutputStreamWriter;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -53,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters; import org.apache.nifi.processor.util.FlowFileFilters;
@ -77,7 +80,8 @@ public class ReplaceText extends AbstractProcessor {
public static final String appendValue = "Append"; public static final String appendValue = "Append";
public static final String regexReplaceValue = "Regex Replace"; public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace"; public static final String literalReplaceValue = "Literal Replace";
private final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)"); public static final String alwaysReplace = "Always Replace";
private static final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)");
private static final String DEFAULT_REGEX = "(?s:^.*$)"; private static final String DEFAULT_REGEX = "(?s:^.*$)";
private static final String DEFAULT_REPLACEMENT_VALUE = "$1"; private static final String DEFAULT_REPLACEMENT_VALUE = "$1";
@ -95,6 +99,9 @@ public class ReplaceText extends AbstractProcessor {
"Interpret the Search Value as a Regular Expression and replace all matches with the Replacement Value. The Replacement Value may reference Capturing Groups used " "Interpret the Search Value as a Regular Expression and replace all matches with the Replacement Value. The Replacement Value may reference Capturing Groups used "
+ "in the Search Value by using a dollar-sign followed by the Capturing Group number, such as $1 or $2. If the Search Value is set to .* then everything is replaced without " + "in the Search Value by using a dollar-sign followed by the Capturing Group number, such as $1 or $2. If the Search Value is set to .* then everything is replaced without "
+ "even evaluating the Regular Expression."); + "even evaluating the Regular Expression.");
static final AllowableValue ALWAYS_REPLACE = new AllowableValue(alwaysReplace, alwaysReplace,
"Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the <Evaluation Mode> property) and does not bother searching "
+ "for any value. When this strategy is chosen, the <Search Value> property is ignored.");
public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
.name("Regular Expression") .name("Regular Expression")
@ -108,7 +115,9 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder() public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
.name("Replacement Value") .name("Replacement Value")
.description("The value to insert using the 'Replacement Strategy'. Using \"Regex Replace\" back-references to Regular Expression capturing groups " .description("The value to insert using the 'Replacement Strategy'. Using \"Regex Replace\" back-references to Regular Expression capturing groups "
+ "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.") + "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value. "
+ "Back References may also be referenced using the Expression Language, as '$1', '$2', etc. The single-tick marks MUST be included, as these variables are "
+ "not \"Standard\" attribute names (attribute names must be quoted unless they contain only numbers, letters, and _).")
.required(true) .required(true)
.defaultValue(DEFAULT_REPLACEMENT_VALUE) .defaultValue(DEFAULT_REPLACEMENT_VALUE)
.addValidator(Validator.VALID) .addValidator(Validator.VALID)
@ -124,11 +133,11 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size") .name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to " .description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to "
+ "apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, " + "apply the replacement. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, "
+ "the FlowFile will be routed to 'failure'. " + "the FlowFile will be routed to 'failure'. "
+ "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value " + "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value "
+ "of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. " + "of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. "
+ "This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'") + "This value is ignored if the <Replacement Strategy> property is set to one of: Append, Prepend, Always Replace")
.required(true) .required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB") .defaultValue("1 MB")
@ -136,7 +145,7 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder() public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("Replacement Strategy") .name("Replacement Strategy")
.description("The strategy for how and what to replace within the FlowFile's text content.") .description("The strategy for how and what to replace within the FlowFile's text content.")
.allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE) .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE)
.defaultValue(REGEX_REPLACE.getValue()) .defaultValue(REGEX_REPLACE.getValue())
.required(true) .required(true)
.build(); .build();
@ -210,18 +219,6 @@ public class ReplaceText extends AbstractProcessor {
return; return;
} }
final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return attributeValue.replace("$", "\\$");
}
};
final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue();
final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
final boolean skipBuffer = ".*".equals(unsubstitutedRegex);
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
@ -233,70 +230,44 @@ public class ReplaceText extends AbstractProcessor {
buffer = null; buffer = null;
} }
ReplacementStrategyExecutor replacementStrategyExecutor;
switch (replacementStrategy) {
case prependValue:
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
replacementStrategyExecutor = new AppendReplace();
break;
case regexReplaceValue:
// for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
if (context.getProperty(SEARCH_VALUE).getValue().equals(".*")) {
replacementStrategyExecutor = new AlwaysReplace();
} else {
replacementStrategyExecutor = new RegexReplace(buffer, context);
}
break;
case literalReplaceValue:
replacementStrategyExecutor = new LiteralReplace(buffer);
break;
case alwaysReplace:
replacementStrategyExecutor = new AlwaysReplace();
break;
default:
throw new AssertionError();
}
for (FlowFile flowFile : flowFiles) { for (FlowFile flowFile : flowFiles) {
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
if (flowFile.getSize() > maxBufferSize && !skipBuffer) { if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
continue; continue;
} }
} }
String replacement;
if (!replacementStrategy.equals(regexReplaceValue)) {
replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
} else {
replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, escapeBackRefDecorator).getValue();
final Matcher backRefMatcher = backReferencePattern.matcher(replacement);
while (backRefMatcher.find()) {
final String backRefNum = backRefMatcher.group(1);
if (backRefNum.startsWith("0")) {
continue;
}
final int originalBackRefIndex = Integer.parseInt(backRefNum);
int backRefIndex = originalBackRefIndex;
// if we have a replacement value like $123, and we have less than 123 capturing groups, then
// we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
// then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
// we want to truncate the 1 and get 0.
while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
backRefIndex /= 10;
}
if (backRefIndex > numCapturingGroups) {
final StringBuilder sb = new StringBuilder(replacement.length() + 1);
final int groupStart = backRefMatcher.start(1);
sb.append(replacement.substring(0, groupStart - 1));
sb.append("\\");
sb.append(replacement.substring(groupStart - 1));
replacement = sb.toString();
}
}
}
ReplacementStrategyExecutor replacementStrategyExecutor;
switch (replacementStrategy) {
case prependValue:
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
replacementStrategyExecutor = new AppendReplace();
break;
case regexReplaceValue:
replacementStrategyExecutor = new RegexReplace(buffer);
break;
case literalReplaceValue:
replacementStrategyExecutor = new LiteralReplace(buffer);
break;
default:
throw new AssertionError();
}
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
flowFile = replacementStrategyExecutor.replace(flowFile, session, context, replacement, evaluateMode, flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize);
charset, maxBufferSize, skipBuffer);
logger.info("Transferred {} to 'success'", new Object[] {flowFile}); logger.info("Transferred {} to 'success'", new Object[] {flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@ -304,11 +275,106 @@ public class ReplaceText extends AbstractProcessor {
} }
} }
private static class PrependReplace implements ReplacementStrategyExecutor {
// If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing
// groups and the Replacement Value has the value is "I owe $8 to him", then we want to treat the $8 as a literal "$8", rather
// than attempting to use it as a back reference.
private static String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) {
if (numCapturingGroups == 0) {
return unescaped;
}
String value = unescaped;
final Matcher backRefMatcher = backReferencePattern.matcher(value);
while (backRefMatcher.find()) {
final String backRefNum = backRefMatcher.group(1);
if (backRefNum.startsWith("0")) {
continue;
}
final int originalBackRefIndex = Integer.parseInt(backRefNum);
int backRefIndex = originalBackRefIndex;
// if we have a replacement value like $123, and we have less than 123 capturing groups, then
// we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
// then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
// we want to truncate the 1 and get 0.
while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
backRefIndex /= 10;
}
if (backRefIndex > numCapturingGroups) {
final StringBuilder sb = new StringBuilder(value.length() + 1);
final int groupStart = backRefMatcher.start(1);
sb.append(value.substring(0, groupStart - 1));
sb.append("\\");
sb.append(value.substring(groupStart - 1));
value = sb.toString();
}
}
return value;
}
private static class AlwaysReplace implements ReplacementStrategyExecutor {
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final StringBuilder lineEndingBuilder = new StringBuilder(2);
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
out.write(replacementValue.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
String line;
while ((line = br.readLine()) != null) {
// We need to determine what line ending was used and use that after our replacement value.
lineEndingBuilder.setLength(0);
for (int i = line.length() - 1; i >= 0; i--) {
final char c = line.charAt(i);
if (c == '\r' || c == '\n') {
lineEndingBuilder.append(c);
} else {
break;
}
}
bw.write(replacementValue);
// Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
// So if builder has multiple characters, they are now reversed from the original string's ordering.
bw.write(lineEndingBuilder.reverse().toString());
}
}
}
});
}
return flowFile;
}
@Override @Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, public boolean isAllDataBufferedForEntireText() {
final Charset charset, final int maxBufferSize, final boolean skipBuffer) { return false;
}
}
private static class PrependReplace implements ReplacementStrategyExecutor {
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() { flowFile = session.write(flowFile, new StreamCallback() {
@Override @Override
@ -334,13 +400,19 @@ public class ReplaceText extends AbstractProcessor {
} }
return flowFile; return flowFile;
} }
@Override
public boolean isAllDataBufferedForEntireText() {
return false;
}
} }
private static class AppendReplace implements ReplacementStrategyExecutor { private static class AppendReplace implements ReplacementStrategyExecutor {
@Override @Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final Charset charset, final int maxBufferSize, final boolean skipBuffer) { final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() { flowFile = session.write(flowFile, new StreamCallback() {
@Override @Override
@ -387,81 +459,121 @@ public class ReplaceText extends AbstractProcessor {
} }
return flowFile; return flowFile;
} }
@Override
public boolean isAllDataBufferedForEntireText() {
return false;
}
} }
private static class RegexReplace implements ReplacementStrategyExecutor { private static class RegexReplace implements ReplacementStrategyExecutor {
private final byte[] buffer; private final byte[] buffer;
private final int numCapturingGroups;
private final Map<String, String> additionalAttrs;
public RegexReplace(final byte[] buffer) { private static final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return attributeValue.replace("$", "\\$");
}
};
public RegexReplace(final byte[] buffer, final ProcessContext context) {
this.buffer = buffer; this.buffer = buffer;
final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue();
numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
additionalAttrs = new HashMap<>(numCapturingGroups);
} }
@Override @Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, public FlowFile replace(final FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final Charset charset, final int maxBufferSize, final boolean skipBuffer) { final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
final String replacementFinal = replacementValue.replaceAll("(\\$\\D)", "\\\\$1"); @Override
public String decorate(final String attributeValue) {
return Pattern.quote(attributeValue);
}
};
final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final Pattern searchPattern = Pattern.compile(searchRegex);
// always match; just overwrite value with the replacement value; this optimization prevents us final int flowFileSize = (int) flowFile.getSize();
// from reading the file at all. FlowFile updatedFlowFile;
if (skipBuffer) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { session.read(flowFile, new InputStreamCallback() {
flowFile = session.write(flowFile, new OutputStreamCallback() { @Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
final String contentString = new String(buffer, 0, flowFileSize, charset);
additionalAttrs.clear();
final Matcher matcher = searchPattern.matcher(contentString);
if (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
final String groupValue = matcher.group(i);
additionalAttrs.put("$" + i, groupValue);
}
String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
// If we have a $ followed by anything other than a number, then escape it. E.g., $d becomes \$d so that it can be used as a literal in a regex.
final String replacementFinal = replacement.replaceAll("(\\$\\D)", "\\\\$1");
final String updatedValue = contentString.replaceAll(searchRegex, replacementFinal);
updatedFlowFile = session.write(flowFile, new OutputStreamCallback() {
@Override @Override
public void process(final OutputStream out) throws IOException { public void process(final OutputStream out) throws IOException {
out.write(replacementFinal.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
while (null != br.readLine()) {
bw.write(replacementFinal);
}
}
}
});
}
} else {
final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return Pattern.quote(attributeValue);
}
};
final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final int flowFileSize = (int) flowFile.getSize();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
final String contentString = new String(buffer, 0, flowFileSize, charset);
final String updatedValue = contentString.replaceAll(searchRegex, replacementFinal);
out.write(updatedValue.getBytes(charset)); out.write(updatedValue.getBytes(charset));
} }
}); });
} else { } else {
flowFile = session.write(flowFile, new StreamCallback() { // If no match, just return the original. No need to write out any content.
@Override return flowFile;
public void process(final InputStream in, final OutputStream out) throws IOException { }
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); } else {
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { updatedFlowFile = session.write(flowFile, new StreamCallback() {
String oneLine; @Override
while (null != (oneLine = br.readLine())) { public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
while (null != (oneLine = br.readLine())) {
additionalAttrs.clear();
final Matcher matcher = searchPattern.matcher(oneLine);
if (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
final String groupValue = matcher.group(i);
additionalAttrs.put("$" + i, groupValue);
}
String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
// If we have a $ followed by anything other than a number, then escape it. E.g., $d becomes \$d so that it can be used as a literal in a regex.
final String replacementFinal = replacement.replaceAll("(\\$\\D)", "\\\\$1");
final String updatedValue = oneLine.replaceAll(searchRegex, replacementFinal); final String updatedValue = oneLine.replaceAll(searchRegex, replacementFinal);
bw.write(updatedValue); bw.write(updatedValue);
} else {
// No match. Just write out the line as it was.
bw.write(oneLine);
} }
} }
} }
}); }
} });
} }
return flowFile;
return updatedFlowFile;
}
@Override
public boolean isAllDataBufferedForEntireText() {
return true;
} }
} }
@ -473,8 +585,10 @@ public class ReplaceText extends AbstractProcessor {
} }
@Override @Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final Charset charset, final int maxBufferSize, final boolean skipBuffer) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() { final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
@Override @Override
public String decorate(final String attributeValue) { public String decorate(final String attributeValue) {
@ -484,7 +598,6 @@ public class ReplaceText extends AbstractProcessor {
final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue(); final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final int flowFileSize = (int) flowFile.getSize(); final int flowFileSize = (int) flowFile.getSize();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() { flowFile = session.write(flowFile, new StreamCallback() {
@ -515,9 +628,16 @@ public class ReplaceText extends AbstractProcessor {
} }
return flowFile; return flowFile;
} }
@Override
public boolean isAllDataBufferedForEntireText() {
return true;
}
} }
private interface ReplacementStrategyExecutor { private interface ReplacementStrategyExecutor {
FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String replacement, String evaluateMode, Charset charset, int maxBufferSize, boolean skipBuffer); FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize);
boolean isAllDataBufferedForEntireText();
} }
} }

View File

@ -914,13 +914,13 @@ public class TestReplaceText {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "Good"); attributes.put("abc", "Good");
runner.enqueue(translateNewLines(Paths.get("src/test/resources/TestReplaceTextLineByLine/testFile.txt")), attributes); runner.enqueue(Paths.get("src/test/resources/TestReplaceTextLineByLine/testFile.txt"), attributes);
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals(translateNewLines(new File("src/test/resources/TestReplaceTextLineByLine/Good.txt"))); out.assertContentEquals("Good\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood");
} }
@Test @Test
@ -939,7 +939,7 @@ public class TestReplaceText {
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals("GoodGoodGood"); out.assertContentEquals("Good\r\nGood\r\nGood\r");
} }
@Test @Test
@ -984,17 +984,108 @@ public class TestReplaceText {
System.out.println(outContent); System.out.println(outContent);
Assert.assertTrue(outContent.equals("attribute header\n\nabc.txt\n\ndata header\n\nHello\n\n\nfooter\n" Assert.assertTrue(outContent.equals("attribute header\n\nabc.txt\n\ndata header\n\nHello\n\n\nfooter\n"
+ "attribute header\n\nabc.txt\n\ndata header\n\nWorld!\n\nfooter\n")); + "attribute header\n\nabc.txt\n\ndata header\n\nWorld!\n\nfooter\n"));
} }
private byte[] translateNewLines(final File file) throws IOException { @Test
public void testCapturingGroupInExpressionLanguage() {
final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
runner.setValidateExpressionUsage(false);
runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
runner.setProperty(ReplaceText.SEARCH_VALUE, "(.*?),(.*?),(\\d+.*)");
runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "$1,$2,${ '$3':toDate('ddMMMyyyy'):format('yyyy/MM/dd') }");
final String csvIn =
"2006,10-01-2004,10may2004\n"
+ "2007,15-05-2006,10jun2005\r\n"
+ "2009,8-8-2008,10aug2008";
final String expectedCsvOut =
"2006,10-01-2004,2004/05/10\n"
+ "2007,15-05-2006,2005/06/10\r\n"
+ "2009,8-8-2008,2008/08/10";
runner.enqueue(csvIn.getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals(expectedCsvOut);
}
@Test
public void testCapturingGroupInExpressionLanguage2() {
final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
runner.setValidateExpressionUsage(false);
runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
runner.setProperty(ReplaceText.SEARCH_VALUE, "(.*)/(.*?).jpg");
runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "$1/${ '$2':substring(0,1) }.png");
final String csvIn =
"1,2,3,https://123.jpg,email@mydomain.com\n"
+ "3,2,1,https://321.jpg,other.email@mydomain.com";
final String expectedCsvOut =
"1,2,3,https://1.png,email@mydomain.com\n"
+ "3,2,1,https://3.png,other.email@mydomain.com";
runner.enqueue(csvIn.getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals(expectedCsvOut);
}
@Test
public void testAlwaysReplaceEntireText() {
final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
runner.setValidateExpressionUsage(false);
runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.ENTIRE_TEXT);
runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.ALWAYS_REPLACE);
runner.setProperty(ReplaceText.SEARCH_VALUE, "i do not exist anywhere in the text");
runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "${filename}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", "abc.txt");
runner.enqueue("Hello\nWorld!".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals("abc.txt");
}
@Test
public void testAlwaysReplaceLineByLine() {
final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
runner.setValidateExpressionUsage(false);
runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.ALWAYS_REPLACE);
runner.setProperty(ReplaceText.SEARCH_VALUE, "i do not exist anywhere in the text");
runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "${filename}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", "abc.txt");
runner.enqueue("Hello\nWorld!\r\ntoday!\n".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals("abc.txt\nabc.txt\r\nabc.txt\n");
}
private String translateNewLines(final File file) throws IOException {
return translateNewLines(file.toPath()); return translateNewLines(file.toPath());
} }
private byte[] translateNewLines(final Path path) throws IOException { private String translateNewLines(final Path path) throws IOException {
final byte[] data = Files.readAllBytes(path); final byte[] data = Files.readAllBytes(path);
final String text = new String(data, StandardCharsets.UTF_8); final String text = new String(data, StandardCharsets.UTF_8);
return translateNewLines(text).getBytes(StandardCharsets.UTF_8); return translateNewLines(text);
} }
private String translateNewLines(final String text) { private String translateNewLines(final String text) {