NIFI-1121: Added API changes for having one Property depend on another

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2020-07-06 17:19:28 -04:00 committed by Bryan Bende
parent d8d9aa9cec
commit f7f336a4b0
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
7 changed files with 309 additions and 63 deletions

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.components;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class PropertyDependency {
private final String propertyName;
private final Set<String> dependentValues;
/**
* Creates a dependency that is satisfied if any value is set for the property with the given name
* @param propertyName the name of the property that is depended upon
*/
public PropertyDependency(final String propertyName) {
this.propertyName = Objects.requireNonNull(propertyName);
this.dependentValues = null;
}
/**
* Creates a dependency that is satisfied only if the property with the given name has a value that is in the given set of dependent values
* @param propertyName the name of the property that is depended upon
* @param dependentValues the values that satisfy the dependency
*/
public PropertyDependency(final String propertyName, final Set<String> dependentValues) {
this.propertyName = Objects.requireNonNull(propertyName);
this.dependentValues = Collections.unmodifiableSet(new HashSet<>(Objects.requireNonNull(dependentValues)));
}
/**
* @return the name of the property that is depended upon
*/
public String getPropertyName() {
return propertyName;
}
/**
* @return the Set of values that satisfy the dependency
*/
public Set<String> getDependentValues() {
return dependentValues;
}
@Override
public String toString() {
return "PropertyDependency[propertyName=" + propertyName + ", dependentValues=" + dependentValues + "]";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final PropertyDependency that = (PropertyDependency) o;
return Objects.equals(getPropertyName(), that.getPropertyName()) &&
Objects.equals(getDependentValues(), that.getDependentValues());
}
@Override
public int hashCode() {
return Objects.hash(getPropertyName(), getDependentValues());
}
}

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.controller.ControllerService;
@ -104,12 +105,18 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
*/
private final List<Validator> validators;
/**
* The list of dependencies that this property has on other properties
*/
private final Set<PropertyDependency> dependencies;
protected PropertyDescriptor(final Builder builder) {
this.displayName = builder.displayName == null ? builder.name : builder.displayName;
this.name = builder.name;
this.description = builder.description;
this.defaultValue = builder.defaultValue;
this.allowableValues = builder.allowableValues;
this.allowableValues = builder.allowableValues == null ? null : Collections.unmodifiableList(new ArrayList<>(builder.allowableValues));
this.required = builder.required;
this.sensitive = builder.sensitive;
this.dynamic = builder.dynamic;
@ -117,7 +124,8 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
this.expressionLanguageSupported = builder.expressionLanguageSupported;
this.expressionLanguageScope = builder.expressionLanguageScope;
this.controllerServiceDefinition = builder.controllerServiceDefinition;
this.validators = new ArrayList<>(builder.validators);
this.validators = Collections.unmodifiableList(new ArrayList<>(builder.validators));
this.dependencies = builder.dependencies == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(builder.dependencies));
}
@Override
@ -185,6 +193,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
private String description = "";
private String defaultValue = null;
private List<AllowableValue> allowableValues = null;
private Set<PropertyDependency> dependencies = null;
private boolean required = false;
private boolean sensitive = false;
@ -211,6 +220,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
this.expressionLanguageScope = specDescriptor.expressionLanguageScope;
this.controllerServiceDefinition = specDescriptor.getControllerServiceDefinition();
this.validators = new ArrayList<>(specDescriptor.validators);
this.dependencies = new HashSet<>(specDescriptor.dependencies);
return this;
}
@ -443,6 +453,74 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
return false;
}
/**
* Establishes a relationship between this Property and the given property by declaring that this Property is only relevant if the given Property has a non-null value.
* Furthermore, if one or more explicit Allowable Values are provided, this Property will not be relevant unless the given Property's value is equal to one of the given Allowable Values.
* If this method is called multiple times, each with a different dependency, then a relationship is established such that this Property is relevant only if all dependencies are satisfied.
*
* @param property the property that must be set in order for this property to become relevant
* @param dependentValues the possible values for the given property for which this Property is relevant
* @return the builder
*/
public Builder dependsOn(final PropertyDescriptor property, final AllowableValue... dependentValues) {
if (dependencies == null) {
dependencies = new HashSet<>();
}
if (dependentValues.length == 0) {
dependencies.add(new PropertyDependency(property.getName()));
} else {
final Set<String> dependentValueSet = new HashSet<>();
for (final AllowableValue value : dependentValues) {
dependentValueSet.add(value.getValue());
}
dependencies.add(new PropertyDependency(property.getName(), dependentValueSet));
}
return this;
}
/**
* Establishes a relationship between this Property and the given property by declaring that this Property is only relevant if the given Property has a non-null value.
* Furthermore, if one or more explicit Allowable Values are provided, this Property will not be relevant unless the given Property's value is equal to one of the given Allowable Values.
* If this method is called multiple times, each with a different dependency, then a relationship is established such that this Property is relevant only if all dependencies are satisfied.
*
* @param property the property that must be set in order for this property to become relevant
* @param dependentValues the possible values for the given property for which this Property is relevant
* @return the builder
*/
public Builder dependsOn(final PropertyDescriptor property, final String... dependentValues) {
return dependsOn(property.getName(), dependentValues);
}
/**
* Establishes a relationship between this Property and the given property by declaring that this Property is only relevant if the given Property has a non-null value.
* Furthermore, if one or more explicit Allowable Values are provided, this Property will not be relevant unless the given Property's value is equal to one of the given Allowable Values.
* If this method is called multiple times, each with a different dependency, then a relationship is established such that this Property is relevant only if all dependencies are satisfied.
*
* @param propertyName the property that must be set in order for this property to become relevant
* @param dependentValues the possible values for the given property for which this Property is relevant
* @return the builder
*/
public Builder dependsOn(final String propertyName, final String... dependentValues) {
if (dependencies == null) {
dependencies = new HashSet<>();
}
if (dependentValues.length == 0) {
dependencies.add(new PropertyDependency(propertyName));
} else {
final Set<String> dependentValueSet = new HashSet<>(Arrays.asList(dependentValues));
dependencies.add(new PropertyDependency(propertyName, dependentValueSet));
}
return this;
}
/**
* @return a PropertyDescriptor as configured
*
@ -507,11 +585,15 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
}
public List<Validator> getValidators() {
return Collections.unmodifiableList(validators);
return validators;
}
public List<AllowableValue> getAllowableValues() {
return allowableValues == null ? null : Collections.unmodifiableList(allowableValues);
return allowableValues;
}
public Set<PropertyDependency> getDependencies() {
return dependencies;
}
@Override

View File

@ -52,14 +52,6 @@ public class SchemaAccessUtils {
+ "This is based on version 3.2.x of the Confluent Schema Registry.");
public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from Result");
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
.displayName("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry")
.identifiesControllerService(SchemaRegistry.class)
.required(false)
.build();
public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
.name("schema-access-strategy")
.displayName("Schema Access Strategy")
@ -69,6 +61,15 @@ public class SchemaAccessUtils {
.required(true)
.build();
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
.displayName("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry")
.identifiesControllerService(SchemaRegistry.class)
.required(false)
.dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)
.build();
public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("schema-name")
.displayName("Schema Name")
@ -76,6 +77,7 @@ public class SchemaAccessUtils {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${schema.name}")
.dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY)
.required(false)
.build();
@ -86,6 +88,7 @@ public class SchemaAccessUtils {
"If the chosen Schema Registry does not support branching, this value will be ignored.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(SCHEMA_ACCESS_STRATEGY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
.required(false)
.build();
@ -96,6 +99,7 @@ public class SchemaAccessUtils {
"If not specified then the latest version of the schema will be retrieved.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(SCHEMA_ACCESS_STRATEGY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
.required(false)
.build();
@ -106,6 +110,7 @@ public class SchemaAccessUtils {
.addValidator(new AvroSchemaValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${avro.schema}")
.dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY)
.required(false)
.build();

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlType;
import java.util.Set;
@XmlType(name = "propertyDependency")
public class PropertyDependencyDTO {
private String propertyName;
private Set<String> dependentValues;
@ApiModelProperty("The name of the property that is being depended upon")
public String getPropertyName() {
return propertyName;
}
public void setPropertyName(final String propertyName) {
this.propertyName = propertyName;
}
@ApiModelProperty("The values for the property that satisfies the dependency, or null if the dependency is satisfied by the presence of any value for the associated property name")
public Set<String> getDependentValues() {
return dependentValues;
}
public void setDependentValues(final Set<String> dependentValues) {
this.dependentValues = dependentValues;
}
}

View File

@ -40,6 +40,7 @@ public class PropertyDescriptorDTO {
private String expressionLanguageScope;
private String identifiesControllerService;
private BundleDTO identifiesControllerServiceBundle;
private List<PropertyDependencyDTO> dependencies;
/**
* @return set of allowable values for this property. If empty then the allowable values are not constrained
@ -208,4 +209,14 @@ public class PropertyDescriptorDTO {
public void setIdentifiesControllerServiceBundle(BundleDTO identifiesControllerServiceBundle) {
this.identifiesControllerServiceBundle = identifiesControllerServiceBundle;
}
@ApiModelProperty(value="A list of dependencies that must be met in order for this Property to be relevant. If any of these dependencies is not met, the property described by this " +
"Property Descriptor is not relevant.")
public List<PropertyDependencyDTO> getDependencies() {
return dependencies;
}
public void setDependencies(final List<PropertyDependencyDTO> dependencies) {
this.dependencies = dependencies;
}
}

View File

@ -61,6 +61,7 @@ import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.manager.StatusMerger;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDependency;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
@ -4065,6 +4066,20 @@ public final class DtoFactory {
dto.setAllowableValues(allowableValues);
}
// Add any dependencies
final Set<PropertyDependency> dependencies = propertyDescriptor.getDependencies();
final List<PropertyDependencyDTO> dependencyDtos = dependencies.stream()
.map(this::createPropertyDependencyDto)
.collect(Collectors.toList());
dto.setDependencies(dependencyDtos);
return dto;
}
private PropertyDependencyDTO createPropertyDependencyDto(final PropertyDependency dependency) {
final PropertyDependencyDTO dto = new PropertyDependencyDTO();
dto.setPropertyName(dependency.getPropertyName());
dto.setDependentValues(dependency.getDependentValues());
return dto;
}

View File

@ -16,26 +16,9 @@
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import lzma.streams.LzmaOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
@ -54,7 +37,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@ -76,9 +58,25 @@ import org.xerial.snappy.SnappyHadoopCompatibleOutputStream;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import lzma.streams.LzmaOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@EventDriven
@SideEffectFree
@ -111,47 +109,50 @@ public class CompressContent extends AbstractProcessor {
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
.description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
.name("Compression Format")
.description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, COMPRESSION_FORMAT_SNAPPY_FRAMED,
COMPRESSION_FORMAT_LZ4_FRAMED)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'")
.allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
.defaultValue(MODE_COMPRESS)
.required(true)
.build();
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("Compression Level")
.description("The compression level to use; this is valid only when using gzip, deflate or xz-lzma2 compression. A lower value results in faster processing "
+ "but less compression; a value of 0 indicates no (that is, simple archiving) for gzip or minimal for xz-lzma2 compression."
+ " Higher levels can mean much larger memory usage such as the case with levels 7-9 for xz-lzma/2 so be careful relative to heap size.")
.name("Compression Level")
.description("The compression level to use; this is valid only when using gzip, deflate or xz-lzma2 compression. A lower value results in faster processing "
+ "but less compression; a value of 0 indicates no (that is, simple archiving) for gzip or minimal for xz-lzma2 compression."
+ " Higher levels can mean much larger memory usage such as the case with levels 7-9 for xz-lzma/2 so be careful relative to heap size.")
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_XZ_LZMA2)
.dependsOn(MODE, MODE_COMPRESS)
.build();
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'.")
.allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
.defaultValue(MODE_COMPRESS)
.required(true)
.build();
public static final PropertyDescriptor UPDATE_FILENAME = new PropertyDescriptor.Builder()
.name("Update Filename")
.description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate "
+ "compression format) and add the appropriate extension when compressing data")
.name("Update Filename")
.description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate "
+ "compression format) and add the appropriate extension when compressing data")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed")
.build();
.name("success")
.description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress")
.build();
.name("failure")
.description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@ -200,7 +201,6 @@ public class CompressContent extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
final Validator rateValidator;
if (context.getProperty(COMPRESSION_FORMAT).getValue().toLowerCase().equals(COMPRESSION_FORMAT_SNAPPY_HADOOP)
&& context.getProperty(MODE).getValue().toLowerCase().equals(MODE_DECOMPRESS)) {
validationResults.add(new ValidationResult.Builder().subject(COMPRESSION_FORMAT.getName())