NIFI-12068: This closes #7737. Added the ability to annotate components with @UseCase and @MultiProcessorUseCase annotations; updated several processors to make use of them.

Signed-off-by: Joseph Witt <joewitt@apache.org>
Author: Mark Payne (2023-09-15 12:04:10 -04:00), committed by Joseph Witt
parent d354c88f57
commit 140ac52e9d
23 changed files with 1360 additions and 148 deletions

org/apache/nifi/annotation/documentation/MultiProcessorUseCase.java (new file)

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.documentation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* An annotation that can be used for Processors in order to explain a specific use case that can be
* accomplished using this Processor in conjunction with at least one other Processor.
* For Processors that are able to be used for multiple use cases, the component
* may be annotated with multiple MultiProcessorUseCase annotations.
* </p>
* <p>
* Note that this annotation differs from {@link UseCase} in that UseCase should describe a use case that is
* accomplished using only the extension that is annotated. In contrast, MultiProcessorUseCase documents a use case
* that is accomplished by using both the Processor that is annotated as well as other Processors.
* </p>
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Repeatable(MultiProcessorUseCases.class)
public @interface MultiProcessorUseCase {
/**
* A simple 1 (at most 2) sentence description of the use case. This should not include any extraneous details, such
* as caveats, examples, etc. Those can be provided using the {@link #notes()} method.
*
* @return a simple description of the use case
*/
String description();
/**
 * Most of the time, 1-2 sentences are sufficient to describe a use case. Those sentences should then be returned
 * by {@link #description()}. When the description alone is not sufficient, further details, such as examples
 * and caveats, may be provided here.
*
* @return any important notes that pertain to the use case
*/
String notes() default "";
/**
* An optional array of keywords that can be associated with the use case.
* @return keywords associated with the use case
*/
String[] keywords() default {};
/**
* An array of {@link ProcessorConfiguration}s that are necessary in order to accomplish the task described in this use case.
* @return an array of processor configurations
*/
ProcessorConfiguration[] configurations();
}
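As a rough illustration of how the annotation is meant to be applied, here is a minimal, hypothetical sketch (ListFile and FetchFile are real NiFi processors, but the class and property guidance below are illustrative assumptions, not part of this commit; the class-name form is used so the snippet carries no compile-time dependency on those processors):

import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

// Hypothetical example class; only the annotation usage is of interest here.
@MultiProcessorUseCase(
    description = "List files in a local directory and fetch the content of each file as it arrives.",
    keywords = {"list", "fetch", "local", "files"},
    configurations = {
        @ProcessorConfiguration(
            processorClassName = "org.apache.nifi.processors.standard.ListFile",
            configuration = "Set 'Input Directory' to the directory to monitor and connect the 'success' relationship to FetchFile."
        ),
        @ProcessorConfiguration(
            processorClassName = "org.apache.nifi.processors.standard.FetchFile",
            configuration = "The default property values are typically sufficient."
        )
    }
)
public class ExampleListAndFetchProcessor extends AbstractProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        // Processor logic omitted.
    }
}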

org/apache/nifi/annotation/documentation/MultiProcessorUseCases.java (new file)

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.documentation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * An enclosing annotation that allows the {@link MultiProcessorUseCase} annotation to be repeated on a single component.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface MultiProcessorUseCases {
MultiProcessorUseCase[] value();
}

org/apache/nifi/annotation/documentation/ProcessorConfiguration.java (new file)

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.documentation;
import org.apache.nifi.processor.Processor;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* An annotation that can be used in conjunction with {@link MultiProcessorUseCase} in order to specify the different
* components that are involved in a given use case.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ProcessorConfiguration {
/**
* Returns the class of the Processor that is to be used in the use case, if it is provided. Either the
* Processor Class or the Processor Class Name must be provided.
*
* @return the Processor's class, or <code>Processor</code> if the processor's classname is specified instead
*/
Class<? extends Processor> processorClass() default Processor.class;
/**
* @return the fully qualified classname of the component
*/
String processorClassName() default "";
/**
* @return an explanation of how the Processor should be configured.
*/
String configuration();
}
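Because exactly one of processorClass and processorClassName is expected to be supplied, consumers resolve the effective class name with a simple fallback. A minimal sketch of the pattern (the same logic appears in the documentation writers later in this commit):

// Prefer the explicit class name; fall back to the class literal, which
// defaults to Processor.class when no class was given.
String processorClassName = processorConfig.processorClassName();
if (processorClassName.isEmpty()) {
    processorClassName = processorConfig.processorClass().getName();
}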

org/apache/nifi/annotation/documentation/UseCase.java (new file)

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.documentation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* An annotation that can be used for extension points in order to explain a specific use case that can be
* accomplished using this extension. For components that are able to be used for multiple use cases, the component
* may be annotated with multiple UseCase annotations.
* </p>
* <p>
* Note that this annotation differs from {@link CapabilityDescription} in that CapabilityDescription should describe the
* general purpose of the extension point. UseCase, on the other hand, documents one very specific use case that
* can be accomplished. Some extension points may use only a single UseCase annotation while others may accomplish
* many use cases.
* </p>
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Repeatable(UseCases.class)
public @interface UseCase {
/**
* A simple 1 (at most 2) sentence description of the use case. This should not include any extraneous details, such
* as caveats, examples, etc. Those can be provided using the {@link #notes()} method.
*
* @return a simple description of the use case
*/
String description();
/**
 * Most of the time, 1-2 sentences are sufficient to describe a use case. Those sentences should then be returned
 * by {@link #description()}. When the description alone is not sufficient, further details, such as examples
 * and caveats, may be provided here.
*
* @return any important notes that pertain to the use case
*/
String notes() default "";
/**
* Most Processors specify an InputRequirement of either {@link InputRequirement.Requirement#INPUT_REQUIRED INPUT_REQUIRED}
* or {@link InputRequirement.Requirement#INPUT_FORBIDDEN}. However, some Processors use {@link InputRequirement.Requirement#INPUT_ALLOWED}
* because some use cases require input while others do not. The inputRequirement here is only relevant for Processors that use
* an InputRequirement of {@link InputRequirement.Requirement#INPUT_ALLOWED} and can indicate whether or not the Processor should have
* input (aka incoming Connections) for this particular use case.
*
* @return the {@link InputRequirement} that corresponds to this use case.
*/
InputRequirement.Requirement inputRequirement() default InputRequirement.Requirement.INPUT_ALLOWED;
/**
* An optional array of keywords that can be associated with the use case.
* @return keywords associated with the use case
*/
String[] keywords() default {};
/**
* A description of how to configure the extension for this particular use case.
* @return a description of how to configure the extension for this particular use case.
*/
String configuration() default "";
}
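As a hypothetical sketch of the inputRequirement field in action (the class and property names below are illustrative, not part of this commit):

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

// A processor that allows, but does not require, input can pin the requirement
// for one specific use case; everything here is illustrative only.
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@UseCase(
    description = "Generate sample data on a timer, with no incoming connection.",
    inputRequirement = InputRequirement.Requirement.INPUT_FORBIDDEN,
    keywords = {"generate", "sample"},
    configuration = "Set 'Run Schedule' to the desired generation interval."
)
public class ExampleGenerateProcessor extends AbstractProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        // Processor logic omitted.
    }
}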

org/apache/nifi/annotation/documentation/UseCases.java (new file)

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.documentation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * An enclosing annotation that allows the {@link UseCase} annotation to be repeated on a single component.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface UseCases {
UseCase[] value();
}
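The container exists so the compiler can stack repeated annotations; callers read them back with getAnnotationsByType, which unwraps the container transparently. A minimal sketch, assuming some annotated processor class:

// someProcessorClass is a hypothetical Class<?> for an annotated Processor.
// getAnnotationsByType returns the repeated @UseCase annotations whether they
// were declared individually or wrapped in a @UseCases container; this mirrors
// getUseCases(...) in AbstractDocumentationWriter later in this commit.
final UseCase[] useCases = someProcessorClass.getAnnotationsByType(UseCase.class);
for (final UseCase useCase : useCases) {
    System.out.println(useCase.description());
}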

AbstractDocumentationWriter.java

@ -38,8 +38,10 @@ import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
@ -167,6 +169,8 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat
writeRestrictedInfo(component.getClass().getAnnotation(Restricted.class));
writeInputRequirementInfo(getInputRequirement(component));
writeSystemResourceConsiderationInfo(getSystemResourceConsiderations(component));
writeUseCases(getUseCases(component));
writeMultiProcessorUseCases(getMultiProcessorUseCases(component));
writeSeeAlso(component.getClass().getAnnotation(SeeAlso.class));
writeDefaultSchedule(component.getClass().getAnnotation(DefaultSchedule.class));
}
@ -251,13 +255,31 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat
private List<SystemResourceConsideration> getSystemResourceConsiderations(final ConfigurableComponent component) {
SystemResourceConsideration[] systemResourceConsiderations = component.getClass().getAnnotationsByType(SystemResourceConsideration.class);
if (systemResourceConsiderations.length == 0) {
return Collections.emptyList();
}
return Arrays.asList(systemResourceConsiderations);
}
private List<UseCase> getUseCases(final ConfigurableComponent component) {
UseCase[] useCases = component.getClass().getAnnotationsByType(UseCase.class);
if (useCases.length == 0) {
return Collections.emptyList();
}
return Arrays.asList(useCases);
}
private List<MultiProcessorUseCase> getMultiProcessorUseCases(final ConfigurableComponent component) {
MultiProcessorUseCase[] useCases = component.getClass().getAnnotationsByType(MultiProcessorUseCase.class);
if (useCases.length == 0) {
return Collections.emptyList();
}
return Arrays.asList(useCases);
}
protected ExtensionType getExtensionType(final ConfigurableComponent component) {
if (component instanceof Processor) {
return ExtensionType.PROCESSOR;
@ -268,7 +290,7 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat
if (component instanceof ReportingTask) {
return ExtensionType.REPORTING_TASK;
}
if (component instanceof FlowAnalysisRule) {
return ExtensionType.FLOW_ANALYSIS_RULE;
}
if (component instanceof ParameterProvider) {
@ -305,6 +327,10 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat
protected abstract void writeSeeAlso(SeeAlso seeAlso) throws IOException;
protected abstract void writeUseCases(List<UseCase> useCases) throws IOException;
protected abstract void writeMultiProcessorUseCases(List<MultiProcessorUseCase> useCases) throws IOException;
protected abstract void writeDefaultSchedule(DefaultSchedule defaultSchedule) throws IOException;
// Processor-specific methods

XmlDocumentationWriter.java

@ -35,7 +35,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDependency;
@ -371,6 +374,62 @@ public class XmlDocumentationWriter extends AbstractDocumentationWriter {
writeTextArray("seeAlso", "see", toSee);
}
@Override
protected void writeUseCases(final List<UseCase> useCases) throws IOException {
if (useCases.isEmpty()) {
return;
}
writeArray("useCases", useCases, this::writeUseCase);
}
private void writeUseCase(final UseCase useCase) throws IOException {
writeStartElement("useCase");
writeTextElement("description", useCase.description());
writeTextElement("notes", useCase.notes());
writeTextArray("keywords", "keyword", Arrays.asList(useCase.keywords()));
writeTextElement("inputRequirement", useCase.inputRequirement().name());
writeTextElement("configuration", useCase.configuration());
writeEndElement();
}
@Override
protected void writeMultiProcessorUseCases(final List<MultiProcessorUseCase> multiProcessorUseCases) throws IOException {
if (multiProcessorUseCases.isEmpty()) {
return;
}
writeArray("multiProcessorUseCases", multiProcessorUseCases, this::writeMultiProcessorUseCase);
}
private void writeMultiProcessorUseCase(final MultiProcessorUseCase useCase) throws IOException {
writeStartElement("multiProcessorUseCase");
writeTextElement("description", useCase.description());
writeTextElement("notes", useCase.notes());
writeTextArray("keywords", "keyword", Arrays.asList(useCase.keywords()));
writeArray("processorConfigurations", Arrays.asList(useCase.configurations()), this::writeUseCaseComponent);
writeEndElement();
}
private void writeUseCaseComponent(final ProcessorConfiguration processorConfig) throws IOException {
writeStartElement("processorConfiguration");
String processorClassName = processorConfig.processorClassName();
if (processorClassName.isEmpty()) {
processorClassName = processorConfig.processorClass().getName();
}
writeTextElement("processorClassName", processorClassName);
writeTextElement("configuration", processorConfig.configuration());
writeEndElement();
}
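// For reference, the methods above serialize a single use case to roughly the
// following shape (the element values here are illustrative only):
//   <useCase>
//     <description>Compress the contents of a FlowFile</description>
//     <notes></notes>
//     <keywords>
//       <keyword>compress</keyword>
//     </keywords>
//     <inputRequirement>INPUT_ALLOWED</inputRequirement>
//     <configuration>"Mode" = "compress"</configuration>
//   </useCase>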
@Override
protected void writeRelationships(final Set<Relationship> relationships) throws IOException {
if (relationships == null || relationships.isEmpty()) {

Extension.java

@ -101,6 +101,14 @@ public class Extension {
private boolean primaryNodeOnly;
private boolean sideEffectFree;
@XmlElementWrapper
@XmlElement(name = "useCase")
private List<UseCase> useCases;
@XmlElementWrapper
@XmlElement(name = "multiProcessorUseCase")
private List<MultiProcessorUseCase> multiProcessorUseCases;
@ApiModelProperty(value = "The name of the extension")
public String getName() {
return name;
@ -334,6 +342,24 @@ public class Extension {
this.sideEffectFree = sideEffectFree;
}
@ApiModelProperty(value = "Zero or more documented use cases for how the extension may be used")
public List<UseCase> getUseCases() {
return useCases;
}
public void setUseCases(final List<UseCase> useCases) {
this.useCases = useCases;
}
@ApiModelProperty(value = "Zero or more documented use cases for how the processor may be used in conjunction with other processors")
public List<MultiProcessorUseCase> getMultiProcessorUseCases() {
return multiProcessorUseCases;
}
public void setMultiProcessorUseCases(final List<MultiProcessorUseCase> multiProcessorUseCases) {
this.multiProcessorUseCases = multiProcessorUseCases;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

org/apache/nifi/extension/manifest/MultiProcessorUseCase.java (new file)

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.extension.manifest;
import io.swagger.annotations.ApiModel;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import java.util.List;
@ApiModel
@XmlAccessorType(XmlAccessType.FIELD)
public class MultiProcessorUseCase {
private String description;
private String notes;
@XmlElementWrapper
@XmlElement(name = "keyword")
private List<String> keywords;
@XmlElementWrapper
@XmlElement(name = "processorConfiguration")
private List<ProcessorConfiguration> processorConfigurations;
public String getDescription() {
return description;
}
public void setDescription(final String description) {
this.description = description;
}
public String getNotes() {
return notes;
}
public void setNotes(final String notes) {
this.notes = notes;
}
public List<String> getKeywords() {
return keywords;
}
public void setKeywords(final List<String> keywords) {
this.keywords = keywords;
}
public List<ProcessorConfiguration> getProcessorConfigurations() {
return processorConfigurations;
}
public void setProcessorConfigurations(final List<ProcessorConfiguration> processorConfigurations) {
this.processorConfigurations = processorConfigurations;
}
}
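Given the JAXB annotations above, a manifest entry bound to this class looks roughly like the following sketch (all values, including the fully qualified class name, are illustrative):

<multiProcessorUseCase>
    <description>Retrieve all files in an S3 bucket</description>
    <notes></notes>
    <keywords>
        <keyword>s3</keyword>
    </keywords>
    <processorConfigurations>
        <processorConfiguration>
            <processorClassName>org.apache.nifi.processors.aws.s3.ListS3</processorClassName>
            <configuration>Connect the 'success' relationship to FetchS3Object.</configuration>
        </processorConfiguration>
    </processorConfigurations>
</multiProcessorUseCase>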

org/apache/nifi/extension/manifest/ProcessorConfiguration.java (new file)

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.extension.manifest;
import io.swagger.annotations.ApiModel;
@ApiModel
public class ProcessorConfiguration {
private String processorClassName;
private String configuration;
public String getProcessorClassName() {
return processorClassName;
}
public void setProcessorClassName(final String processorClassName) {
this.processorClassName = processorClassName;
}
public String getConfiguration() {
return configuration;
}
public void setConfiguration(final String configuration) {
this.configuration = configuration;
}
}

org/apache/nifi/extension/manifest/UseCase.java (new file)

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.extension.manifest;
import io.swagger.annotations.ApiModel;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import java.util.List;
@ApiModel
@XmlAccessorType(XmlAccessType.FIELD)
public class UseCase {
private String description;
private String notes;
private InputRequirement inputRequirement;
@XmlElementWrapper
@XmlElement(name = "keyword")
private List<String> keywords;
private String configuration;
public String getDescription() {
return description;
}
public void setDescription(final String description) {
this.description = description;
}
public String getNotes() {
return notes;
}
public void setNotes(final String notes) {
this.notes = notes;
}
public InputRequirement getInputRequirement() {
return inputRequirement;
}
public void setInputRequirement(final InputRequirement inputRequirement) {
this.inputRequirement = inputRequirement;
}
public List<String> getKeywords() {
return keywords;
}
public void setKeywords(final List<String> keywords) {
this.keywords = keywords;
}
public String getConfiguration() {
return configuration;
}
public void setConfiguration(final String configuration) {
this.configuration = configuration;
}
}

FetchS3Object.java

@ -30,8 +30,11 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
@ -47,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.sqs.GetSQS;
import java.io.IOException;
import java.net.URLDecoder;
@ -84,6 +88,88 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),
@WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy that was used to store the S3 object (if it is encrypted)"),})
@UseCase(
description = "Fetch a specific file from S3",
configuration = """
The "Bucket" property should be set to the name of the S3 bucket that contains the file. Typically this is defined as an attribute on an incoming FlowFile, \
so this property is set to `${s3.bucket}`.
The "Object Key" property denotes the fully qualified filename of the file to fetch. Typically, the FlowFile's `filename` attribute is used, so this property is \
set to `${filename}`.
The "Region" property must be set to denote the S3 region that the Bucket resides in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{S3_REGION}`.
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the file.
"""
)
@MultiProcessorUseCase(
description = "Retrieve all files in an S3 bucket",
keywords = {"s3", "state", "retrieve", "fetch", "all", "stream"},
configurations = {
@ProcessorConfiguration(
processorClass = ListS3.class,
configuration = """
The "Bucket" property should be set to the name of the S3 bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{S3_SOURCE_BUCKET}`.
The "Region" property must be set to denote the S3 region that the Bucket resides in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{S3_SOURCE_REGION}`.
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket.
The 'success' Relationship of this Processor is then connected to FetchS3Object.
"""
),
@ProcessorConfiguration(
processorClass = FetchS3Object.class,
configuration = """
"Bucket" = "${s3.bucket}"
"Object Key" = "${filename}"
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket.
The "Region" property must be set to the same value as the "Region" property of the ListS3 Processor.
"""
)
}
)
@MultiProcessorUseCase(
description = "Retrieve new files as they arrive in an S3 bucket",
notes = "This method of retrieving files from S3 is more efficient than using ListS3 and more cost effective. It is the pattern recommended by AWS. " +
"However, it does require that the S3 bucket be configured to place notifications on an SQS queue when new files arrive. For more information, see " +
"https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html",
configurations = {
@ProcessorConfiguration(
processorClass = GetSQS.class,
configuration = """
The "Queue URL" must be set to the appropriate URL for the SQS queue. It is recommended that this property be parameterized, using a value such as `#{SQS_QUEUE_URL}`.
The "Region" property must be set to denote the SQS region that the queue resides in. It's a good idea to parameterize this property by setting it to something like `#{SQS_REGION}`.
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket.
The 'success' relationship is connected to EvaluateJsonPath.
"""
),
@ProcessorConfiguration(
processorClassName = "org.apache.nifi.processors.standard.EvaluateJsonPath",
configuration = """
"Destination" = "flowfile-attribute"
"s3.bucket" = "$.Records[0].s3.bucket.name"
"filename" = "$.Records[0].s3.object.key"
The 'success' relationship is connected to FetchS3Object.
"""
),
@ProcessorConfiguration(
processorClass = FetchS3Object.class,
configuration = """
"Bucket" = "${s3.bucket}"
"Object Key" = "${filename}"
The "Region" property must be set to the same value as the "Region" property of the GetSQS Processor.
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket.
"""
)
}
)
public class FetchS3Object extends AbstractS3Processor {
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
@ -382,4 +468,4 @@ public class FetchS3Object extends AbstractS3Processor {
attributes.put(CoreAttributes.FILENAME.key(), filePathName);
}
}
}

HtmlDocumentationWriter.java

@ -27,8 +27,11 @@ import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.AllowableValue;
@ -58,6 +61,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@ -166,6 +171,8 @@ public class HtmlDocumentationWriter implements DocumentationWriter {
writeStatefulInfo(configurableComponent, xmlStreamWriter);
writeRestrictedInfo(configurableComponent, xmlStreamWriter);
writeInputRequirementInfo(configurableComponent, xmlStreamWriter);
writeUseCases(configurableComponent, xmlStreamWriter);
writeMultiComponentUseCases(configurableComponent, xmlStreamWriter);
writeSystemResourceConsiderationInfo(configurableComponent, xmlStreamWriter);
writeSeeAlso(configurableComponent, xmlStreamWriter);
xmlStreamWriter.writeEndElement();
@ -448,6 +455,135 @@ public class HtmlDocumentationWriter implements DocumentationWriter {
return description;
}
protected void writeUseCases(final ConfigurableComponent component, final XMLStreamWriter xmlStreamWriter) throws XMLStreamException {
final UseCase[] useCases = component.getClass().getAnnotationsByType(UseCase.class);
if (useCases.length == 0) {
return;
}
writeSimpleElement(xmlStreamWriter, "h2", "Example Use Cases:");
for (final UseCase useCase : useCases) {
writeSimpleElement(xmlStreamWriter, "h3", "Use Case:");
writeSimpleElement(xmlStreamWriter, "p", useCase.description());
final String notes = useCase.notes();
if (!StringUtils.isEmpty(notes)) {
writeSimpleElement(xmlStreamWriter, "h4", "Notes:");
final String[] splits = notes.split("\\n");
for (final String split : splits) {
writeSimpleElement(xmlStreamWriter, "p", split);
}
}
final String[] keywords = useCase.keywords();
if (keywords.length > 0) {
writeSimpleElement(xmlStreamWriter, "h4", "Keywords:");
xmlStreamWriter.writeCharacters(String.join(", ", keywords));
}
final Requirement inputRequirement = useCase.inputRequirement();
if (inputRequirement != Requirement.INPUT_ALLOWED) {
writeSimpleElement(xmlStreamWriter, "h4", "Input Requirement:");
xmlStreamWriter.writeCharacters(inputRequirement.toString());
}
final String configuration = useCase.configuration();
writeUseCaseConfiguration(configuration, xmlStreamWriter);
writeSimpleElement(xmlStreamWriter, "br", null);
}
}
protected void writeMultiComponentUseCases(final ConfigurableComponent component, final XMLStreamWriter xmlStreamWriter) throws XMLStreamException {
final MultiProcessorUseCase[] useCases = component.getClass().getAnnotationsByType(MultiProcessorUseCase.class);
if (useCases.length == 0) {
return;
}
writeSimpleElement(xmlStreamWriter, "h2", "Example Use Cases Involving Other Components:");
for (final MultiProcessorUseCase useCase : useCases) {
writeSimpleElement(xmlStreamWriter, "h3", "Use Case:");
writeSimpleElement(xmlStreamWriter, "p", useCase.description());
final String notes = useCase.notes();
if (!StringUtils.isEmpty(notes)) {
writeSimpleElement(xmlStreamWriter, "h4", "Notes:");
final String[] splits = notes.split("\\n");
for (final String split : splits) {
writeSimpleElement(xmlStreamWriter, "p", split);
}
}
final String[] keywords = useCase.keywords();
if (keywords.length > 0) {
writeSimpleElement(xmlStreamWriter, "h4", "Keywords:");
xmlStreamWriter.writeCharacters(String.join(", ", keywords));
}
writeSimpleElement(xmlStreamWriter, "h4", "Components involved:");
final ProcessorConfiguration[] processorConfigurations = useCase.configurations();
for (final ProcessorConfiguration processorConfiguration : processorConfigurations) {
writeSimpleElement(xmlStreamWriter, "strong", "Component Type: ");
final String extensionClassName;
if (processorConfiguration.processorClassName().isEmpty()) {
extensionClassName = processorConfiguration.processorClass().getName();
} else {
extensionClassName = processorConfiguration.processorClassName();
}
writeSimpleElement(xmlStreamWriter, "span", extensionClassName);
final String configuration = processorConfiguration.configuration();
writeUseCaseConfiguration(configuration, xmlStreamWriter);
writeSimpleElement(xmlStreamWriter, "br", null);
}
writeSimpleElement(xmlStreamWriter, "br", null);
}
}
private void writeUseCaseConfiguration(final String configuration, final XMLStreamWriter xmlStreamWriter) throws XMLStreamException {
if (StringUtils.isEmpty(configuration)) {
return;
}
writeSimpleElement(xmlStreamWriter, "h4", "Configuration:");
final String[] splits = configuration.split("\\n");
for (final String split : splits) {
xmlStreamWriter.writeStartElement("p");
final Matcher matcher = Pattern.compile("`(.*?)`").matcher(split);
int startIndex = 0;
while (matcher.find()) {
final int start = matcher.start();
if (start > 0) {
xmlStreamWriter.writeCharacters(split.substring(startIndex, start));
}
writeSimpleElement(xmlStreamWriter, "code", matcher.group(1));
startIndex = matcher.end();
}
if (split.length() > startIndex) {
if (startIndex == 0) {
xmlStreamWriter.writeCharacters(split);
} else {
xmlStreamWriter.writeCharacters(split.substring(startIndex));
}
}
xmlStreamWriter.writeEndElement();
}
}
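// Example of the transformation performed above: given the configuration line
//     The "Object Key" property is typically set to `${filename}`.
// the writer emits
//     <p>The "Object Key" property is typically set to <code>${filename}</code>.</p>
// Each backticked span becomes a <code> element; all surrounding text is preserved.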
/**
* Writes the PropertyDescriptors out as a table.
*

CompressContent.java

@ -38,7 +38,10 @@ import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -98,6 +101,40 @@ import java.util.zip.InflaterInputStream;
+ "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.")
@SystemResourceConsideration(resource = SystemResource.CPU)
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@UseCase(
description = "Compress the contents of a FlowFile",
configuration = """
"Mode" = "compress"
"Compression Format" should be set to whichever compression algorithm should be used."""
)
@UseCase(
description = "Decompress the contents of a FlowFile",
configuration = """
"Mode" = "decompress"
"Compression Format" should be set to whichever compression algorithm was used to compress the data previously."""
)
@MultiProcessorUseCase(
description = "Check whether or not a FlowFile is compressed and if so, decompress it.",
notes = "If IdentifyMimeType determines that the content is not compressed, CompressContent will pass the FlowFile " +
"along to the 'success' relationship without attempting to decompress it.",
keywords = {"auto", "detect", "mime type", "compress", "decompress", "gzip", "bzip2"},
configurations = {
@ProcessorConfiguration(
processorClass = IdentifyMimeType.class,
configuration = """
Default property values are sufficient.
Connect the 'success' relationship to CompressContent.
"""
),
@ProcessorConfiguration(
processorClass = CompressContent.class,
configuration = """
"Mode" = "decompress"
"Compression Format" = "use mime.type attribute"
"""
)
}
)
public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute";

ConvertRecord.java

@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@ -48,6 +49,11 @@ import java.util.List;
+ "the output schema can have a field named \"balance\" with a type of string, double, or float. If any field is present in the input that is not present in the output, "
+ "the field will be left out of the output. If any field is specified in the output schema but is not present in the input data/schema, then the field will not be "
+ "present in the output or will have a null value, depending on the writer.")
@UseCase(description = "Convert data from one record-oriented format to another",
configuration = """
The Record Reader should be configured according to the incoming data format.
The Record Writer should be configured according to the desired output format."""
)
public class ConvertRecord extends AbstractRecordProcessor {
@Override

MergeContent.java

@ -41,6 +41,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -101,14 +102,15 @@ import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipOutputStream;
@SideEffectFree
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. "
+ "It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be "
+ "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate. "
+ "NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.")
+ "It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be "
+ "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate. "
+ "NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.")
@ReadsAttributes({
@ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "All FlowFiles with the same value for this attribute will be bundled together."),
@ -146,8 +148,55 @@ import java.util.zip.ZipOutputStream;
})
@SeeAlso({SegmentContent.class, MergeRecord.class})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "While content is not stored in memory, the FlowFiles' attributes are. " +
"The configuration of MergeContent (maximum bin size, maximum group size, maximum bin age, max number of entries) will influence how much " +
"memory is used. If merging together many small FlowFiles, a two-stage approach may be necessary in order to avoid excessive use of memory.")
"The configuration of MergeContent (maximum bin size, maximum group size, maximum bin age, max number of entries) will influence how much " +
"memory is used. If merging together many small FlowFiles, a two-stage approach may be necessary in order to avoid excessive use of memory.")
@UseCase(
description = "Concatenate FlowFiles with textual content together in order to create fewer, larger FlowFiles.",
keywords = {"concatenate", "bundle", "aggregate", "bin", "merge", "combine", "smash"},
configuration = """
"Merge Strategy" = "Bin Packing Algorithm"
"Merge Format" = "Binary Concatenation"
"Delimiter Strategy" = "Text"
"Demarcator" = "\\n" (a newline can be inserted by pressing Shift + Enter)
"Minimum Number of Entries" = "1"
"Maximum Number of Entries" = "500000000"
"Minimum Group Size" = the minimum amount of data to write to an output FlowFile. A reasonable value might be "128 MB"
"Maximum Group Size" = the maximum amount of data to write to an output FlowFile. A reasonable value might be "256 MB"
"Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller \
than the Max Bin Age. A reasonable value might be "5 mins"
"""
)
@UseCase(
description = "Concatenate FlowFiles with binary content together in order to create fewer, larger FlowFiles.",
notes = "Not all binary data can be concatenated together. Whether or not this configuration is valid depends on the type of your data.",
keywords = {"concatenate", "bundle", "aggregate", "bin", "merge", "combine", "smash"},
configuration = """
"Merge Strategy" = "Bin Packing Algorithm"
"Merge Format" = "Binary Concatenation"
"Delimiter Strategy" = "Text"
"Minimum Number of Entries" = "1"
"Maximum Number of Entries" = "500000000"
"Minimum Group Size" = the minimum amount of data to write to an output FlowFile. A reasonable value might be "128 MB"
"Maximum Group Size" = the maximum amount of data to write to an output FlowFile. A reasonable value might be "256 MB"
"Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller \
than the Max Bin Age. A reasonable value might be "5 mins"
"""
)
@UseCase(
description = "Reassemble a FlowFile that was previously split apart into smaller FlowFiles by a processor such as SplitText, UnpackContext, SplitRecord, etc.",
keywords = {"reassemble", "repack", "merge", "recombine"},
configuration = """
"Merge Strategy" = "Defragment"
"Merge Format" = the value of Merge Format depends on the desired output format. If the file was previously zipped together and was split apart by UnpackContent,
a Merge Format of "ZIP" makes sense. If it was previously a .tar file, a Merge Format of "TAR" makes sense. If the data is textual, "Binary Concatenation" can be
used to combine the text into a single document.
"Delimiter Strategy" = "Text"
"Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the fragments to 'failure'. A reasonable value might be "5 mins"
For textual data, "Demarcator" should be set to a newline (\\n), set by pressing Shift+Enter in the UI. For binary data, "Demarcator" should be left blank.
"""
)
public class MergeContent extends BinFiles {
// preferred attributes
@ -157,35 +206,35 @@ public class MergeContent extends BinFiles {
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata",
"For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile.");
"For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile.");
public static final AllowableValue METADATA_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Metadata", "Keep Only Common Metadata",
"For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata "
+ "will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
"For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata "
+ "will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
public static final AllowableValue METADATA_STRATEGY_IGNORE = new AllowableValue("Ignore Metadata", "Ignore Metadata",
"Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata.");
"Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata.");
public static final AllowableValue METADATA_STRATEGY_DO_NOT_MERGE = new AllowableValue("Do Not Merge Uncommon Metadata", "Do Not Merge Uncommon Metadata",
"For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
"For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm",
"Bin-Packing Algorithm",
"Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
"Bin-Packing Algorithm",
"Bin-Packing Algorithm",
"Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
+ "their attributes (if the <Correlation Attribute> property is set)");
public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
"Defragment",
"Defragment",
"Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
"Defragment",
"Defragment",
"Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+ "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index>. All FlowFiles with the same value for \"fragment.identifier\" "
+ "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles "
+ "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute.");
public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue(
"Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file");
"Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file");
public static final AllowableValue DELIMITER_STRATEGY_TEXT = new AllowableValue(
"Text", "Text", "The values of Header, Footer, and Demarcator will be specified as property values");
"Text", "Text", "The values of Header, Footer, and Demarcator will be specified as property values");
public static final AllowableValue DELIMITER_STRATEGY_NONE = new AllowableValue(
"Do Not Use Delimiters", "Do Not Use Delimiters", "No Header, Footer, or Demarcator will be used");
@ -198,38 +247,38 @@ public class MergeContent extends BinFiles {
public static final String MERGE_FORMAT_AVRO_VALUE = "Avro";
public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue(
MERGE_FORMAT_TAR_VALUE,
MERGE_FORMAT_TAR_VALUE,
"A bin of FlowFiles will be combined into a single TAR file. The FlowFiles' <path> attribute will be used to create a directory in the "
+ "TAR file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the TAR file. "
+ "If a FlowFile has an attribute named <tar.permissions> that is 3 characters, each between 0-7, that attribute will be used "
+ "as the TAR entry's 'mode'.");
public static final AllowableValue MERGE_FORMAT_ZIP = new AllowableValue(
MERGE_FORMAT_ZIP_VALUE,
MERGE_FORMAT_ZIP_VALUE,
"A bin of FlowFiles will be combined into a single ZIP file. The FlowFiles' <path> attribute will be used to create a directory in the "
+ "ZIP file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the ZIP file. "
+ "The <Compression Level> property indicates the ZIP compression to use.");
public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V3 = new AllowableValue(
MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE,
MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE,
"A bin of FlowFiles will be combined into a single Version 3 FlowFile Stream");
public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V2 = new AllowableValue(
MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE,
MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE,
"A bin of FlowFiles will be combined into a single Version 2 FlowFile Stream");
public static final AllowableValue MERGE_FORMAT_FLOWFILE_TAR_V1 = new AllowableValue(
MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE,
MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE,
"A bin of FlowFiles will be combined into a single Version 1 FlowFile Package");
public static final AllowableValue MERGE_FORMAT_CONCAT = new AllowableValue(
MERGE_FORMAT_CONCAT_VALUE,
MERGE_FORMAT_CONCAT_VALUE,
"The contents of all FlowFiles will be concatenated together into a single FlowFile");
public static final AllowableValue MERGE_FORMAT_AVRO = new AllowableValue(
MERGE_FORMAT_AVRO_VALUE,
MERGE_FORMAT_AVRO_VALUE,
"The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");
public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
@ -239,21 +288,21 @@ public class MergeContent extends BinFiles {
public static final String REASON_FOR_MERGING = "merge.reason";
public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
.name("Merge Strategy")
.description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by "
+ "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+ "chosen FlowFiles")
.required(true)
.allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
.defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
.build();
.name("Merge Strategy")
.description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by "
+ "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+ "chosen FlowFiles")
.required(true)
.allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
.defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
.build();
public static final PropertyDescriptor MERGE_FORMAT = new PropertyDescriptor.Builder()
.required(true)
.name("Merge Format")
.description("Determines the format that will be used to merge the content.")
.allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
.defaultValue(MERGE_FORMAT_CONCAT.getValue())
.build();
public static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
@ -270,85 +319,85 @@ public class MergeContent extends BinFiles {
.build();
public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Correlation Attribute Name")
.description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for "
+ "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.defaultValue(null)
.dependsOn(MERGE_STRATEGY, MERGE_STRATEGY_BIN_PACK)
.build();
.name("Correlation Attribute Name")
.description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for "
+ "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.defaultValue(null)
.dependsOn(MERGE_STRATEGY, MERGE_STRATEGY_BIN_PACK)
.build();
public static final PropertyDescriptor DELIMITER_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
.name("Delimiter Strategy")
.description("Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if "
+ "the values of the properties should be used as the content.")
.allowableValues(DELIMITER_STRATEGY_NONE, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.defaultValue(DELIMITER_STRATEGY_NONE.getValue())
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT_VALUE)
.build();
public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
.name("Header File")
.displayName("Header")
.description("Filename or text specifying the header to use. If not specified, no header is supplied.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.build();
.name("Header File")
.displayName("Header")
.description("Filename or text specifying the header to use. If not specified, no header is supplied.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.build();
public static final PropertyDescriptor FOOTER = new PropertyDescriptor.Builder()
.name("Footer File")
.displayName("Footer")
.description("Filename or text specifying the footer to use. If not specified, no footer is supplied.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.build();
.name("Footer File")
.displayName("Footer")
.description("Filename or text specifying the footer to use. If not specified, no footer is supplied.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.build();
public static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder()
.name("Demarcator File")
.displayName("Demarcator")
.description("Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.build();
.name("Demarcator File")
.displayName("Demarcator")
.description("Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(DELIMITER_STRATEGY, DELIMITER_STRATEGY_FILENAME, DELIMITER_STRATEGY_TEXT)
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_CONCAT)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.build();
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("Compression Level")
.description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is "
+ "ignored")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.defaultValue("1")
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_ZIP)
.build();
.name("Compression Level")
.description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is "
+ "ignored")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.defaultValue("1")
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_ZIP)
.build();
public static final PropertyDescriptor KEEP_PATH = new PropertyDescriptor.Builder()
.name("Keep Path")
.description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP)
.build();
.name("Keep Path")
.description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP)
.build();
public static final PropertyDescriptor TAR_MODIFIED_TIME = new PropertyDescriptor.Builder()
.name("Tar Modified Time")
.description("If using the Tar Merge Format, specifies if the Tar entry should store the modified timestamp either by expression "
+ "(e.g. ${file.lastModifiedTime} or static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("${file.lastModifiedTime}")
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR)
.build();
.name("Tar Modified Time")
.description("If using the Tar Merge Format, specifies if the Tar entry should store the modified timestamp either by expression "
+ "(e.g. ${file.lastModifiedTime} or static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("${file.lastModifiedTime}")
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR)
.build();
public static final Relationship REL_MERGED = new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build();
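The property descriptors above lean heavily on dependsOn, which hides a property unless another property currently holds one of the listed values. A minimal sketch of the pattern, using only builder calls that already appear in this class (the descriptor itself is hypothetical, purely for illustration):

public static final PropertyDescriptor EXAMPLE_ZIP_OPTION = new PropertyDescriptor.Builder()
    .name("Example Zip Option")                    // hypothetical property, not part of MergeContent
    .description("Only meaningful when merging to a ZIP archive.")
    .required(false)
    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    .dependsOn(MERGE_FORMAT, MERGE_FORMAT_ZIP)     // rendered in the UI only when Merge Format = ZIP
    .build();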
@ -429,7 +478,7 @@ public class MergeContent extends BinFiles {
@Override
protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME)
.evaluateAttributeExpressions(flowFile).getValue();
String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
@ -559,7 +608,7 @@ public class MergeContent extends BinFiles {
decidedFragmentCount = fragmentCount;
} else if (!decidedFragmentCount.equals(fragmentCount)) {
return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the "
+ FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount;
}
}
}
@ -602,7 +651,7 @@ public class MergeContent extends BinFiles {
session.remove(flowFile);
} catch (final Exception e) {
getLogger().error("Failed to remove merged FlowFile from the session after merge failure during \""
+ context.getProperty(MERGE_FORMAT).getValue() + "\" merge.", e);
}
}
@ -684,7 +733,7 @@ public class MergeContent extends BinFiles {
}
private byte[] getDelimiterFileContent(final ProcessContext context, final List<FlowFile> flowFiles, final PropertyDescriptor descriptor)
throws IOException {
byte[] property = null;
if (flowFiles != null && flowFiles.size() > 0) {
final FlowFile flowFile = flowFiles.get(0);
@ -766,7 +815,7 @@ public class MergeContent extends BinFiles {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
// if any one of the FlowFiles is larger than the default maximum tar entry size, then we set bigNumberMode to handle it
@ -923,7 +972,7 @@ public class MergeContent extends BinFiles {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
final ZipOutputStream out = new ZipOutputStream(bufferedOut)) {
out.setLevel(compressionLevel);
for (final FlowFile flowFile : contents) {
final String path = keepPath ? getPath(flowFile) : "";

View File

@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -93,7 +94,37 @@ import java.util.stream.Stream;
})
@Tags({"record", "partition", "recordpath", "rpath", "segment", "split", "group", "bin", "organize"})
@SeeAlso({ConvertRecord.class, SplitRecord.class, UpdateRecord.class, QueryRecord.class})
@UseCase(
description = "Separate records into separate FlowFiles so that all of the records in a FlowFile have the same value for a given field or set of fields.",
keywords = {"separate", "split", "partition", "break apart", "colocate", "segregate", "record", "field", "recordpath"},
configuration = """
Choose a RecordReader that is appropriate based on the format of the incoming data.
Choose a RecordWriter that writes the data in the desired output format.
Add a single additional property. The name of the property should describe the type of data that is being used to partition the data. \
The property's value should be a RecordPath that specifies which output FlowFile the Record belongs to.
For example, if we want to separate records based on their `transactionType` field, we could add a new property named `transactionType`. \
The value of the property might be `/transaction/type`. An input FlowFile will then be separated into as few FlowFiles as possible such that each \
output FlowFile has the same value for the `transactionType` field.
"""
)
@UseCase(
description = "Separate records based on whether or not they adhere to a specific criteria",
keywords = {"separate", "split", "partition", "break apart", "segregate", "record", "field", "recordpath", "criteria"},
configuration = """
Choose a RecordReader that is appropriate based on the format of the incoming data.
Choose a RecordWriter that writes the data in the desired output format.
Add a single additional property. The name of the property should describe the criteria. \
The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false` otherwise.
For example, if we want to separate records based on whether or not they have a transaction total of more than $1,000 we could add a new property named \
`largeTransaction` with a value of `/transaction/total > 1000`. This will create two FlowFiles. In the first, all records will have a total over `1000`. \
In the second, all records will have a transaction total less than or equal to `1000`. Each FlowFile will have an attribute named `largeTransaction` with a value \
of `true` or `false`.
"""
)
public class PartitionRecord extends AbstractProcessor {
private final RecordPathCache recordPathCache = new RecordPathCache(25);
@ -421,4 +452,4 @@ public class PartitionRecord extends AbstractProcessor {
return "RecordMapValue[" + values + "]";
}
}
}
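The grouping behavior described in the first use case can be pictured with a small, self-contained Java analogue; plain maps stand in for NiFi Records, and the `type` key stands in for the `/transaction/type` RecordPath:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionSketch {
    public static void main(String[] args) {
        List<Map<String, String>> records = List.of(
                Map.of("type", "purchase", "total", "12"),
                Map.of("type", "refund", "total", "5"),
                Map.of("type", "purchase", "total", "7"));

        // Records with the same value for the chosen field land in the same bin,
        // just as PartitionRecord groups them into the same output FlowFile.
        Map<String, List<Map<String, String>>> bins = new HashMap<>();
        for (Map<String, String> record : records) {
            bins.computeIfAbsent(record.get("type"), key -> new ArrayList<>()).add(record);
        }
        System.out.println(bins.keySet());   // two partitions, e.g. [purchase, refund]
    }
}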

View File

@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -109,6 +110,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
+ "will be used to determine the type of statement (INSERT, UPDATE, DELETE, SQL, etc.) to generate and execute.")
@WritesAttribute(attribute = PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR, description = "If an error occurs during processing, the flow file will be routed to failure or retry, and this attribute "
+ "will be populated with the cause of the error.")
@UseCase(description = "Insert records into a database")
public class PutDatabaseRecord extends AbstractProcessor {
public static final String UPDATE_TYPE = "UPDATE";
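Since this usage supplies only a description, the configuration element of @UseCase is evidently optional. For comparison with the richer annotations elsewhere in this commit, a fuller form might look like the sketch below; the property names are assumptions about PutDatabaseRecord's configuration, not text from this commit:

@UseCase(
    description = "Insert records into a database",
    keywords = {"insert", "database", "record"},
    configuration = """
        "Record Reader" should be set to a Record Reader that is appropriate for your data.
        "Statement Type" should be set to "INSERT".
        "Database Connection Pooling Service" should be set to a Connection Pool that points to your database.
        "Table Name" should be set to the name of the table to insert into.
        """
)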

View File

@ -35,6 +35,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -113,6 +114,46 @@ import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
@WritesAttribute(attribute = "record.count", description = "The number of records selected by the query"),
@WritesAttribute(attribute = QueryRecord.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed")
})
@UseCase(
description = "Filter out records based on the values of the records' fields",
keywords = {"filter out", "remove", "drop", "strip out", "record field", "sql"},
configuration = """
"Record Reader" should be set to a Record Reader that is appropriate for your data.
"Record Writer" should be set to a Record Writer that writes out data in the desired format.
One additional property should be added.
The name of the property should be a short description of the data to keep.
Its value is a SQL statement that selects all columns from a table named `FLOWFILE` for relevant rows.
The WHERE clause selects the data to keep; that is, it is the exact opposite of the data we want to remove.
It is recommended to always quote column names using double-quotes in order to avoid conflicts with SQL keywords.
For example, to remove records where either the name is George OR the age is less than 18, we would add a \
property named "adults not george" with a value that selects records where the name is not George AND the age is greater than or equal to 18. \
So the value would be `SELECT * FROM FLOWFILE WHERE "name" <> 'George' AND "age" >= 18`
Adding this property now gives us a new Relationship whose name is the same as the property name. So, the "adults not george" Relationship \
should be connected to the next Processor in our flow.
"""
)
@UseCase(
description = "Keep only specific records",
keywords = {"keep", "filter", "retain", "select", "include", "record", "sql"},
configuration = """
"Record Reader" should be set to a Record Reader that is appropriate for your data.
"Record Writer" should be set to a Record Writer that writes out data in the desired format.
One additional property should be added.
The name of the property should be a short description of the data to keep.
Its value is a SQL statement that selects all columns from a table named `FLOWFILE` for relevant rows.
The WHERE clause selects the data to keep.
It is recommended to always quote column names using double-quotes in order to avoid conflicts with SQL keywords.
For example, to keep only records where the person is an adult (aged 18 or older), add a property named "adults" \
with a value that is a SQL statement that selects records where the age is at least 18. \
So the value would be `SELECT * FROM FLOWFILE WHERE "age" >= 18`
Adding this property now gives us a new Relationship whose name is the same as the property name. So, the "adults" Relationship \
should be connected to the next Processor in our flow.
"""
)
public class QueryRecord extends AbstractProcessor {
public static final String ROUTE_ATTRIBUTE_KEY = "QueryRecord.Route";
@ -559,4 +600,4 @@ public class QueryRecord extends AbstractProcessor {
}
}
}
}
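Both use cases depend on QueryRecord exposing each user-added property as a Relationship of the same name. A simplified sketch of how a NiFi processor accepts such dynamic properties (not QueryRecord's actual implementation, which additionally creates the matching Relationship):

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    return new PropertyDescriptor.Builder()
        .name(propertyDescriptorName)                          // e.g. "adults"
        .description("A SQL query to run against the FLOWFILE table")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .dynamic(true)
        .build();
}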

View File

@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
import org.apache.nifi.attribute.expression.language.exception.IllegalAttributeException;
@ -79,6 +80,68 @@ import java.util.regex.Pattern;
@CapabilityDescription("Updates the content of a FlowFile by searching for some textual value in the FlowFile content (via Regular Expression/regex, or literal value) and replacing the " +
"section of the content that matches with some alternate value. It can also be used to append or prepend text to the contents of a FlowFile.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@UseCase(
description = "Append text to the end of every line in a FlowFile",
keywords = {"raw text", "append", "line"},
configuration = """
"Evaluation Mode" = "Line-by-Line"
"Replacement Strategy" = "Append"
"Replacement Value" is set to whatever text should be appended to the line.
For example, to insert the text `<fin>` at the end of every line, we would set "Replacement Value" to `<fin>`.
We can also use Expression Language. So to insert the filename at the end of every line, we set "Replacement Value" to `${filename}`
"""
)
@UseCase(
description = "Prepend text to the beginning of every line in a FlowFile",
keywords = {"raw text", "prepend", "line"},
configuration = """
"Evaluation Mode" = "Line-by-Line"
"Replacement Strategy" = "Prepend"
"Replacement Value" is set to whatever text should be prepended to the line.
For example, to insert the text `<start>` at the beginning of every line, we would set "Replacement Value" to `<start>`.
We can also use Expression Language. So to insert the filename at the beginning of every line, we set "Replacement Value" to `${filename}`
"""
)
@UseCase(
description = "Replace every occurrence of a literal string in the FlowFile with a different value",
keywords = {"replace", "string", "text", "literal"},
configuration = """
"Evaluation Mode" = "Line-by-Line"
"Replacement Strategy" = "Literal Replace"
"Search Value" is set to whatever text is in the FlowFile that needs to be replaced.
"Replacement Value" is set to the text that should replace the current text.
For example, to replace the word "spider" with "arachnid" we set "Search Value" to `spider` and set "Replacement Value" to `arachnid`.
"""
)
@UseCase(
description = "Transform every occurrence of a literal string in a FlowFile",
keywords = {"replace", "transform", "raw text"},
configuration = """
"Evaluation Mode" = "Line-by-Line"
"Replacement Strategy" = "Regex Replace"
"Search Value" is set to a regular expression that matches the text that should be transformed in a capturing group.
"Replacement Value" is set to a NiFi Expression Language expression that references `$1` (in quotes to escape the reference name).
For example, if we wanted to lowercase any occurrence of WOLF, TIGER, or LION, we would use a "Search Value" of `(WOLF|TIGER|LION)` \
and a "Replacement Value" of `${'$1':toLower()}`.
If we want to replace any identifier with a hash of that identifier, we might use a "Search Value" of `identifier: (.*)` and a \
"Replacement Value" of `identifier: ${'$1':hash('sha256')}`
"""
)
@UseCase(
description = "Completely replace the contents of a FlowFile to a specific text",
keywords = {"replace", "raw text"},
configuration = """
"Evaluation Mode" = "Entire text"
"Replacement Strategy" = "Always Replace"
"Replacement Value" is set to the new text that should be written to the FlowFile. \
This text might include NiFi Expression Language to reference one or more attributes.
"""
)
public class ReplaceText extends AbstractProcessor {
private static Pattern REPLACEMENT_NORMALIZATION_PATTERN = Pattern.compile("(\\$\\D)");
@ -831,4 +894,4 @@ public class ReplaceText extends AbstractProcessor {
}
}
}
}
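The "Regex Replace" use case can be mirrored in plain Java to build intuition for the capture-group behavior; this is a self-contained sketch with invented sample text, not NiFi code:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexReplaceSketch {
    public static void main(String[] args) {
        // Same idea as Search Value = (WOLF|TIGER|LION), Replacement Value = ${'$1':toLower()}
        Pattern pattern = Pattern.compile("(WOLF|TIGER|LION)");
        Matcher matcher = pattern.matcher("The WOLF chased the LION");
        String result = matcher.replaceAll(match -> match.group(1).toLowerCase());
        System.out.println(result);   // prints: The wolf chased the lion
    }
}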

View File

@ -16,15 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
@ -36,6 +27,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -51,6 +43,16 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
* This processor routes a FlowFile based on its flow file attributes by using the Attribute Expression Language. The Expression Language is used by adding Optional Properties to the processor. The
@ -72,6 +74,65 @@ import org.apache.nifi.processor.util.StandardValidators;
@WritesAttributes({
@WritesAttribute(attribute = RouteOnAttribute.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed")
})
@UseCase(
description = "Route data to one or more relationships based on its attributes.",
keywords = {"attributes", "routing", "expression language"},
configuration = """
Set the "Routing Strategy" property to "Route to Property name".
For each route that a FlowFile might be routed to, add a new property. The name of the property should describe the route.
The value of the property is an Attribute Expression Language expression that will be used to determine whether or not a given FlowFile will be routed to the \
associated relationship.
For example, we might route data based on its file extension using the following properties:
- "Routing Strategy" = "Route to Property Name"
- "jpg" = "${filename:endsWith('.jpg')}"
- "png" = "${filename:endsWith('.png')}"
- "pdf" = "${filename:endsWith('.pdf')}"
The Processor will now have 3 relationships: `jpg`, `png`, and `pdf`. Each of these should be connected to the appropriate downstream processor.
"""
)
@UseCase(
description = "Keep data only if its attributes meet some criteria, such as its filename ends with .txt.",
keywords = {"keep", "filter", "remove", "delete", "expression language"},
configuration = """
Add a new property for each condition that must be satisfied in order to keep the data.
If the data should be kept in the case that any of the provided conditions is met, set the "Routing Strategy" property to "Route to 'matched' if any matches".
If all conditions must be met in order to keep the data, set the "Routing Strategy" property to "Route to 'matched' if all match".
For example, to keep files whose filename ends with .txt and have a file size of at least 1000 bytes, we will use the following properties:
- "ends_with_txt" = "${filename:endsWith('.txt')}"
- "large_enough" = "${fileSize:ge(1000)}
- "Routing Strategy" = "Route to 'matched' if all match"
Auto-terminate the 'unmatched' relationship.
Connect the 'matched' relationship to the next processor in the flow.
"""
)
@UseCase(
description = "Discard or drop a file based on attributes, such as filename.",
keywords = {"discard", "drop", "filter", "remove", "delete", "expression language"},
configuration = """
Add a new property for each condition that must be satisfied in order to drop the data.
If the data should be dropped in the case that any of the provided conditions is met, set the "Routing Strategy" property to "Route to 'matched' if any matches".
If all conditions must be met in order to drop the data, set the "Routing Strategy" property to "Route to 'matched' if all match".
Here are a couple of examples for configuring the properties:
Example 1 Use Case: Data should be dropped if its "uuid" attribute has an 'a' in it or ends with '0'.
Here, we will use the following properties:
- "has_a" = "${uuid:contains('a')}"
- "ends_with_0" = "${uuid:endsWith('0')}
- "Routing Strategy" = "Route to 'matched' if any matches"
Example 2 Use Case: Data should be dropped if its 'uuid' attribute has an 'a' AND it ends with a '1'.
Here, we will use the following properties:
- "has_a" = "${uuid:contains('a')}"
- "ends_with_1" = "${uuid:endsWith('1')}
- "Routing Strategy" = "Route to 'matched' if all match"
Auto-terminate the 'matched' relationship.
Connect the 'unmatched' relationship to the next processor in the flow.
"""
)
public class RouteOnAttribute extends AbstractProcessor {
public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route";
@ -277,4 +338,4 @@ public class RouteOnAttribute extends AbstractProcessor {
session.transfer(flowFile, firstRelationship);
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -82,6 +83,37 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "RouteText.Group", description = "The value captured by all capturing groups in the 'Grouping Regular Expression' property. "
+ "If this property is not set or contains no capturing groups, this attribute will not be added.")
})
@UseCase(
description = "Drop blank or empty lines from the FlowFile's content.",
keywords = {"filter", "drop", "empty", "blank", "remove", "delete", "strip out", "lines", "text"},
configuration = """
"Routing Strategy" = "Route to each matching Property Name"
"Matching Strategy" = "Matches Regular Expression"
"Empty Line" = "^$"
Auto-terminate the "Empty Line" relationship.
Connect the "unmatched" relationship to the next processor in your flow.
"""
)
@UseCase(
description = "Remove specific lines of text from a file, such as those containing a specific word or having a line length over some threshold.",
keywords = {"filter", "drop", "empty", "blank", "remove", "delete", "strip out", "lines", "text", "expression language"},
configuration = """
"Routing Strategy" = "Route to each matching Property Name"
"Matching Strategy" = "Satisfies Expression"
An additional property should be added named "Filter Out". The value should be a NiFi Expression Language Expression that can refer to two variables \
(in addition to FlowFile attributes): `line`, which is the line of text being evaluated; and `lineNo`, which is the line number in the file (starting with 1). \
The Expression should return `true` for any line that should be dropped.
For example, to remove any line that starts with a # symbol, we can set "Filter Out" to `${line:startsWith("#")}`.
We could also remove the first 2 lines of text by setting "Filter Out" to `${lineNo:le(2)}`. Note that we use the `le` function because we want line numbers \
less than or equal to `2`, since the line index is 1-based.
Auto-terminate the "Filter Out" relationship.
Connect the "unmatched" relationship to the next processor in your flow.
"""
)
public class RouteText extends AbstractProcessor {
public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route";
@ -646,4 +678,4 @@ public class RouteText extends AbstractProcessor {
}
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -75,6 +76,65 @@ import java.util.stream.Stream;
description = "Allows users to specify values to use to replace fields in the record that match the RecordPath.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@SeeAlso({ConvertRecord.class})
@UseCase(
description = "Combine multiple fields into a single field.",
keywords = {"combine", "concatenate", "recordpath"},
configuration = """
"Replacement Value Strategy" = "Record Path Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to place the result in.
The value of the property uses the CONCAT Record Path function to concatenate multiple values together, potentially using other string literal values.
For example, to combine the `title`, `firstName` and `lastName` fields into a single field named `fullName`, we add a property with the name `/fullName` \
and a value of `CONCAT(/title, ' ', /firstName, ' ', /lastName)`
"""
)
@UseCase(
description = "Change the value of a record field to an explicit value.",
keywords = {"change", "update", "replace", "transform"},
configuration = """
"Replacement Value Strategy" = "Literal Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to place the result in.
The value of the property is the explicit value to set the field to. For example, we can set any field with a name of `txId`, regardless of its level in the data's hierarchy, \
to `1111-1111` by adding a property with a name of `//txId` and a value of `1111-1111`
"""
)
@UseCase(
description = "Copy the value of one record field to another record field.",
keywords = {"change", "update", "copy", "recordpath", "hierarchy", "transform"},
configuration = """
"Replacement Value Strategy" = "Record Path Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update.
The value of the property is a RecordPath identifying the field to copy the value from.
For example, we can copy the value of `/identifiers/all/imei` to the `identifier` field at the root level, by adding a property named \
`/identifier` with a value of `/identifiers/all/imei`.
"""
)
@UseCase(
description = "Enrich data by injecting the value of an attribute into each Record.",
keywords = {"enrich", "attribute", "change", "update", "replace", "insert", "transform"},
configuration = """
"Replacement Value Strategy" = "Literal Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to place the result in.
The value of the property is an Expression Language expression that references the attribute of interest. We can, for example, insert a new field named \
`filename` into each record by adding a property named `/filename` with a value of `${filename}`
"""
)
@UseCase(
description = "Change the format of a record field.",
keywords = {"change", "update", "replace", "insert", "transform", "format", "date/time", "timezone", "expression language"},
configuration = """
"Replacement Value Strategy" = "Literal Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update.
The value is an Expression Language expression that references the `field.name` variable. For example, to change the date/time format of \
a field named `txDate` from `year-month-day` format to `month/day/year` format, we add a property named `/txDate` with a value of \
`${field.value:toDate('yyyy-MM-dd'):format('MM/dd/yyyy')}`. We could also change the timezone of a timestamp field (and insert the timezone for clarity) by using a value of \
`${field.value:toDate('yyyy-MM-dd HH:mm:ss', 'UTC-0400'):format('yyyy-MM-dd HH:mm:ss Z', 'UTC')}`.
"""
)
public class UpdateRecord extends AbstractRecordProcessor {
private static final String FIELD_NAME = "field.name";
private static final String FIELD_VALUE = "field.value";
@ -291,4 +351,4 @@ public class UpdateRecord extends AbstractRecordProcessor {
return selectedFields.get(0);
}
}
}
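The date/time reformat in the last use case corresponds to this plain-Java sketch (sample date invented; the Expression Language does the equivalent work inside UpdateRecord):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

class DateReformatSketch {
    public static void main(String[] args) {
        // Equivalent of ${field.value:toDate('yyyy-MM-dd'):format('MM/dd/yyyy')}
        LocalDate date = LocalDate.parse("2023-09-15");                            // year-month-day input
        String reformatted = date.format(DateTimeFormatter.ofPattern("MM/dd/yyyy"));
        System.out.println(reformatted);                                           // prints: 09/15/2023
    }
}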