NIFI-1249: Allow Processors to add their own variables to those referencable by Expression Language. Make ReplaceText allow users to reference back-references of regex matches

This commit is contained in:
Mark Payne 2015-12-04 13:17:37 -05:00
parent dce039b54f
commit f378ee9021
29 changed files with 659 additions and 327 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.components;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerService;
@ -120,7 +121,7 @@ public interface PropertyValue {
/**
* <p>
* Replaces values in the Property Value using the Attribute Expression
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
@ -128,14 +129,49 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Query cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions() throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the Attribute Expression
* Replaces values in the Property Value using the NiFi Expression Language;
* a PropertyValue with the new value is then returned, supporting call chaining.
* </p>
*
* @param attributes a Map of attributes that the Expression can reference, in addition
* to JVM System Properties and Environmental Properties.
*
* @return a PropertyValue with the new value
*
* @throws ProcessException if the Expression cannot be compiled or evaluating the Expression against
* the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression Language.
* The supplied decorator is then given a chance to decorate the
* value, and a PropertyValue with the new value is then returned,
* supporting call chaining.
* </p>
*
* @param attributes a Map of attributes that the Expression can reference, in addition
* to JVM System Properties and Environmental Properties.
* @param decorator the decorator to use in order to update the values returned by the Expression Language
*
* @return a PropertyValue with the new value
*
* @throws ProcessException if the Expression cannot be compiled or evaluating the Expression against
* the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
@ -144,14 +180,53 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Query cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the Attribute Expression
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
*
* @param flowFile to evaluate attributes of
* @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in
* this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority.
*
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
*
* @param flowFile to evaluate attributes of
* @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in
* this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority.
* @param decorator the decorator to use in order to update the values returned by the Expression Language
*
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language. The supplied decorator is then given a chance to decorate the
* value, and a PropertyValue with the new value is then returned,
* supporting call chaining.
@ -162,14 +237,14 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is then returned, supporting
* call chaining
*
* @throws ProcessException if the Query cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(AttributeValueDecorator decorator) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the Attribute Expression
* Replaces values in the Property Value using the NiFi Expression
* Language. The supplied decorator is then given a chance to decorate the
* value, and a PropertyValue with the new value is then returned,
* supporting call chaining.
@ -182,8 +257,8 @@ public interface PropertyValue {
* @return a PropertyValue with the new value is then returned, supporting
* call chaining
*
* @throws ProcessException if the Query cannot be compiled or evaluating
* the query against the given attributes causes an Exception to be thrown
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException;
}

View File

@ -147,4 +147,11 @@ public interface ProcessContext {
*/
boolean hasConnection(Relationship relationship);
/**
* Indicates whether the value of the given Property contains a NiFi Expression Language Expression.
*
* @param property the Property whose value should be inspected to determine if it contains an Expression Language Expression
* @return <code>true</code> if the value of the given Property contains a NiFi Expression
* Language Expression, <code>false</code> if it does not. Note that <code>false</code> will be returned if the Property Descriptor
* does not allow the Expression Language, even if a seemingly valid Expression is present in the value.
*/
boolean isExpressionLanguagePresent(PropertyDescriptor property);
}

View File

@ -36,6 +36,7 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
import org.apache.nifi.bootstrap.notification.NotificationService;
@ -246,7 +247,7 @@ public class NotificationServiceManager {
configuredValue = fullPropDescriptor.getDefaultValue();
}
return new NotificationServicePropertyValue(configuredValue);
return new StandardPropertyValue(configuredValue, null);
}
@Override
@ -363,7 +364,7 @@ public class NotificationServiceManager {
value = descriptor.getDefaultValue();
}
return new NotificationServicePropertyValue(value);
return new StandardPropertyValue(value, null);
}
@Override

View File

@ -1,120 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
public class NotificationServicePropertyValue implements PropertyValue {
private final String rawValue;
private final PreparedQuery preparedQuery;
public NotificationServicePropertyValue(final String rawValue) {
this.rawValue = rawValue;
this.preparedQuery = Query.prepare(rawValue);
}
@Override
public String getValue() {
return rawValue;
}
@Override
public Integer asInteger() {
return (rawValue == null) ? null : Integer.parseInt(rawValue.trim());
}
@Override
public Long asLong() {
return (rawValue == null) ? null : Long.parseLong(rawValue.trim());
}
@Override
public Boolean asBoolean() {
return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim());
}
@Override
public Float asFloat() {
return (rawValue == null) ? null : Float.parseFloat(rawValue.trim());
}
@Override
public Double asDouble() {
return (rawValue == null) ? null : Double.parseDouble(rawValue.trim());
}
@Override
public Long asTimePeriod(final TimeUnit timeUnit) {
return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit);
}
@Override
public Double asDataSize(final DataUnit dataUnit) {
return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit);
}
@Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException {
return evaluateAttributeExpressions((AttributeValueDecorator) null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
return new NotificationServicePropertyValue(preparedQuery.evaluateExpressions((FlowFile) null, decorator));
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
throw new UnsupportedOperationException();
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
return rawValue;
}
@Override
public ControllerService asControllerService() {
throw new UnsupportedOperationException();
}
@Override
public <T extends ControllerService> T asControllerService(final Class<T> serviceType) throws IllegalArgumentException {
throw new UnsupportedOperationException();
}
@Override
public boolean isSet() {
return rawValue != null;
}
}

View File

@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
import org.apache.nifi.bootstrap.NotificationServicePropertyValue;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@ -48,7 +48,7 @@ public class NotificationValidationContext implements ValidationContext {
@Override
public PropertyValue newPropertyValue(final String rawValue) {
return new NotificationServicePropertyValue(rawValue);
return new StandardPropertyValue(rawValue, null);
}
@Override

View File

@ -52,5 +52,9 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -59,4 +59,9 @@ public class EmptyPreparedQuery implements PreparedQuery {
public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException {
return value;
}
/**
* Returns the stored value unchanged; the FlowFile, the additional attributes
* and the decorator are not consulted.
*/
@Override
public String evaluateExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
return value;
}
}

View File

@ -69,4 +69,8 @@ public class InvalidPreparedQuery implements PreparedQuery {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
}
/**
* Always throws an AttributeExpressionLanguageException whose message contains the
* query text and the explanation; none of the arguments are used.
*/
@Override
public String evaluateExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
}
}

View File

@ -36,4 +36,5 @@ public interface PreparedQuery {
String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException;
String evaluateExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.attribute.expression.language;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -416,7 +417,12 @@ public class Query {
}
static Map<String, String> createExpressionMap(final FlowFile flowFile) {
final Map<String, String> attributeMap = flowFile == null ? new HashMap<String, String>() : flowFile.getAttributes();
return createExpressionMap(flowFile, null);
}
static Map<String, String> createExpressionMap(final FlowFile flowFile, final Map<String, String> additionalAttributes) {
final Map<String, String> attributeMap = flowFile == null ? Collections.<String, String> emptyMap() : flowFile.getAttributes();
final Map<String, String> additionalOrEmpty = additionalAttributes == null ? Collections.<String, String> emptyMap() : additionalAttributes;
final Map<String, String> envMap = System.getenv();
final Map<?, ?> sysProps = System.getProperties();
@ -428,13 +434,13 @@ public class Query {
flowFileProps.put("lineageStartDate", String.valueOf(flowFile.getLineageStartDate()));
}
return wrap(attributeMap, flowFileProps, envMap, sysProps);
return wrap(additionalOrEmpty, attributeMap, flowFileProps, envMap, sysProps);
}
private static Map<String, String> wrap(final Map<String, String> attributes, final Map<String, String> flowFileProps,
private static Map<String, String> wrap(final Map<String, String> additional, final Map<String, String> attributes, final Map<String, String> flowFileProps,
final Map<String, String> env, final Map<?, ?> sysProps) {
@SuppressWarnings("rawtypes")
final Map[] maps = new Map[]{attributes, flowFileProps, env, sysProps};
final Map[] maps = new Map[] {additional, attributes, flowFileProps, env, sysProps};
return new Map<String, String>() {
@Override

View File

@ -59,6 +59,12 @@ public class StandardPreparedQuery implements PreparedQuery {
return sb.toString();
}
/**
 * Evaluates this query against the expression map built from the given FlowFile
 * and the caller-supplied additional attributes.
 *
 * @param flowFile the FlowFile passed to the expression map factory, may be null
 * @param additionalAttributes extra attributes passed to the expression map factory, may be null
 * @param decorator forwarded unchanged to the Map-based overload
 * @return the result of the Map-based evaluation
 */
@Override
public String evaluateExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
    return evaluateExpressions(Query.createExpressionMap(flowFile, additionalAttributes), decorator);
}
@Override
public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
final Map<String, String> expressionMap = Query.createExpressionMap(flowFile);

View File

@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor;
package org.apache.nifi.attribute.expression.language;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
public final class StandardPropertyValue implements PropertyValue {
public class StandardPropertyValue implements PropertyValue {
private final String rawValue;
private final ControllerServiceLookup serviceLookup;
@ -95,26 +95,46 @@ public final class StandardPropertyValue implements PropertyValue {
@Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException {
return evaluateAttributeExpressions(null, null);
return evaluateAttributeExpressions(null, null, null);
}
/**
 * Evaluates the expressions with the supplied attributes, passing neither a
 * FlowFile nor a decorator to the three-argument overload.
 */
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes) throws ProcessException {
    final FlowFile noFlowFile = null;
    final AttributeValueDecorator noDecorator = null;
    return evaluateAttributeExpressions(noFlowFile, attributes, noDecorator);
}
/**
 * Evaluates the expressions with the supplied attributes and decorator,
 * passing no FlowFile to the three-argument overload.
 */
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes, final AttributeValueDecorator decorator) throws ProcessException {
    final FlowFile noFlowFile = null;
    return evaluateAttributeExpressions(noFlowFile, attributes, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, decorator);
return evaluateAttributeExpressions(null, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
return evaluateAttributeExpressions(flowFile, null);
return evaluateAttributeExpressions(flowFile, null, null);
}
/**
 * Evaluates the expressions with the given FlowFile and additional attributes,
 * passing no decorator to the three-argument overload.
 */
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes) throws ProcessException {
    final AttributeValueDecorator noDecorator = null;
    return evaluateAttributeExpressions(flowFile, additionalAttributes, noDecorator);
}
/**
 * Evaluates the expressions with the given FlowFile and decorator,
 * passing no additional attributes to the three-argument overload.
 */
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
    final Map<String, String> noAdditionalAttributes = null;
    return evaluateAttributeExpressions(flowFile, noAdditionalAttributes, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
if (rawValue == null || preparedQuery == null) {
return this;
}
return new StandardPropertyValue(preparedQuery.evaluateExpressions(flowFile, decorator), serviceLookup, null);
return new StandardPropertyValue(preparedQuery.evaluateExpressions(flowFile, additionalAttributes, decorator), serviceLookup, null);
}
@Override
@ -124,7 +144,7 @@ public final class StandardPropertyValue implements PropertyValue {
@Override
public ControllerService asControllerService() {
if (rawValue == null || rawValue.equals("")) {
if (rawValue == null || rawValue.equals("") || serviceLookup == null) {
return null;
}
@ -136,7 +156,7 @@ public final class StandardPropertyValue implements PropertyValue {
if (!serviceType.isInterface()) {
throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
}
if (rawValue == null || rawValue.equals("")) {
if (rawValue == null || rawValue.equals("") || serviceLookup == null) {
return null;
}

View File

@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -331,4 +333,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
}
}
/**
 * Reports whether the configured value of the given property contains at least
 * one Expression Language range.
 *
 * @param property the property to inspect; may be null
 * @return false when the property is null or does not support the Expression
 *         Language, otherwise whether any expression range was extracted from its value
 */
@Override
public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
    final boolean elSupported = property != null && property.isExpressionLanguageSupported();
    if (!elSupported) {
        return false;
    }

    final String configuredValue = getProperty(property).getValue();
    final List<Range> expressionRanges = Query.extractExpressionRanges(configuredValue);
    if (expressionRanges == null) {
        return false;
    }
    return !expressionRanges.isEmpty();
}
}

View File

@ -16,9 +16,10 @@
*/
package org.apache.nifi.util;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
@ -29,11 +30,11 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
public class MockPropertyValue implements PropertyValue {
private final String rawValue;
private final Boolean expectExpressions;
private final ControllerServiceLookup serviceLookup;
private final PropertyDescriptor propertyDescriptor;
private final PropertyValue stdPropValue;
private boolean expressionsEvaluated = false;
public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup) {
@ -41,12 +42,20 @@ public class MockPropertyValue implements PropertyValue {
}
public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor) {
this(rawValue, serviceLookup, propertyDescriptor, false);
}
private MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor, final boolean alreadyEvaluated) {
this.stdPropValue = new StandardPropertyValue(rawValue, serviceLookup);
this.rawValue = rawValue;
this.serviceLookup = serviceLookup;
this.expectExpressions = propertyDescriptor == null ? null : propertyDescriptor.isExpressionLanguageSupported();
this.propertyDescriptor = propertyDescriptor;
this.expressionsEvaluated = alreadyEvaluated;
}
private void ensureExpressionsEvaluated() {
if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) {
throw new IllegalStateException("Attempting to retrieve value of " + propertyDescriptor
@ -59,49 +68,49 @@ public class MockPropertyValue implements PropertyValue {
@Override
public String getValue() {
ensureExpressionsEvaluated();
return rawValue;
return stdPropValue.getValue();
}
@Override
public Integer asInteger() {
ensureExpressionsEvaluated();
return (rawValue == null) ? null : Integer.parseInt(rawValue.trim());
return stdPropValue.asInteger();
}
@Override
public Long asLong() {
ensureExpressionsEvaluated();
return (rawValue == null) ? null : Long.parseLong(rawValue.trim());
return stdPropValue.asLong();
}
@Override
public Boolean asBoolean() {
ensureExpressionsEvaluated();
return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim());
return stdPropValue.asBoolean();
}
@Override
public Float asFloat() {
ensureExpressionsEvaluated();
return (rawValue == null) ? null : Float.parseFloat(rawValue.trim());
return stdPropValue.asFloat();
}
@Override
public Double asDouble() {
ensureExpressionsEvaluated();
return (rawValue == null) ? null : Double.parseDouble(rawValue.trim());
return stdPropValue.asDouble();
}
@Override
public Long asTimePeriod(final TimeUnit timeUnit) {
ensureExpressionsEvaluated();
return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit);
return stdPropValue.asTimePeriod(timeUnit);
}
@Override
public Double asDataSize(final DataUnit dataUnit) {
ensureExpressionsEvaluated();
return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit);
return stdPropValue.asDataSize(dataUnit);
}
private void markEvaluated() {
@ -115,38 +124,48 @@ public class MockPropertyValue implements PropertyValue {
@Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
return evaluateAttributeExpressions(null, null);
return evaluateAttributeExpressions(null, null, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
return evaluateAttributeExpressions(null, decorator);
return evaluateAttributeExpressions(null, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
return evaluateAttributeExpressions(flowFile, null);
return evaluateAttributeExpressions(flowFile, null, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
return new MockPropertyValue(Query.evaluateExpressions(rawValue, flowFile, decorator), serviceLookup);
final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator);
return new MockPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true);
}
@Override

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@ -384,11 +385,22 @@ public class StandardProcessorTestRunner implements TestRunner {
enqueue(data, new HashMap<String, String>());
}
/**
 * Enqueues the given text, encoded as UTF-8, with an empty attribute map.
 */
@Override
public void enqueue(final String data) {
    final byte[] utf8Content = data.getBytes(StandardCharsets.UTF_8);
    final Map<String, String> noAttributes = Collections.<String, String> emptyMap();
    enqueue(utf8Content, noAttributes);
}
@Override
public void enqueue(final byte[] data, final Map<String, String> attributes) {
enqueue(new ByteArrayInputStream(data), attributes);
}
/**
 * Enqueues the given text, encoded as UTF-8, with the given attributes.
 */
@Override
public void enqueue(final String data, final Map<String, String> attributes) {
    final byte[] utf8Content = data.getBytes(StandardCharsets.UTF_8);
    enqueue(utf8Content, attributes);
}
@Override
public void enqueue(final InputStream data) {
enqueue(data, new HashMap<String, String>());

View File

@ -363,16 +363,33 @@ public interface TestRunner {
*/
void enqueue(byte[] data);
/**
* Creates a FlowFile whose content is the given string encoded as UTF-8, with no attributes,
* and adds this FlowFile to the Processor's Input Queue.
*
* @param data the text to enqueue as the FlowFile's content
*/
void enqueue(String data);
/**
* Copies the content from the given byte array into memory and creates a
* FlowFile from this content with the given attributes and adds this
* FlowFile to the Processor's Input Queue
*
* @param data to enqueue
* @param attributes to use for enqueued items
* @param attributes to use for enqueued item
*/
void enqueue(byte[] data, Map<String, String> attributes);
/**
* Creates a FlowFile whose content is the given string encoded as UTF-8, with the given attributes,
* and adds this FlowFile to the Processor's Input Queue.
*
* @param data the text to enqueue as the FlowFile's content
* @param attributes to use for enqueued item
*/
void enqueue(String data, Map<String, String> attributes);
/**
* Reads the content from the given {@link InputStream} into memory and
* creates a FlowFile from this content with no attributes and adds this

View File

@ -98,5 +98,8 @@ public class MockProcessContext implements ProcessContext {
return false;
}
// Stub: always reports that no Expression Language is present, whatever the property.
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
return false;
}
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
@ -32,7 +33,6 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;

View File

@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connectable;
@ -32,7 +33,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.EventAccess;

View File

@ -59,6 +59,9 @@ public class ConnectableProcessContext implements ProcessContext {
@Override
public PropertyValue getProperty(final String propertyName) {
// None of the connectable components other than Processors will ever need to evaluate these.
// Since Processors use a different implementation of ProcessContext altogether, we will just
// return null for all values
return new PropertyValue() {
@Override
public String getValue() {
@ -134,6 +137,26 @@ public class ConnectableProcessContext implements ProcessContext {
public boolean isSet() {
return false;
}
// Placeholder overload: the supplied attributes are ignored and null is returned.
// NOTE(review): a null result cannot be chained (e.g. followed by getValue()) - confirm no caller chains on it.
@Override
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes) throws ProcessException {
return null;
}
// Placeholder overload: the FlowFile and the additional attributes are ignored and null is returned.
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes) throws ProcessException {
return null;
}
// Placeholder overload: the attributes and the decorator are ignored and null is returned.
@Override
public PropertyValue evaluateAttributeExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException {
return null;
}
// Placeholder overload: the FlowFile, the additional attributes and the decorator are ignored and null is returned.
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
return null;
}
};
}
@ -208,5 +231,8 @@ public class ConnectableProcessContext implements ProcessContext {
return connections != null && !connections.isEmpty();
}
// Always reports that no Expression Language is present; the property argument is not inspected.
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
return false;
}
}

View File

@ -22,12 +22,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.util.FormatUtils;
public class StandardConfigurationContext implements ConfigurationContext {

View File

@ -19,11 +19,14 @@ package org.apache.nifi.processor;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connection;
@ -196,4 +199,13 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return connections != null && !connections.isEmpty();
}
/**
 * Determines whether the value of the given property contains at least one
 * Expression Language expression.
 *
 * @param property the descriptor of the property to inspect; may be null
 * @return {@code true} only if the property supports Expression Language, its value is
 *         non-null, and at least one expression range is found in that value
 */
@Override
public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
    if (property == null || !property.isExpressionLanguageSupported()) {
        return false;
    }

    // getValue() may yield null; there is then no text to scan, so do not hand null to the range extractor.
    final String value = getProperty(property).getValue();
    if (value == null) {
        return false;
    }

    final List<Range> elRanges = Query.extractExpressionRanges(value);
    return (elRanges != null && !elRanges.isEmpty());
}
}

View File

@ -16,9 +16,12 @@
*/
package org.apache.nifi.processor;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup;
@ -126,4 +129,14 @@ public class StandardSchedulingContext implements SchedulingContext {
public boolean hasConnection(Relationship relationship) {
return processContext.hasConnection(relationship);
}
/**
 * Determines whether the value of the given property contains at least one
 * Expression Language expression.
 *
 * @param property the descriptor of the property to inspect; may be null
 * @return {@code true} only if the property supports Expression Language, its value is
 *         non-null, and at least one expression range is found in that value
 */
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
    if (property == null || !property.isExpressionLanguageSupported()) {
        return false;
    }

    // getValue() may yield null; there is then no text to scan, so do not hand null to the range extractor.
    final String value = getProperty(property).getValue();
    if (value == null) {
        return false;
    }

    final List<Range> elRanges = Query.extractExpressionRanges(value);
    return (elRanges != null && !elRanges.isEmpty());
}
}

View File

@ -26,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.processor;
import org.apache.nifi.processor.StandardPropertyValue;
import static org.junit.Assert.assertEquals;
import java.util.Calendar;
@ -25,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;

View File

@ -18,11 +18,11 @@ package org.apache.nifi.web.controller;
import java.util.Map;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.search.SearchContext;
/**

View File

@ -25,17 +25,19 @@ import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -53,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
@ -77,7 +80,8 @@ public class ReplaceText extends AbstractProcessor {
public static final String appendValue = "Append";
public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace";
private final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)");
public static final String alwaysReplace = "Always Replace";
private static final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)");
private static final String DEFAULT_REGEX = "(?s:^.*$)";
private static final String DEFAULT_REPLACEMENT_VALUE = "$1";
@ -95,6 +99,9 @@ public class ReplaceText extends AbstractProcessor {
"Interpret the Search Value as a Regular Expression and replace all matches with the Replacement Value. The Replacement Value may reference Capturing Groups used "
+ "in the Search Value by using a dollar-sign followed by the Capturing Group number, such as $1 or $2. If the Search Value is set to .* then everything is replaced without "
+ "even evaluating the Regular Expression.");
static final AllowableValue ALWAYS_REPLACE = new AllowableValue(alwaysReplace, alwaysReplace,
"Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the <Evaluation Mode> property) and does not bother searching "
+ "for any value. When this strategy is chosen, the <Search Value> property is ignored.");
public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
.name("Regular Expression")
@ -108,7 +115,9 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
.name("Replacement Value")
.description("The value to insert using the 'Replacement Strategy'. Using \"Regex Replace\" back-references to Regular Expression capturing groups "
+ "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.")
+ "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value. "
+ "Back References may also be referenced using the Expression Language, as '$1', '$2', etc. The single-tick marks MUST be included, as these variables are "
+ "not \"Standard\" attribute names (attribute names must be quoted unless they contain only numbers, letters, and _).")
.required(true)
.defaultValue(DEFAULT_REPLACEMENT_VALUE)
.addValidator(Validator.VALID)
@ -124,11 +133,11 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to "
+ "apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, "
+ "apply the replacement. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, "
+ "the FlowFile will be routed to 'failure'. "
+ "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value "
+ "of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. "
+ "This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'")
+ "This value is ignored if the <Replacement Strategy> property is set to one of: Append, Prepend, Always Replace")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
@ -136,7 +145,7 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("Replacement Strategy")
.description("The strategy for how and what to replace within the FlowFile's text content.")
.allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE)
.allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE)
.defaultValue(REGEX_REPLACE.getValue())
.required(true)
.build();
@ -210,18 +219,6 @@ public class ReplaceText extends AbstractProcessor {
return;
}
final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return attributeValue.replace("$", "\\$");
}
};
final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue();
final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
final boolean skipBuffer = ".*".equals(unsubstitutedRegex);
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
@ -233,70 +230,44 @@ public class ReplaceText extends AbstractProcessor {
buffer = null;
}
ReplacementStrategyExecutor replacementStrategyExecutor;
switch (replacementStrategy) {
case prependValue:
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
replacementStrategyExecutor = new AppendReplace();
break;
case regexReplaceValue:
// for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
if (context.getProperty(SEARCH_VALUE).getValue().equals(".*")) {
replacementStrategyExecutor = new AlwaysReplace();
} else {
replacementStrategyExecutor = new RegexReplace(buffer, context);
}
break;
case literalReplaceValue:
replacementStrategyExecutor = new LiteralReplace(buffer);
break;
case alwaysReplace:
replacementStrategyExecutor = new AlwaysReplace();
break;
default:
throw new AssertionError();
}
for (FlowFile flowFile : flowFiles) {
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
if (flowFile.getSize() > maxBufferSize && !skipBuffer) {
if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
session.transfer(flowFile, REL_FAILURE);
continue;
}
}
String replacement;
if (!replacementStrategy.equals(regexReplaceValue)) {
replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
} else {
replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, escapeBackRefDecorator).getValue();
final Matcher backRefMatcher = backReferencePattern.matcher(replacement);
while (backRefMatcher.find()) {
final String backRefNum = backRefMatcher.group(1);
if (backRefNum.startsWith("0")) {
continue;
}
final int originalBackRefIndex = Integer.parseInt(backRefNum);
int backRefIndex = originalBackRefIndex;
// if we have a replacement value like $123, and we have less than 123 capturing groups, then
// we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
// then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
// we want to truncate the 1 and get 0.
while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
backRefIndex /= 10;
}
if (backRefIndex > numCapturingGroups) {
final StringBuilder sb = new StringBuilder(replacement.length() + 1);
final int groupStart = backRefMatcher.start(1);
sb.append(replacement.substring(0, groupStart - 1));
sb.append("\\");
sb.append(replacement.substring(groupStart - 1));
replacement = sb.toString();
}
}
}
ReplacementStrategyExecutor replacementStrategyExecutor;
switch (replacementStrategy) {
case prependValue:
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
replacementStrategyExecutor = new AppendReplace();
break;
case regexReplaceValue:
replacementStrategyExecutor = new RegexReplace(buffer);
break;
case literalReplaceValue:
replacementStrategyExecutor = new LiteralReplace(buffer);
break;
default:
throw new AssertionError();
}
final StopWatch stopWatch = new StopWatch(true);
flowFile = replacementStrategyExecutor.replace(flowFile, session, context, replacement, evaluateMode,
charset, maxBufferSize, skipBuffer);
flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize);
logger.info("Transferred {} to 'success'", new Object[] {flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@ -304,11 +275,106 @@ public class ReplaceText extends AbstractProcessor {
}
}
private static class PrependReplace implements ReplacementStrategyExecutor {
// If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing
// groups and the Replacement Value is "I owe $8 to him", then we want to treat the $8 as a literal "$8", rather
// than attempting to use it as a back reference.
//
// A reference such as $123 is valid when some leading prefix of its digits names an existing group: with fewer than
// 123 groups it falls back to group 12, and with fewer than 12 groups to group 1. The single leading digit is the
// smallest of those prefixes, so it alone decides validity; checking only that digit also avoids parsing an
// arbitrarily long digit run as an int. References whose digits start with 0 are left untouched.
//
// unescaped          - replacement text that may contain $N references
// numCapturingGroups - number of capturing groups in the search expression; when 0 the text is returned unchanged
// returns            - the text with a backslash inserted before every '$' that starts an invalid reference and is
//                      not already escaped by a preceding backslash
private static String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) {
    if (numCapturingGroups == 0) {
        return unescaped;
    }

    // The matcher always reports offsets into the original text, so the result is assembled by copying
    // segments of that original text rather than by re-inserting into an already shifted string.
    final Matcher backRefMatcher = backReferencePattern.matcher(unescaped);
    final StringBuilder escaped = new StringBuilder(unescaped.length() + 1);
    int copiedUpTo = 0;

    while (backRefMatcher.find()) {
        final String backRefNum = backRefMatcher.group(1);
        if (backRefNum.startsWith("0")) {
            continue;
        }

        final int leadingDigit = Character.digit(backRefNum.charAt(0), 10);
        if (leadingDigit <= numCapturingGroups) {
            // some prefix of the digits refers to an existing capturing group
            continue;
        }

        // An odd number of backslashes directly before the '$' means it is already escaped;
        // adding another backslash would un-escape it again.
        final int dollarIndex = backRefMatcher.start();
        int precedingBackslashes = 0;
        for (int i = dollarIndex - 1; i >= 0 && unescaped.charAt(i) == '\\'; i--) {
            precedingBackslashes++;
        }
        if (precedingBackslashes % 2 == 1) {
            continue;
        }

        escaped.append(unescaped, copiedUpTo, dollarIndex).append('\\');
        copiedUpTo = dollarIndex;
    }

    return escaped.append(unescaped, copiedUpTo, unescaped.length()).toString();
}
private static class AlwaysReplace implements ReplacementStrategyExecutor {
/**
 * Overwrites content with the evaluated Replacement Value without searching for anything.
 * In Entire Text mode the whole content becomes the replacement; otherwise every line is
 * swapped for the replacement while that line's own trailing CR/LF characters are kept.
 */
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
    final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();

    if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
        // The incoming stream is never read; only the replacement is emitted.
        return session.write(flowFile, new StreamCallback() {
            @Override
            public void process(final InputStream in, final OutputStream out) throws IOException {
                out.write(replacementValue.getBytes(charset));
            }
        });
    }

    return session.write(flowFile, new StreamCallback() {
        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            try (NLKBufferedReader reader = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, charset));) {

                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                    // Walk backwards over the trailing '\r' / '\n' characters to find where the line ending starts.
                    int lineEndingStart = line.length();
                    while (lineEndingStart > 0) {
                        final char previous = line.charAt(lineEndingStart - 1);
                        if (previous != '\r' && previous != '\n') {
                            break;
                        }
                        lineEndingStart--;
                    }

                    writer.write(replacementValue);
                    // Re-emit the original line ending, in its original order, after the replacement.
                    writer.write(line.substring(lineEndingStart));
                }
            }
        }
    });
}
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode,
final Charset charset, final int maxBufferSize, final boolean skipBuffer) {
public boolean isAllDataBufferedForEntireText() {
return false;
}
}
private static class PrependReplace implements ReplacementStrategyExecutor {
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
@ -334,13 +400,19 @@ public class ReplaceText extends AbstractProcessor {
}
return flowFile;
}
@Override
public boolean isAllDataBufferedForEntireText() {
return false;
}
}
private static class AppendReplace implements ReplacementStrategyExecutor {
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode,
final Charset charset, final int maxBufferSize, final boolean skipBuffer) {
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
@ -387,81 +459,121 @@ public class ReplaceText extends AbstractProcessor {
}
return flowFile;
}
@Override
public boolean isAllDataBufferedForEntireText() {
return false;
}
}
private static class RegexReplace implements ReplacementStrategyExecutor {
private final byte[] buffer;
private final int numCapturingGroups;
private final Map<String, String> additionalAttrs;
public RegexReplace(final byte[] buffer) {
private static final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return attributeValue.replace("$", "\\$");
}
};
public RegexReplace(final byte[] buffer, final ProcessContext context) {
this.buffer = buffer;
final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue();
numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
additionalAttrs = new HashMap<>(numCapturingGroups);
}
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode,
final Charset charset, final int maxBufferSize, final boolean skipBuffer) {
final String replacementFinal = replacementValue.replaceAll("(\\$\\D)", "\\\\$1");
public FlowFile replace(final FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return Pattern.quote(attributeValue);
}
};
final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final Pattern searchPattern = Pattern.compile(searchRegex);
// always match; just overwrite value with the replacement value; this optimization prevents us
// from reading the file at all.
if (skipBuffer) {
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new OutputStreamCallback() {
final int flowFileSize = (int) flowFile.getSize();
FlowFile updatedFlowFile;
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
final String contentString = new String(buffer, 0, flowFileSize, charset);
additionalAttrs.clear();
final Matcher matcher = searchPattern.matcher(contentString);
if (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
final String groupValue = matcher.group(i);
additionalAttrs.put("$" + i, groupValue);
}
String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
// If we have a $ followed by anything other than a number, then escape it. E.g., $d becomes \$d so that it can be used as a literal in a regex.
final String replacementFinal = replacement.replaceAll("(\\$\\D)", "\\\\$1");
final String updatedValue = contentString.replaceAll(searchRegex, replacementFinal);
updatedFlowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(replacementFinal.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
while (null != br.readLine()) {
bw.write(replacementFinal);
}
}
}
});
}
} else {
final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return Pattern.quote(attributeValue);
}
};
final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final int flowFileSize = (int) flowFile.getSize();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
final String contentString = new String(buffer, 0, flowFileSize, charset);
final String updatedValue = contentString.replaceAll(searchRegex, replacementFinal);
out.write(updatedValue.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
while (null != (oneLine = br.readLine())) {
// If no match, just return the original. No need to write out any content.
return flowFile;
}
} else {
updatedFlowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
while (null != (oneLine = br.readLine())) {
additionalAttrs.clear();
final Matcher matcher = searchPattern.matcher(oneLine);
if (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
final String groupValue = matcher.group(i);
additionalAttrs.put("$" + i, groupValue);
}
String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
// If we have a $ followed by anything other than a number, then escape it. E.g., $d becomes \$d so that it can be used as a literal in a regex.
final String replacementFinal = replacement.replaceAll("(\\$\\D)", "\\\\$1");
final String updatedValue = oneLine.replaceAll(searchRegex, replacementFinal);
bw.write(updatedValue);
} else {
// No match. Just write out the line as it was.
bw.write(oneLine);
}
}
}
});
}
}
});
}
return flowFile;
return updatedFlowFile;
}
@Override
public boolean isAllDataBufferedForEntireText() {
return true;
}
}
@ -473,8 +585,10 @@ public class ReplaceText extends AbstractProcessor {
}
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode,
final Charset charset, final int maxBufferSize, final boolean skipBuffer) {
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
@ -484,7 +598,6 @@ public class ReplaceText extends AbstractProcessor {
final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final int flowFileSize = (int) flowFile.getSize();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@ -515,9 +628,16 @@ public class ReplaceText extends AbstractProcessor {
}
return flowFile;
}
@Override
public boolean isAllDataBufferedForEntireText() {
return true;
}
}
private interface ReplacementStrategyExecutor {
FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String replacement, String evaluateMode, Charset charset, int maxBufferSize, boolean skipBuffer);
FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize);
boolean isAllDataBufferedForEntireText();
}
}

View File

@ -914,13 +914,13 @@ public class TestReplaceText {
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "Good");
runner.enqueue(translateNewLines(Paths.get("src/test/resources/TestReplaceTextLineByLine/testFile.txt")), attributes);
runner.enqueue(Paths.get("src/test/resources/TestReplaceTextLineByLine/testFile.txt"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals(translateNewLines(new File("src/test/resources/TestReplaceTextLineByLine/Good.txt")));
out.assertContentEquals("Good\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood");
}
@Test
@ -939,7 +939,7 @@ public class TestReplaceText {
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals("GoodGoodGood");
out.assertContentEquals("Good\r\nGood\r\nGood\r");
}
@Test
@ -984,17 +984,108 @@ public class TestReplaceText {
System.out.println(outContent);
Assert.assertTrue(outContent.equals("attribute header\n\nabc.txt\n\ndata header\n\nHello\n\n\nfooter\n"
+ "attribute header\n\nabc.txt\n\ndata header\n\nWorld!\n\nfooter\n"));
}
private byte[] translateNewLines(final File file) throws IOException {
// Line-by-line replacement in which the third capturing group is referenced from the Expression Language
// as '$3' and reformatted as a date; the differing line endings of the input must be kept.
@Test
public void testCapturingGroupInExpressionLanguage() {
    final String input =
        "2006,10-01-2004,10may2004\n"
        + "2007,15-05-2006,10jun2005\r\n"
        + "2009,8-8-2008,10aug2008";
    final String expected =
        "2006,10-01-2004,2004/05/10\n"
        + "2007,15-05-2006,2005/06/10\r\n"
        + "2009,8-8-2008,2008/08/10";

    final TestRunner testRunner = TestRunners.newTestRunner(new ReplaceText());
    testRunner.setValidateExpressionUsage(false);
    testRunner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
    testRunner.setProperty(ReplaceText.SEARCH_VALUE, "(.*?),(.*?),(\\d+.*)");
    testRunner.setProperty(ReplaceText.REPLACEMENT_VALUE, "$1,$2,${ '$3':toDate('ddMMMyyyy'):format('yyyy/MM/dd') }");

    testRunner.enqueue(input.getBytes());
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
    final MockFlowFile result = testRunner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
    result.assertContentEquals(expected);
}
// Line-by-line replacement that mixes a plain $1 back reference with an Expression Language
// function applied to the second capturing group ('$2').
@Test
public void testCapturingGroupInExpressionLanguage2() {
    final String input =
        "1,2,3,https://123.jpg,email@mydomain.com\n"
        + "3,2,1,https://321.jpg,other.email@mydomain.com";
    final String expected =
        "1,2,3,https://1.png,email@mydomain.com\n"
        + "3,2,1,https://3.png,other.email@mydomain.com";

    final TestRunner testRunner = TestRunners.newTestRunner(new ReplaceText());
    testRunner.setValidateExpressionUsage(false);
    testRunner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
    testRunner.setProperty(ReplaceText.SEARCH_VALUE, "(.*)/(.*?).jpg");
    testRunner.setProperty(ReplaceText.REPLACEMENT_VALUE, "$1/${ '$2':substring(0,1) }.png");

    testRunner.enqueue(input.getBytes());
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
    final MockFlowFile result = testRunner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
    result.assertContentEquals(expected);
}
/**
 * Checks the Always Replace strategy in entire-text mode: even though the configured
 * search value does not occur in the content, the whole content is expected to be
 * replaced by the evaluated replacement value (here the {@code filename} attribute).
 */
@Test
public void testAlwaysReplaceEntireText() {
    final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
    runner.setValidateExpressionUsage(false);
    runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.ENTIRE_TEXT);
    runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.ALWAYS_REPLACE);
    runner.setProperty(ReplaceText.SEARCH_VALUE, "i do not exist anywhere in the text");
    runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "${filename}");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("filename", "abc.txt");

    // Encode explicitly so the test input does not depend on the platform default charset.
    runner.enqueue("Hello\nWorld!".getBytes(StandardCharsets.UTF_8), attributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
    out.assertContentEquals("abc.txt");
}
/**
 * Checks the Always Replace strategy in line-by-line mode: every line is expected to
 * be replaced by the evaluated replacement value (the {@code filename} attribute)
 * although the search value matches nothing, and each line's original terminator
 * ({@code \n} or {@code \r\n}) is expected to be kept.
 */
@Test
public void testAlwaysReplaceLineByLine() {
    final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
    runner.setValidateExpressionUsage(false);
    runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
    runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.ALWAYS_REPLACE);
    runner.setProperty(ReplaceText.SEARCH_VALUE, "i do not exist anywhere in the text");
    runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "${filename}");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("filename", "abc.txt");

    // Encode explicitly so the test input does not depend on the platform default charset.
    runner.enqueue("Hello\nWorld!\r\ntoday!\n".getBytes(StandardCharsets.UTF_8), attributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
    out.assertContentEquals("abc.txt\nabc.txt\r\nabc.txt\n");
}
/**
 * Convenience overload: converts the given file to a {@link Path} and delegates to
 * the {@code Path}-based overload.
 *
 * @param file the file whose content should be read and translated
 * @return whatever the {@code Path}-based overload returns for this file
 * @throws IOException if the delegate fails to read the file
 */
private String translateNewLines(final File file) throws IOException {
    final Path filePath = file.toPath();
    return translateNewLines(filePath);
}
private byte[] translateNewLines(final Path path) throws IOException {
private String translateNewLines(final Path path) throws IOException {
final byte[] data = Files.readAllBytes(path);
final String text = new String(data, StandardCharsets.UTF_8);
return translateNewLines(text).getBytes(StandardCharsets.UTF_8);
return translateNewLines(text);
}
private String translateNewLines(final String text) {