From f378ee902127bd29b168c9bb15e991abe4eab0fa Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 4 Dec 2015 13:17:37 -0500 Subject: [PATCH] NIFI-1249: Allow Processors to add their own variables to those referencable by Expression Language. Make ReplaceText allow users to reference back-references of regex matches --- .../apache/nifi/components/PropertyValue.java | 99 ++++- .../apache/nifi/processor/ProcessContext.java | 7 + .../bootstrap/NotificationServiceManager.java | 5 +- .../NotificationServicePropertyValue.java | 120 ------ .../NotificationValidationContext.java | 4 +- nifi-commons/nifi-expression-language/pom.xml | 4 + .../language/EmptyPreparedQuery.java | 5 + .../language/InvalidPreparedQuery.java | 4 + .../expression/language/PreparedQuery.java | 1 + .../attribute/expression/language/Query.java | 14 +- .../language/StandardPreparedQuery.java | 6 + .../language}/StandardPropertyValue.java | 40 +- .../apache/nifi/util/MockProcessContext.java | 11 + .../apache/nifi/util/MockPropertyValue.java | 71 ++-- .../util/StandardProcessorTestRunner.java | 12 + .../java/org/apache/nifi/util/TestRunner.java | 19 +- .../mock/MockProcessContext.java | 5 +- .../impl/ClusteredReportingContext.java | 2 +- .../reporting/StandardReportingContext.java | 2 +- .../scheduling/ConnectableProcessContext.java | 28 +- .../service/StandardConfigurationContext.java | 2 +- .../processor/StandardProcessContext.java | 12 + .../processor/StandardSchedulingContext.java | 13 + .../processor/StandardValidationContext.java | 1 + .../processor/TestStandardPropertyValue.java | 3 +- .../web/controller/StandardSearchContext.java | 2 +- .../nifi/processors/standard/ReplaceText.java | 388 ++++++++++++------ .../processors/standard/TestReplaceText.java | 105 ++++- .../TestReplaceTextLineByLine/Good.txt | 1 - 29 files changed, 659 insertions(+), 327 deletions(-) delete mode 100644 nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServicePropertyValue.java rename {nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor => nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language}/StandardPropertyValue.java (76%) delete mode 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestReplaceTextLineByLine/Good.txt diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java index 1845ed224c..72dcc2a4c8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.components; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerService; @@ -120,7 +121,7 @@ public interface PropertyValue { /** *

- * Replaces values in the Property Value using the Attribute Expression + * Replaces values in the Property Value using the NiFi Expression * Language; a PropertyValue with the new value is then returned, supporting * call chaining. *

@@ -128,14 +129,49 @@ public interface PropertyValue { * @return a PropertyValue with the new value is returned, supporting call * chaining * - * @throws ProcessException if the Query cannot be compiled or evaluating - * the query against the given attributes causes an Exception to be thrown + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown */ public PropertyValue evaluateAttributeExpressions() throws ProcessException; /** *

- * Replaces values in the Property Value using the Attribute Expression + * Replaces values in the Property Value using the NiFi Expression Language; + * a PropertyValue with the new value is then returned, supporting call chaining. + *

+ * + * @param attributes a Map of attributes that the Expression can reference, in addition + * to JVM System Properties and Environmental Properties. + * + * @return a PropertyValue with the new value + * + * @throws ProcessException if the Expression cannot be compiled or evaluating the Expression against + * the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(Map attributes) throws ProcessException; + + /** + *

+ * Replaces values in the Property Value using the NiFi Expression Language. + * The supplied decorator is then given a chance to decorate the + * value, and a PropertyValue with the new value is then returned, + * supporting call chaining. + *

+ * + * @param attributes a Map of attributes that the Expression can reference, in addition + * to JVM System Properties and Environmental Properties. + * @param decorator the decorator to use in order to update the values returned by the Expression Language + * + * @return a PropertyValue with the new value + * + * @throws ProcessException if the Expression cannot be compiled or evaluating the Expression against + * the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(Map attributes, AttributeValueDecorator decorator) throws ProcessException; + + /** + *

+ * Replaces values in the Property Value using the NiFi Expression * Language; a PropertyValue with the new value is then returned, supporting * call chaining. *

@@ -144,14 +180,53 @@ public interface PropertyValue { * @return a PropertyValue with the new value is returned, supporting call * chaining * - * @throws ProcessException if the Query cannot be compiled or evaluating - * the query against the given attributes causes an Exception to be thrown + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown */ public PropertyValue evaluateAttributeExpressions(FlowFile flowFile) throws ProcessException; /** *

- * Replaces values in the Property Value using the Attribute Expression + * Replaces values in the Property Value using the NiFi Expression + * Language; a PropertyValue with the new value is then returned, supporting + * call chaining. + *

+ * + * @param flowFile to evaluate attributes of + * @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in + * this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority. + * + * @return a PropertyValue with the new value is returned, supporting call + * chaining + * + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes) throws ProcessException; + + /** + *

+ * Replaces values in the Property Value using the NiFi Expression + * Language; a PropertyValue with the new value is then returned, supporting + * call chaining. + *

+ * + * @param flowFile to evaluate attributes of + * @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in + * this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority. + * @param decorator the decorator to use in order to update the values returned by the Expression Language + * + * @return a PropertyValue with the new value is returned, supporting call + * chaining + * + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException; + + /** + *

+ * Replaces values in the Property Value using the NiFi Expression * Language. The supplied decorator is then given a chance to decorate the * value, and a PropertyValue with the new value is then returned, * supporting call chaining. @@ -162,14 +237,14 @@ public interface PropertyValue { * @return a PropertyValue with the new value is then returned, supporting * call chaining * - * @throws ProcessException if the Query cannot be compiled or evaluating - * the query against the given attributes causes an Exception to be thrown + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown */ public PropertyValue evaluateAttributeExpressions(AttributeValueDecorator decorator) throws ProcessException; /** *

- * Replaces values in the Property Value using the Attribute Expression + * Replaces values in the Property Value using the NiFi Expression * Language. The supplied decorator is then given a chance to decorate the * value, and a PropertyValue with the new value is then returned, * supporting call chaining. @@ -182,8 +257,8 @@ public interface PropertyValue { * @return a PropertyValue with the new value is then returned, supporting * call chaining * - * @throws ProcessException if the Query cannot be compiled or evaluating - * the query against the given attributes causes an Exception to be thrown + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown */ public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException; } diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index 7488b2d9a2..cf1bb6cdfa 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -147,4 +147,11 @@ public interface ProcessContext { */ boolean hasConnection(Relationship relationship); + /** + * @param property the Property whose value should be inspected to determined if it contains an Expression Language Expression + * @return true if the value of the given Property contains a NiFi Expression + * Language Expression, false if it does not. Note that false will be returned if the Property Descriptor + * does not allow the Expression Language, even if a seemingly valid Expression is present in the value. + */ + boolean isExpressionLanguagePresent(PropertyDescriptor property); } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java index 2fda0222a5..21d8e82fe9 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java @@ -36,6 +36,7 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.bootstrap.notification.NotificationContext; import org.apache.nifi.bootstrap.notification.NotificationInitializationContext; import org.apache.nifi.bootstrap.notification.NotificationService; @@ -246,7 +247,7 @@ public class NotificationServiceManager { configuredValue = fullPropDescriptor.getDefaultValue(); } - return new NotificationServicePropertyValue(configuredValue); + return new StandardPropertyValue(configuredValue, null); } @Override @@ -363,7 +364,7 @@ public class NotificationServiceManager { value = descriptor.getDefaultValue(); } - return new NotificationServicePropertyValue(value); + return new StandardPropertyValue(value, null); } @Override diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServicePropertyValue.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServicePropertyValue.java deleted file mode 100644 index 582b342e66..0000000000 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServicePropertyValue.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.bootstrap; - -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.attribute.expression.language.PreparedQuery; -import org.apache.nifi.attribute.expression.language.Query; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.expression.AttributeValueDecorator; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.FormatUtils; - -public class NotificationServicePropertyValue implements PropertyValue { - private final String rawValue; - private final PreparedQuery preparedQuery; - - public NotificationServicePropertyValue(final String rawValue) { - this.rawValue = rawValue; - this.preparedQuery = Query.prepare(rawValue); - } - - @Override - public String getValue() { - return rawValue; - } - - @Override - public Integer asInteger() { - return (rawValue == null) ? null : Integer.parseInt(rawValue.trim()); - } - - @Override - public Long asLong() { - return (rawValue == null) ? null : Long.parseLong(rawValue.trim()); - } - - @Override - public Boolean asBoolean() { - return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim()); - } - - @Override - public Float asFloat() { - return (rawValue == null) ? null : Float.parseFloat(rawValue.trim()); - } - - @Override - public Double asDouble() { - return (rawValue == null) ? null : Double.parseDouble(rawValue.trim()); - } - - @Override - public Long asTimePeriod(final TimeUnit timeUnit) { - return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit); - } - - @Override - public Double asDataSize(final DataUnit dataUnit) { - return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit); - } - - @Override - public PropertyValue evaluateAttributeExpressions() throws ProcessException { - return evaluateAttributeExpressions((AttributeValueDecorator) null); - } - - @Override - public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException { - return new NotificationServicePropertyValue(preparedQuery.evaluateExpressions((FlowFile) null, decorator)); - } - - @Override - public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException { - throw new UnsupportedOperationException(); - } - - @Override - public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return rawValue; - } - - @Override - public ControllerService asControllerService() { - throw new UnsupportedOperationException(); - } - - @Override - public T asControllerService(final Class serviceType) throws IllegalArgumentException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isSet() { - return rawValue != null; - } - -} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java index a02b108310..49afc1622f 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java @@ -23,7 +23,7 @@ import java.util.Map; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler; -import org.apache.nifi.bootstrap.NotificationServicePropertyValue; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -48,7 +48,7 @@ public class NotificationValidationContext implements ValidationContext { @Override public PropertyValue newPropertyValue(final String rawValue) { - return new NotificationServicePropertyValue(rawValue); + return new StandardPropertyValue(rawValue, null); } @Override diff --git a/nifi-commons/nifi-expression-language/pom.xml b/nifi-commons/nifi-expression-language/pom.xml index 2dc8839022..fc3a4a9f73 100644 --- a/nifi-commons/nifi-expression-language/pom.xml +++ b/nifi-commons/nifi-expression-language/pom.xml @@ -52,5 +52,9 @@ org.apache.nifi nifi-api + + org.apache.nifi + nifi-utils + diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java index 81da47ee31..d85c9ef17c 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java @@ -59,4 +59,9 @@ public class EmptyPreparedQuery implements PreparedQuery { public String evaluateExpressions(Map attributes, AttributeValueDecorator decorator) throws ProcessException { return value; } + + @Override + public String evaluateExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException { + return value; + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java index a29e7922da..aa2428d9c1 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java @@ -69,4 +69,8 @@ public class InvalidPreparedQuery implements PreparedQuery { throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); } + @Override + public String evaluateExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java index 0d1b2c7b79..ad9225d96f 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java @@ -36,4 +36,5 @@ public interface PreparedQuery { String evaluateExpressions(Map attributes, AttributeValueDecorator decorator) throws ProcessException; + String evaluateExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException; } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java index 496d80cf10..2c27e4d2d3 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java @@ -19,6 +19,7 @@ package org.apache.nifi.attribute.expression.language; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -416,7 +417,12 @@ public class Query { } static Map createExpressionMap(final FlowFile flowFile) { - final Map attributeMap = flowFile == null ? new HashMap() : flowFile.getAttributes(); + return createExpressionMap(flowFile, null); + } + + static Map createExpressionMap(final FlowFile flowFile, final Map additionalAttributes) { + final Map attributeMap = flowFile == null ? Collections. emptyMap() : flowFile.getAttributes(); + final Map additionalOrEmpty = additionalAttributes == null ? Collections. emptyMap() : additionalAttributes; final Map envMap = System.getenv(); final Map sysProps = System.getProperties(); @@ -428,13 +434,13 @@ public class Query { flowFileProps.put("lineageStartDate", String.valueOf(flowFile.getLineageStartDate())); } - return wrap(attributeMap, flowFileProps, envMap, sysProps); + return wrap(additionalOrEmpty, attributeMap, flowFileProps, envMap, sysProps); } - private static Map wrap(final Map attributes, final Map flowFileProps, + private static Map wrap(final Map additional, final Map attributes, final Map flowFileProps, final Map env, final Map sysProps) { @SuppressWarnings("rawtypes") - final Map[] maps = new Map[]{attributes, flowFileProps, env, sysProps}; + final Map[] maps = new Map[] {additional, attributes, flowFileProps, env, sysProps}; return new Map() { @Override diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java index 0affb7f5c5..b81a5837d3 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java @@ -59,6 +59,12 @@ public class StandardPreparedQuery implements PreparedQuery { return sb.toString(); } + @Override + public String evaluateExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException { + final Map expressionMap = Query.createExpressionMap(flowFile, additionalAttributes); + return evaluateExpressions(expressionMap, decorator); + } + @Override public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { final Map expressionMap = Query.createExpressionMap(flowFile); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardPropertyValue.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java similarity index 76% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardPropertyValue.java rename to nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java index acb86aa06d..a5c336aaa5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardPropertyValue.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java @@ -14,21 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processor; +package org.apache.nifi.attribute.expression.language; +import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.nifi.attribute.expression.language.PreparedQuery; -import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.FormatUtils; -public final class StandardPropertyValue implements PropertyValue { +public class StandardPropertyValue implements PropertyValue { private final String rawValue; private final ControllerServiceLookup serviceLookup; @@ -95,26 +95,46 @@ public final class StandardPropertyValue implements PropertyValue { @Override public PropertyValue evaluateAttributeExpressions() throws ProcessException { - return evaluateAttributeExpressions(null, null); + return evaluateAttributeExpressions(null, null, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final Map attributes) throws ProcessException { + return evaluateAttributeExpressions(null, attributes, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final Map attributes, final AttributeValueDecorator decorator) throws ProcessException { + return evaluateAttributeExpressions(null, attributes, decorator); } @Override public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException { - return evaluateAttributeExpressions(null, decorator); + return evaluateAttributeExpressions(null, null, decorator); } @Override public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException { - return evaluateAttributeExpressions(flowFile, null); + return evaluateAttributeExpressions(flowFile, null, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes) throws ProcessException { + return evaluateAttributeExpressions(flowFile, additionalAttributes, null); } @Override public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { + return evaluateAttributeExpressions(flowFile, null, decorator); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException { if (rawValue == null || preparedQuery == null) { return this; } - return new StandardPropertyValue(preparedQuery.evaluateExpressions(flowFile, decorator), serviceLookup, null); + return new StandardPropertyValue(preparedQuery.evaluateExpressions(flowFile, additionalAttributes, decorator), serviceLookup, null); } @Override @@ -124,7 +144,7 @@ public final class StandardPropertyValue implements PropertyValue { @Override public ControllerService asControllerService() { - if (rawValue == null || rawValue.equals("")) { + if (rawValue == null || rawValue.equals("") || serviceLookup == null) { return null; } @@ -136,7 +156,7 @@ public final class StandardPropertyValue implements PropertyValue { if (!serviceType.isInterface()) { throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface"); } - if (rawValue == null || rawValue.equals("")) { + if (rawValue == null || rawValue.equals("") || serviceLookup == null) { return null; } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 49021d17af..e8e4dd56b3 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -331,4 +333,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements S } } + @Override + public boolean isExpressionLanguagePresent(final PropertyDescriptor property) { + if (property == null || !property.isExpressionLanguageSupported()) { + return false; + } + + final List elRanges = Query.extractExpressionRanges(getProperty(property).getValue()); + return (elRanges != null && !elRanges.isEmpty()); + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java index 12436d4c29..090a8eb797 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java @@ -16,9 +16,10 @@ */ package org.apache.nifi.util; +import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ControllerService; @@ -29,11 +30,11 @@ import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.exception.ProcessException; public class MockPropertyValue implements PropertyValue { - private final String rawValue; private final Boolean expectExpressions; private final ControllerServiceLookup serviceLookup; private final PropertyDescriptor propertyDescriptor; + private final PropertyValue stdPropValue; private boolean expressionsEvaluated = false; public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup) { @@ -41,12 +42,20 @@ public class MockPropertyValue implements PropertyValue { } public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor) { + this(rawValue, serviceLookup, propertyDescriptor, false); + } + + private MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor, final boolean alreadyEvaluated) { + this.stdPropValue = new StandardPropertyValue(rawValue, serviceLookup); + this.rawValue = rawValue; this.serviceLookup = serviceLookup; this.expectExpressions = propertyDescriptor == null ? null : propertyDescriptor.isExpressionLanguageSupported(); this.propertyDescriptor = propertyDescriptor; + this.expressionsEvaluated = alreadyEvaluated; } + private void ensureExpressionsEvaluated() { if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) { throw new IllegalStateException("Attempting to retrieve value of " + propertyDescriptor @@ -59,49 +68,49 @@ public class MockPropertyValue implements PropertyValue { @Override public String getValue() { ensureExpressionsEvaluated(); - return rawValue; + return stdPropValue.getValue(); } @Override public Integer asInteger() { ensureExpressionsEvaluated(); - return (rawValue == null) ? null : Integer.parseInt(rawValue.trim()); + return stdPropValue.asInteger(); } @Override public Long asLong() { ensureExpressionsEvaluated(); - return (rawValue == null) ? null : Long.parseLong(rawValue.trim()); + return stdPropValue.asLong(); } @Override public Boolean asBoolean() { ensureExpressionsEvaluated(); - return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim()); + return stdPropValue.asBoolean(); } @Override public Float asFloat() { ensureExpressionsEvaluated(); - return (rawValue == null) ? null : Float.parseFloat(rawValue.trim()); + return stdPropValue.asFloat(); } @Override public Double asDouble() { ensureExpressionsEvaluated(); - return (rawValue == null) ? null : Double.parseDouble(rawValue.trim()); + return stdPropValue.asDouble(); } @Override public Long asTimePeriod(final TimeUnit timeUnit) { ensureExpressionsEvaluated(); - return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit); + return stdPropValue.asTimePeriod(timeUnit); } @Override public Double asDataSize(final DataUnit dataUnit) { ensureExpressionsEvaluated(); - return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit); + return stdPropValue.asDataSize(dataUnit); } private void markEvaluated() { @@ -115,38 +124,48 @@ public class MockPropertyValue implements PropertyValue { @Override public PropertyValue evaluateAttributeExpressions() throws ProcessException { - markEvaluated(); - if (rawValue == null) { - return this; - } - return evaluateAttributeExpressions(null, null); + return evaluateAttributeExpressions(null, null, null); } @Override public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException { - markEvaluated(); - if (rawValue == null) { - return this; - } - return evaluateAttributeExpressions(null, decorator); + return evaluateAttributeExpressions(null, null, decorator); } @Override public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException { - markEvaluated(); - if (rawValue == null) { - return this; - } - return evaluateAttributeExpressions(flowFile, null); + return evaluateAttributeExpressions(flowFile, null, null); } @Override public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { + return evaluateAttributeExpressions(flowFile, null, decorator); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes) throws ProcessException { + return evaluateAttributeExpressions(flowFile, additionalAttributes, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final Map attributes) throws ProcessException { + return evaluateAttributeExpressions(null, attributes, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final Map attributes, final AttributeValueDecorator decorator) throws ProcessException { + return evaluateAttributeExpressions(null, attributes, decorator); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException { markEvaluated(); if (rawValue == null) { return this; } - return new MockPropertyValue(Query.evaluateExpressions(rawValue, flowFile, decorator), serviceLookup); + + final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator); + return new MockPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true); } @Override diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 2f384ba4d7..925f0d8f2c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -384,11 +385,22 @@ public class StandardProcessorTestRunner implements TestRunner { enqueue(data, new HashMap()); } + @Override + public void enqueue(final String data) { + enqueue(data.getBytes(StandardCharsets.UTF_8), Collections. emptyMap()); + } + @Override public void enqueue(final byte[] data, final Map attributes) { enqueue(new ByteArrayInputStream(data), attributes); } + @Override + public void enqueue(final String data, final Map attributes) { + enqueue(data.getBytes(StandardCharsets.UTF_8), attributes); + } + + @Override public void enqueue(final InputStream data) { enqueue(data, new HashMap()); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index b1e7c8c7a7..6c8f192b35 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -363,16 +363,33 @@ public interface TestRunner { */ void enqueue(byte[] data); + /** + * Creates a FlowFile with the content set to the given string (in UTF-8 format), with no attributes, + * and adds this FlowFile to the Processor's Input Queue + * + * @param data to enqueue + */ + void enqueue(String data); + /** * Copies the content from the given byte array into memory and creates a * FlowFile from this content with the given attributes and adds this * FlowFile to the Processor's Input Queue * * @param data to enqueue - * @param attributes to use for enqueued items + * @param attributes to use for enqueued item */ void enqueue(byte[] data, Map attributes); + /** + * Creates a FlowFile with the content set to the given string (in UTF-8 format), with the given attributes, + * and adds this FlowFile to the Processor's Input Queue + * + * @param data to enqueue + * @param attributes to use for enqueued item + */ + void enqueue(String data, Map attributes); + /** * Reads the content from the given {@link InputStream} into memory and * creates a FlowFile from this content with no attributes and adds this diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java index c5ec0e2dfd..edf04751da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java @@ -98,5 +98,8 @@ public class MockProcessContext implements ProcessContext { return false; } - + @Override + public boolean isExpressionLanguagePresent(PropertyDescriptor property) { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java index c6624cc99c..4a85b5bf7e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ControllerService; @@ -32,7 +33,6 @@ import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.processor.StandardPropertyValue; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ComponentType; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 2536d6f8ca..a4d337f622 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.connectable.Connectable; @@ -32,7 +33,6 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.StandardPropertyValue; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.EventAccess; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 08e25040b4..c68c78d05f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -59,6 +59,9 @@ public class ConnectableProcessContext implements ProcessContext { @Override public PropertyValue getProperty(final String propertyName) { + // None of the connectable components other than Processor's will ever need to evaluate these. + // Since Processors use a different implementation of ProcessContext all together, we will just + // return null for all values return new PropertyValue() { @Override public String getValue() { @@ -134,6 +137,26 @@ public class ConnectableProcessContext implements ProcessContext { public boolean isSet() { return false; } + + @Override + public PropertyValue evaluateAttributeExpressions(Map attributes) throws ProcessException { + return null; + } + + @Override + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes) throws ProcessException { + return null; + } + + @Override + public PropertyValue evaluateAttributeExpressions(Map attributes, AttributeValueDecorator decorator) throws ProcessException { + return null; + } + + @Override + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException { + return null; + } }; } @@ -208,5 +231,8 @@ public class ConnectableProcessContext implements ProcessContext { return connections != null && !connections.isEmpty(); } - + @Override + public boolean isExpressionLanguagePresent(PropertyDescriptor property) { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java index f57dc7ccc3..d57e61f0ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java @@ -22,12 +22,12 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerServiceLookup; -import org.apache.nifi.processor.StandardPropertyValue; import org.apache.nifi.util.FormatUtils; public class StandardConfigurationContext implements ConfigurationContext { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index a6302ca8fd..8e52a46d95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -19,11 +19,14 @@ package org.apache.nifi.processor; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.connectable.Connection; @@ -196,4 +199,13 @@ public class StandardProcessContext implements ProcessContext, ControllerService return connections != null && !connections.isEmpty(); } + @Override + public boolean isExpressionLanguagePresent(final PropertyDescriptor property) { + if (property == null || !property.isExpressionLanguageSupported()) { + return false; + } + + final List elRanges = Query.extractExpressionRanges(getProperty(property).getValue()); + return (elRanges != null && !elRanges.isEmpty()); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index a3c2a5debd..8c903ee3d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -16,9 +16,12 @@ */ package org.apache.nifi.processor; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ControllerServiceLookup; @@ -126,4 +129,14 @@ public class StandardSchedulingContext implements SchedulingContext { public boolean hasConnection(Relationship relationship) { return processContext.hasConnection(relationship); } + + @Override + public boolean isExpressionLanguagePresent(PropertyDescriptor property) { + if (property == null || !property.isExpressionLanguageSupported()) { + return false; + } + + final List elRanges = Query.extractExpressionRanges(getProperty(property).getValue()); + return (elRanges != null && !elRanges.isEmpty()); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index e627dd33e1..c24e14665a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -26,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java index be40e90e41..cebedaff21 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processor; -import org.apache.nifi.processor.StandardPropertyValue; - import static org.junit.Assert.assertEquals; import java.util.Calendar; @@ -25,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/StandardSearchContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/StandardSearchContext.java index ddb53506c5..f6730757c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/StandardSearchContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/StandardSearchContext.java @@ -18,11 +18,11 @@ package org.apache.nifi.web.controller; import java.util.Map; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.processor.StandardPropertyValue; import org.apache.nifi.search.SearchContext; /** diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index fb51d45381..3cb7edae20 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -25,17 +25,19 @@ import java.io.OutputStreamWriter; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -53,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.FlowFileFilters; @@ -77,7 +80,8 @@ public class ReplaceText extends AbstractProcessor { public static final String appendValue = "Append"; public static final String regexReplaceValue = "Regex Replace"; public static final String literalReplaceValue = "Literal Replace"; - private final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)"); + public static final String alwaysReplace = "Always Replace"; + private static final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)"); private static final String DEFAULT_REGEX = "(?s:^.*$)"; private static final String DEFAULT_REPLACEMENT_VALUE = "$1"; @@ -95,6 +99,9 @@ public class ReplaceText extends AbstractProcessor { "Interpret the Search Value as a Regular Expression and replace all matches with the Replacement Value. The Replacement Value may reference Capturing Groups used " + "in the Search Value by using a dollar-sign followed by the Capturing Group number, such as $1 or $2. If the Search Value is set to .* then everything is replaced without " + "even evaluating the Regular Expression."); + static final AllowableValue ALWAYS_REPLACE = new AllowableValue(alwaysReplace, alwaysReplace, + "Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the property) and does not bother searching " + + "for any value. When this strategy is chosen, the property is ignored."); public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder() .name("Regular Expression") @@ -108,7 +115,9 @@ public class ReplaceText extends AbstractProcessor { public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder() .name("Replacement Value") .description("The value to insert using the 'Replacement Strategy'. Using \"Regex Replace\" back-references to Regular Expression capturing groups " - + "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.") + + "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value. " + + "Back References may also be referenced using the Expression Language, as '$1', '$2', etc. The single-tick marks MUST be included, as these variables are " + + "not \"Standard\" attribute names (attribute names must be quoted unless they contain only numbers, letters, and _).") .required(true) .defaultValue(DEFAULT_REPLACEMENT_VALUE) .addValidator(Validator.VALID) @@ -124,11 +133,11 @@ public class ReplaceText extends AbstractProcessor { public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Maximum Buffer Size") .description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to " - + "apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, " + + "apply the replacement. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, " + "the FlowFile will be routed to 'failure'. " + "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value " + "of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. " - + "This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'") + + "This value is ignored if the property is set to one of: Append, Prepend, Always Replace") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .defaultValue("1 MB") @@ -136,7 +145,7 @@ public class ReplaceText extends AbstractProcessor { public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder() .name("Replacement Strategy") .description("The strategy for how and what to replace within the FlowFile's text content.") - .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE) + .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE) .defaultValue(REGEX_REPLACE.getValue()) .required(true) .build(); @@ -210,18 +219,6 @@ public class ReplaceText extends AbstractProcessor { return; } - final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() { - @Override - public String decorate(final String attributeValue) { - return attributeValue.replace("$", "\\$"); - } - }; - - final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue(); - final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount(); - - final boolean skipBuffer = ".*".equals(unsubstitutedRegex); - final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); @@ -233,70 +230,44 @@ public class ReplaceText extends AbstractProcessor { buffer = null; } + ReplacementStrategyExecutor replacementStrategyExecutor; + switch (replacementStrategy) { + case prependValue: + replacementStrategyExecutor = new PrependReplace(); + break; + case appendValue: + replacementStrategyExecutor = new AppendReplace(); + break; + case regexReplaceValue: + // for backward compatibility - if replacement regex is ".*" then we will simply always replace the content. + if (context.getProperty(SEARCH_VALUE).getValue().equals(".*")) { + replacementStrategyExecutor = new AlwaysReplace(); + } else { + replacementStrategyExecutor = new RegexReplace(buffer, context); + } + + break; + case literalReplaceValue: + replacementStrategyExecutor = new LiteralReplace(buffer); + break; + case alwaysReplace: + replacementStrategyExecutor = new AlwaysReplace(); + break; + default: + throw new AssertionError(); + } + for (FlowFile flowFile : flowFiles) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { - if (flowFile.getSize() > maxBufferSize && !skipBuffer) { + if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) { session.transfer(flowFile, REL_FAILURE); continue; } } - String replacement; - if (!replacementStrategy.equals(regexReplaceValue)) { - replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); - } else { - replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, escapeBackRefDecorator).getValue(); - final Matcher backRefMatcher = backReferencePattern.matcher(replacement); - while (backRefMatcher.find()) { - final String backRefNum = backRefMatcher.group(1); - if (backRefNum.startsWith("0")) { - continue; - } - final int originalBackRefIndex = Integer.parseInt(backRefNum); - int backRefIndex = originalBackRefIndex; - - // if we have a replacement value like $123, and we have less than 123 capturing groups, then - // we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups, - // then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then - // we want to truncate the 1 and get 0. - while (backRefIndex > numCapturingGroups && backRefIndex >= 10) { - backRefIndex /= 10; - } - - if (backRefIndex > numCapturingGroups) { - final StringBuilder sb = new StringBuilder(replacement.length() + 1); - final int groupStart = backRefMatcher.start(1); - - sb.append(replacement.substring(0, groupStart - 1)); - sb.append("\\"); - sb.append(replacement.substring(groupStart - 1)); - replacement = sb.toString(); - } - } - } - - ReplacementStrategyExecutor replacementStrategyExecutor; - switch (replacementStrategy) { - case prependValue: - replacementStrategyExecutor = new PrependReplace(); - break; - case appendValue: - replacementStrategyExecutor = new AppendReplace(); - break; - case regexReplaceValue: - replacementStrategyExecutor = new RegexReplace(buffer); - break; - case literalReplaceValue: - replacementStrategyExecutor = new LiteralReplace(buffer); - break; - default: - throw new AssertionError(); - } - final StopWatch stopWatch = new StopWatch(true); - flowFile = replacementStrategyExecutor.replace(flowFile, session, context, replacement, evaluateMode, - charset, maxBufferSize, skipBuffer); + flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize); logger.info("Transferred {} to 'success'", new Object[] {flowFile}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); @@ -304,11 +275,106 @@ public class ReplaceText extends AbstractProcessor { } } - private static class PrependReplace implements ReplacementStrategyExecutor { + + // If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing + // groups and the Replacement Value has the value is "I owe $8 to him", then we want to treat the $8 as a literal "$8", rather + // than attempting to use it as a back reference. + private static String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) { + if (numCapturingGroups == 0) { + return unescaped; + } + + String value = unescaped; + final Matcher backRefMatcher = backReferencePattern.matcher(value); + while (backRefMatcher.find()) { + final String backRefNum = backRefMatcher.group(1); + if (backRefNum.startsWith("0")) { + continue; + } + final int originalBackRefIndex = Integer.parseInt(backRefNum); + int backRefIndex = originalBackRefIndex; + + // if we have a replacement value like $123, and we have less than 123 capturing groups, then + // we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups, + // then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then + // we want to truncate the 1 and get 0. + while (backRefIndex > numCapturingGroups && backRefIndex >= 10) { + backRefIndex /= 10; + } + + if (backRefIndex > numCapturingGroups) { + final StringBuilder sb = new StringBuilder(value.length() + 1); + final int groupStart = backRefMatcher.start(1); + + sb.append(value.substring(0, groupStart - 1)); + sb.append("\\"); + sb.append(value.substring(groupStart - 1)); + value = sb.toString(); + } + } + + return value; + } + + private static class AlwaysReplace implements ReplacementStrategyExecutor { + @Override + public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { + + final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final StringBuilder lineEndingBuilder = new StringBuilder(2); + + if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + out.write(replacementValue.getBytes(charset)); + } + }); + } else { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) { + + String line; + while ((line = br.readLine()) != null) { + // We need to determine what line ending was used and use that after our replacement value. + lineEndingBuilder.setLength(0); + for (int i = line.length() - 1; i >= 0; i--) { + final char c = line.charAt(i); + if (c == '\r' || c == '\n') { + lineEndingBuilder.append(c); + } else { + break; + } + } + + bw.write(replacementValue); + + // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder. + // So if builder has multiple characters, they are now reversed from the original string's ordering. + bw.write(lineEndingBuilder.reverse().toString()); + } + } + } + }); + } + + return flowFile; + } @Override - public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, - final Charset charset, final int maxBufferSize, final boolean skipBuffer) { + public boolean isAllDataBufferedForEntireText() { + return false; + } + } + + private static class PrependReplace implements ReplacementStrategyExecutor { + @Override + public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { + final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { flowFile = session.write(flowFile, new StreamCallback() { @Override @@ -334,13 +400,19 @@ public class ReplaceText extends AbstractProcessor { } return flowFile; } + + @Override + public boolean isAllDataBufferedForEntireText() { + return false; + } } private static class AppendReplace implements ReplacementStrategyExecutor { @Override - public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, - final Charset charset, final int maxBufferSize, final boolean skipBuffer) { + public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { + final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { flowFile = session.write(flowFile, new StreamCallback() { @Override @@ -387,81 +459,121 @@ public class ReplaceText extends AbstractProcessor { } return flowFile; } + + @Override + public boolean isAllDataBufferedForEntireText() { + return false; + } } private static class RegexReplace implements ReplacementStrategyExecutor { private final byte[] buffer; + private final int numCapturingGroups; + private final Map additionalAttrs; - public RegexReplace(final byte[] buffer) { + private static final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() { + @Override + public String decorate(final String attributeValue) { + return attributeValue.replace("$", "\\$"); + } + }; + + public RegexReplace(final byte[] buffer, final ProcessContext context) { this.buffer = buffer; + + final String regexValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue(); + numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount(); + additionalAttrs = new HashMap<>(numCapturingGroups); } @Override - public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, - final Charset charset, final int maxBufferSize, final boolean skipBuffer) { - final String replacementFinal = replacementValue.replaceAll("(\\$\\D)", "\\\\$1"); + public FlowFile replace(final FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { + final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() { + @Override + public String decorate(final String attributeValue) { + return Pattern.quote(attributeValue); + } + }; + final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue(); + final Pattern searchPattern = Pattern.compile(searchRegex); - // always match; just overwrite value with the replacement value; this optimization prevents us - // from reading the file at all. - if (skipBuffer) { - if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { - flowFile = session.write(flowFile, new OutputStreamCallback() { + final int flowFileSize = (int) flowFile.getSize(); + FlowFile updatedFlowFile; + if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer, false); + } + }); + + final String contentString = new String(buffer, 0, flowFileSize, charset); + additionalAttrs.clear(); + final Matcher matcher = searchPattern.matcher(contentString); + if (matcher.find()) { + for (int i = 1; i <= matcher.groupCount(); i++) { + final String groupValue = matcher.group(i); + additionalAttrs.put("$" + i, groupValue); + } + + String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue(); + replacement = escapeLiteralBackReferences(replacement, numCapturingGroups); + + // If we have a $ followed by anything other than a number, then escape it. E.g., $d becomes \$d so that it can be used as a literal in a regex. + final String replacementFinal = replacement.replaceAll("(\\$\\D)", "\\\\$1"); + + final String updatedValue = contentString.replaceAll(searchRegex, replacementFinal); + updatedFlowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { - out.write(replacementFinal.getBytes(charset)); - } - }); - } else { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) { - while (null != br.readLine()) { - bw.write(replacementFinal); - } - } - } - }); - } - } else { - final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() { - @Override - public String decorate(final String attributeValue) { - return Pattern.quote(attributeValue); - } - }; - final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue(); - - final int flowFileSize = (int) flowFile.getSize(); - if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - StreamUtils.fillBuffer(in, buffer, false); - final String contentString = new String(buffer, 0, flowFileSize, charset); - final String updatedValue = contentString.replaceAll(searchRegex, replacementFinal); out.write(updatedValue.getBytes(charset)); } }); } else { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { - String oneLine; - while (null != (oneLine = br.readLine())) { + // If no match, just return the original. No need to write out any content. + return flowFile; + } + } else { + updatedFlowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + String oneLine; + while (null != (oneLine = br.readLine())) { + additionalAttrs.clear(); + final Matcher matcher = searchPattern.matcher(oneLine); + if (matcher.find()) { + for (int i = 1; i <= matcher.groupCount(); i++) { + final String groupValue = matcher.group(i); + additionalAttrs.put("$" + i, groupValue); + } + + String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue(); + replacement = escapeLiteralBackReferences(replacement, numCapturingGroups); + + // If we have a $ followed by anything other than a number, then escape it. E.g., $d becomes \$d so that it can be used as a literal in a regex. + final String replacementFinal = replacement.replaceAll("(\\$\\D)", "\\\\$1"); + final String updatedValue = oneLine.replaceAll(searchRegex, replacementFinal); bw.write(updatedValue); + } else { + // No match. Just write out the line as it was. + bw.write(oneLine); } } } - }); - } + } + }); } - return flowFile; + + return updatedFlowFile; + } + + @Override + public boolean isAllDataBufferedForEntireText() { + return true; } } @@ -473,8 +585,10 @@ public class ReplaceText extends AbstractProcessor { } @Override - public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String replacementValue, final String evaluateMode, - final Charset charset, final int maxBufferSize, final boolean skipBuffer) { + public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { + + final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() { @Override public String decorate(final String attributeValue) { @@ -484,7 +598,6 @@ public class ReplaceText extends AbstractProcessor { final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue(); - final int flowFileSize = (int) flowFile.getSize(); if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { flowFile = session.write(flowFile, new StreamCallback() { @@ -515,9 +628,16 @@ public class ReplaceText extends AbstractProcessor { } return flowFile; } + + @Override + public boolean isAllDataBufferedForEntireText() { + return true; + } } private interface ReplacementStrategyExecutor { - FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String replacement, String evaluateMode, Charset charset, int maxBufferSize, boolean skipBuffer); + FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize); + + boolean isAllDataBufferedForEntireText(); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java index 3a311a339e..561a1e06fe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java @@ -914,13 +914,13 @@ public class TestReplaceText { final Map attributes = new HashMap<>(); attributes.put("abc", "Good"); - runner.enqueue(translateNewLines(Paths.get("src/test/resources/TestReplaceTextLineByLine/testFile.txt")), attributes); + runner.enqueue(Paths.get("src/test/resources/TestReplaceTextLineByLine/testFile.txt"), attributes); runner.run(); runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); - out.assertContentEquals(translateNewLines(new File("src/test/resources/TestReplaceTextLineByLine/Good.txt"))); + out.assertContentEquals("Good\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood\nGood"); } @Test @@ -939,7 +939,7 @@ public class TestReplaceText { runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); - out.assertContentEquals("GoodGoodGood"); + out.assertContentEquals("Good\r\nGood\r\nGood\r"); } @Test @@ -984,17 +984,108 @@ public class TestReplaceText { System.out.println(outContent); Assert.assertTrue(outContent.equals("attribute header\n\nabc.txt\n\ndata header\n\nHello\n\n\nfooter\n" + "attribute header\n\nabc.txt\n\ndata header\n\nWorld!\n\nfooter\n")); - } - private byte[] translateNewLines(final File file) throws IOException { + @Test + public void testCapturingGroupInExpressionLanguage() { + final TestRunner runner = TestRunners.newTestRunner(new ReplaceText()); + runner.setValidateExpressionUsage(false); + runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE); + runner.setProperty(ReplaceText.SEARCH_VALUE, "(.*?),(.*?),(\\d+.*)"); + runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "$1,$2,${ '$3':toDate('ddMMMyyyy'):format('yyyy/MM/dd') }"); + + final String csvIn = + "2006,10-01-2004,10may2004\n" + + "2007,15-05-2006,10jun2005\r\n" + + "2009,8-8-2008,10aug2008"; + final String expectedCsvOut = + "2006,10-01-2004,2004/05/10\n" + + "2007,15-05-2006,2005/06/10\r\n" + + "2009,8-8-2008,2008/08/10"; + + runner.enqueue(csvIn.getBytes()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); + out.assertContentEquals(expectedCsvOut); + } + + @Test + public void testCapturingGroupInExpressionLanguage2() { + final TestRunner runner = TestRunners.newTestRunner(new ReplaceText()); + runner.setValidateExpressionUsage(false); + runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE); + runner.setProperty(ReplaceText.SEARCH_VALUE, "(.*)/(.*?).jpg"); + runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "$1/${ '$2':substring(0,1) }.png"); + + final String csvIn = + "1,2,3,https://123.jpg,email@mydomain.com\n" + + "3,2,1,https://321.jpg,other.email@mydomain.com"; + final String expectedCsvOut = + "1,2,3,https://1.png,email@mydomain.com\n" + + "3,2,1,https://3.png,other.email@mydomain.com"; + + runner.enqueue(csvIn.getBytes()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); + out.assertContentEquals(expectedCsvOut); + } + + @Test + public void testAlwaysReplaceEntireText() { + final TestRunner runner = TestRunners.newTestRunner(new ReplaceText()); + runner.setValidateExpressionUsage(false); + runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.ENTIRE_TEXT); + runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.ALWAYS_REPLACE); + runner.setProperty(ReplaceText.SEARCH_VALUE, "i do not exist anywhere in the text"); + runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "${filename}"); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "abc.txt"); + runner.enqueue("Hello\nWorld!".getBytes(), attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); + out.assertContentEquals("abc.txt"); + } + + @Test + public void testAlwaysReplaceLineByLine() { + final TestRunner runner = TestRunners.newTestRunner(new ReplaceText()); + runner.setValidateExpressionUsage(false); + runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE); + runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.ALWAYS_REPLACE); + runner.setProperty(ReplaceText.SEARCH_VALUE, "i do not exist anywhere in the text"); + runner.setProperty(ReplaceText.REPLACEMENT_VALUE, "${filename}"); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "abc.txt"); + runner.enqueue("Hello\nWorld!\r\ntoday!\n".getBytes(), attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); + out.assertContentEquals("abc.txt\nabc.txt\r\nabc.txt\n"); + } + + + + private String translateNewLines(final File file) throws IOException { return translateNewLines(file.toPath()); } - private byte[] translateNewLines(final Path path) throws IOException { + private String translateNewLines(final Path path) throws IOException { final byte[] data = Files.readAllBytes(path); final String text = new String(data, StandardCharsets.UTF_8); - return translateNewLines(text).getBytes(StandardCharsets.UTF_8); + return translateNewLines(text); } private String translateNewLines(final String text) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestReplaceTextLineByLine/Good.txt b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestReplaceTextLineByLine/Good.txt deleted file mode 100755 index 6e90a7df23..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestReplaceTextLineByLine/Good.txt +++ /dev/null @@ -1 +0,0 @@ -GoodGoodGoodGoodGoodGoodGoodGoodGoodGoodGood \ No newline at end of file