NIFI-11910: Clarified some points in docs

This closes #7574

Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>
Mark Payne authored 2023-08-03 17:25:54 -04:00; committed by Chris Sampson
parent ff87060b16
commit 964f56043c
19 changed files with 214 additions and 209 deletions

View File

@@ -124,6 +124,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.description("The AWS Region to connect to.")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
@@ -131,6 +132,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait in order to establish a connection to AWS or receive data from AWS before timing out.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")

View File

@@ -33,7 +33,6 @@ import com.amazonaws.services.s3.model.EmailAddressGrantee;
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -48,9 +47,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AwsClientDetails;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.AwsClientDetails;
import org.apache.nifi.processors.aws.AwsPropertyDescriptors;
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
@@ -59,6 +58,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import static java.lang.String.format;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
@@ -70,70 +70,72 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
.name("FullControl User List")
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object")
.defaultValue("${s3.permissions.full.users}")
.build();
public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder()
.name("Read Permission User List")
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object")
.defaultValue("${s3.permissions.read.users}")
.build();
public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder()
.name("Write Permission User List")
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object")
.defaultValue("${s3.permissions.write.users}")
.build();
public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder()
.name("Read ACL User List")
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object")
.defaultValue("${s3.permissions.readacl.users}")
.build();
public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder()
.name("Write ACL User List")
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
.defaultValue("${s3.permissions.writeacl.users}")
.build();
public static final PropertyDescriptor CANNED_ACL = new PropertyDescriptor.Builder()
.name("canned-acl")
.displayName("Canned ACL")
.description("Amazon Canned ACL for an object, one of: BucketOwnerFullControl, BucketOwnerRead, LogDeliveryWrite, AuthenticatedRead, PublicReadWrite, PublicRead, Private; " +
"will be ignored if any other ACL/permission/owner property is specified")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("Amazon Canned ACL for an object, one of: BucketOwnerFullControl, BucketOwnerRead, LogDeliveryWrite, AuthenticatedRead, PublicReadWrite, PublicRead, Private; " +
"will be ignored if any other ACL/permission/owner property is specified")
.defaultValue("${s3.permissions.cannedacl}")
.build();
public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.description("The Amazon ID to use for the object's owner")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The Amazon ID to use for the object's owner")
.defaultValue("${s3.owner}")
.build();
public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
.name("Bucket")
.description("The S3 Bucket to interact with")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Object Key")
.description("The S3 Object Key to use. This is analogous to a filename for traditional file systems.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
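The ACL descriptors above default to Expression Language references such as ${s3.permissions.full.users}, so their effective values are resolved per FlowFile. A minimal sketch of that evaluation, assuming FULL_CONTROL_USER_LIST from the class above is in scope (illustrative only):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;

    // Illustrative sketch: resolves the default ${s3.permissions.full.users} expression
    // against the FlowFile's attributes; yields an empty value when the attribute is absent.
    String resolveFullControlUsers(final ProcessContext context, final FlowFile flowFile) {
        return context.getProperty(FULL_CONTROL_USER_LIST)
                .evaluateAttributeExpressions(flowFile)
                .getValue();
    }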

View File

@@ -16,16 +16,10 @@
*/
package org.apache.nifi.processors.aws.s3;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -41,6 +35,11 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@SupportsBatching
@WritesAttributes({
@@ -52,8 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
"If attempting to delete a file that does not exist, FlowFile is routed to success.")
@CapabilityDescription("Deletes a file from an Amazon S3 Bucket. If attempting to delete a file that does not exist, FlowFile is routed to success.")
public class DeleteS3Object extends AbstractS3Processor {
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()

View File

@@ -246,9 +246,9 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
+ "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. Note that this "
+ "setting is not applicable when 'Use Versions' is 'true'.")
.addValidator(createRequesterPaysValidator())
.allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
+ "with listing the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
+ "requester charges for listing the S3 bucket."))
.allowableValues(
new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated with listing the S3 bucket."),
new AllowableValue("false", "False", "Does not consent to pay requester charges for listing the S3 bucket."))
.defaultValue("false")
.build();

View File

@@ -64,7 +64,8 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("directory-name").displayName("Directory Name")
.name("directory-name")
.displayName("Directory Name")
.description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " +
"In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.")
.addValidator(new DirectoryValidator())
@@ -81,11 +82,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
"Files that have been successfully written to Azure storage are transferred to this relationship")
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to Azure storage are transferred to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
"Files that could not be written to Azure storage for some reason are transferred to this relationship")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Files that could not be written to Azure storage for some reason are transferred to this relationship")
.build();
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
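Relationships declared this way are the routing targets a processor transfers results to. A minimal sketch of the transfer step, assuming the REL_SUCCESS and REL_FAILURE constants above (illustrative only):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;

    // Illustrative sketch: routes a FlowFile to success or failure after a write attempt.
    void route(final ProcessSession session, final FlowFile flowFile, final boolean written) {
        session.transfer(flowFile, written ? REL_SUCCESS : REL_FAILURE);
    }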

View File

@@ -69,7 +69,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
@CapabilityDescription("Writes the contents of a FlowFile as a file on Azure Data Lake Storage Gen 2")
@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
@WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
@WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),

View File

@@ -85,8 +85,9 @@ import java.util.zip.InflaterInputStream;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"content", "compress", "recompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate", "zstd", "brotli"})
@CapabilityDescription("Decompresses the contents of FlowFiles using a user-specified compression algorithm and recompresses the contents using the specified compression format properties. "
+ "This processor operates in a very memory efficient way so very large objects well beyond the heap size are generally fine to process")
@CapabilityDescription("Changes the compression algorithm used to compress the contents of a FlowFile by decompressing the contents of FlowFiles using a user-specified compression algorithm and " +
"recompressing the contents using the specified compression format properties. This processor operates in a very memory efficient way so very large objects well beyond " +
"the heap size are generally fine to process")
@ReadsAttribute(attribute = "mime.type", description = "If the Decompression Format is set to 'use mime.type attribute', this attribute is used to "
+ "determine the decompression type. Otherwise, this attribute is ignored.")
@WritesAttribute(attribute = "mime.type", description = "The appropriate MIME Type is set based on the value of the Compression Format property. If the Compression Format is 'no compression' this "

View File

@@ -17,18 +17,15 @@
package org.apache.nifi.processors.gcp.bigquery;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder;
@@ -46,8 +43,6 @@ import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.Status;
import java.time.LocalTime;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -75,6 +70,7 @@ import java.io.InputStream;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -85,13 +81,16 @@ import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
@TriggerSerially
@Tags({"google", "google cloud", "bq", "bigquery"})
@CapabilityDescription("Unified processor for batch and stream flow files content to a Google BigQuery table via the Storage Write API." +
"The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" +
"are skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older " +
"insertAll method because it uses gRPC streaming rather than REST over HTTP")
@CapabilityDescription("Writes the contents of a FlowFile to a Google BigQuery table. " +
"The processor is record based so the schema that is used is driven by the RecordReader. Attributes that are not matched to the target schema " +
"are skipped. Exactly once delivery semantics are achieved via stream offsets.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)

View File

@@ -140,11 +140,10 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
}
public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
.Builder().name("drive-file-id")
public static final PropertyDescriptor FILE_ID = new PropertyDescriptor.Builder()
.name("drive-file-id")
.displayName("File ID")
.description("The Drive ID of the File to fetch. "
+ "Please see Additional Details to obtain Drive ID.")
.description("The Drive ID of the File to fetch. Please see Additional Details for information on how to obtain the Drive ID.")
.required(true)
.defaultValue("${drive.id}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -198,16 +197,15 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
.description("A FlowFile will be routed here for each successfully fetched File.")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile will be routed here for each successfully fetched File.")
.build();
public static final Relationship REL_FAILURE =
new Relationship.Builder().name("failure")
.description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
@@ -349,7 +347,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
private void handleErrorResponse(final ProcessSession session, final String fileId, FlowFile flowFile, final GoogleJsonResponseException e) {
getLogger().error("Fetching File [{}] failed", fileId, e);
flowFile = session.putAttribute(flowFile, ERROR_CODE, "" + e.getStatusCode());
flowFile = session.putAttribute(flowFile, ERROR_CODE, String.valueOf(e.getStatusCode()));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
flowFile = session.penalize(flowFile);

View File

@@ -16,36 +16,12 @@
*/
package org.apache.nifi.processors.gcp.drive;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.DateTime;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import com.google.api.services.drive.model.FileList;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -74,12 +50,37 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
@PrimaryNodeOnly
@TriggerSerially
@Tags({"google", "drive", "storage"})
@CapabilityDescription("Lists concrete files (shortcuts are ignored) in a Google Drive folder. " +
"Each listed file may result in one FlowFile, the metadata being written as FlowFile attributes. " +
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single FlowFile. " +
@CapabilityDescription("Performs a listing of concrete files (shortcuts are ignored) in a Google Drive folder. " +
"If the 'Record Writer' property is set, a single Output FlowFile is created, and each file in the listing is written as a single record to the output file. " +
"Otherwise, for each file in the listing, an individual FlowFile is created, the metadata being written as FlowFile attributes. " +
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
"previous node left off without duplicating all of the data. " +
"Please see Additional Details to set up access to Google Drive.")

View File

@@ -16,30 +16,6 @@
*/
package org.apache.nifi.processors.gcp.drive;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.joining;
import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.FAIL;
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.api.client.http.GenericUrl;
@@ -53,20 +29,6 @@ import com.google.api.services.drive.DriveRequest;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import com.google.api.services.drive.model.FileList;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -95,10 +57,49 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.json.JSONObject;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.joining;
import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.FAIL;
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"google", "drive", "storage", "put"})
@CapabilityDescription("Puts content to a Google Drive Folder.")
@CapabilityDescription("Writes the contents of a FlowFile as a file in Google Drive.")
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the Google Drive object.")
@WritesAttributes({
@WritesAttribute(attribute = ID, description = ID_DESC),

View File

@@ -132,7 +132,7 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"google cloud", "google", "storage", "gcs", "list"})
@CapabilityDescription("Retrieves a listing of objects from an GCS bucket. For each object that is listed, creates a FlowFile that represents "
@CapabilityDescription("Retrieves a listing of objects from a GCS bucket. For each object that is listed, creates a FlowFile that represents "
+ "the object so that it can be fetched in conjunction with FetchGCSObject. This Processor is designed to run on Primary Node only "
+ "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
+ "all of the data.")

View File

@@ -107,17 +107,15 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "google cloud", "gcs", "archive", "put"})
@CapabilityDescription("Puts flow files to a Google Cloud Bucket.")
@CapabilityDescription("Writes the contents of a FlowFile as an object in a Google Cloud Storage.")
@SeeAlso({FetchGCSObject.class, DeleteGCSObject.class, ListGCSBucket.class})
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the GCS Object",
value = "The value of a User-Defined Metadata field to add to the GCS Object",
description = "Allows user-defined metadata to be added to the GCS object as key/value pairs",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@ReadsAttributes({
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the " +
"GCS object"),
@ReadsAttribute(attribute = "mime.type", description = "Uses the FlowFile's MIME type as the content-type for " +
"the GCS object")
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the GCS object"),
@ReadsAttribute(attribute = "mime.type", description = "Uses the FlowFile's MIME type as the content-type for the GCS object")
})
@WritesAttributes({
@WritesAttribute(attribute = BUCKET_ATTR, description = BUCKET_DESC),

View File

@@ -88,8 +88,10 @@ import java.util.zip.InflaterInputStream;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate", "zstd", "brotli"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
+ "are generally fine to process")
+ "attribute as appropriate. A common idiom is to precede CompressContent with IdentifyMimeType and configure Mode='decompress' AND Compression Format='use mime.type attribute'. "
+ "When used in this manner, the MIME type is automatically detected and the data is decompressed, if necessary. "
+ "If decompression is unnecessary, the data is passed through to the 'success' relationship."
+ " This processor operates in a very memory efficient way so very large objects well beyond the heap size are generally fine to process.")
@ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to "
+ "determine the compression type. Otherwise, this attribute is ignored.")
@WritesAttribute(attribute = "mime.type", description = "If the Mode property is set to compress, the appropriate MIME Type is set. If the Mode "
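The 'use mime.type attribute' idiom described above keys the decompression format off the FlowFile's mime.type attribute. A minimal sketch of that lookup; the mapping shown is a hypothetical subset, not the processor's actual table:

    import org.apache.nifi.flowfile.FlowFile;

    // Illustrative sketch: selects a decompression format from the mime.type attribute,
    // falling back to pass-through when the type is not a recognized compressed format.
    String chooseDecompressionFormat(final FlowFile flowFile) {
        final String mimeType = flowFile.getAttribute("mime.type");
        if ("application/gzip".equals(mimeType)) {
            return "gzip";
        }
        if ("application/x-bzip2".equals(mimeType)) {
            return "bzip2";
        }
        return "no compression";
    }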

View File

@@ -16,27 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -68,6 +47,27 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
@@ -75,7 +75,7 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@SeeAlso({PutSQL.class, PutDatabaseRecord.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "delete", "relational", "flat"})
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE, INSERT, or DELETE SQL statement. The incoming FlowFile is expected to be "

View File

@@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Base32InputStream;
import org.apache.commons.codec.binary.Base32OutputStream;
@@ -50,11 +40,21 @@ import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"encode", "decode", "base64", "hex"})
@CapabilityDescription("Encode or decode contents using configurable encoding schemes")
@Tags({"encode", "decode", "base64", "base32", "hex"})
@CapabilityDescription("Encode or decode the contents of a FlowFile using Base64, Base32, or hex encoding schemes")
public class EncodeContent extends AbstractProcessor {
public static final String ENCODE_MODE = "Encode";

View File

@@ -16,25 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import static io.krakens.grok.api.GrokUtils.getNameGroups;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -61,6 +42,25 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.krakens.grok.api.GrokUtils.getNameGroups;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -111,7 +111,8 @@ public class ExtractText extends AbstractProcessor {
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Files larger than the specified maximum will not be fully evaluated.")
.description("Specifies the maximum amount of data to buffer (per FlowFile) in order to apply the regular expressions. " +
"FlowFiles larger than the specified maximum will not be fully evaluated.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE))
@@ -120,7 +121,7 @@ public class ExtractText extends AbstractProcessor {
public static final PropertyDescriptor MAX_CAPTURE_GROUP_LENGTH = new PropertyDescriptor.Builder()
.name("Maximum Capture Group Length")
.description("Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.")
.description("Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.")
.required(false)
.defaultValue("1024")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
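The Maximum Capture Group Length property described above amounts to a simple substring cap on each capture group value. A minimal sketch, with the helper name chosen here for illustration:

    // Illustrative sketch: caps a capture group value at the configured maximum length.
    String truncateCaptureGroup(final String captured, final int maxCaptureGroupLength) {
        return captured.length() > maxCaptureGroupLength
                ? captured.substring(0, maxCaptureGroupLength)
                : captured;
    }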

View File

@@ -69,12 +69,11 @@ import java.util.stream.Stream;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates one or more RecordPaths against the "
+ "each record in the incoming FlowFile. Each record is then grouped with other \"like records\" and a FlowFile is created for each group of \"like records.\" What it means for "
+ "two records to be \"like records\" is determined by user-defined properties. The user is required to enter at least one user-defined property whose value is a RecordPath. Two "
+ "records are considered alike if they have the same value for all configured RecordPaths. Because we know that all records in a given output FlowFile have the same value for the "
+ "fields that are specified by the RecordPath, an attribute is added for each field. See Additional Details on the Usage page for more information and examples.")
@DynamicProperty(name="The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associted RecordPath.",
@CapabilityDescription("Splits, or partitions, record-oriented data based on the configured fields in the data. One or more properties must be added. The name of the property is the name " +
"of an attribute to add. The value of the property is a RecordPath to evaluate against each Record. Two records will go to the same outbound FlowFile only if they have the same value for each " +
"of the given RecordPaths. Because we know that all records in a given output FlowFile have the same value for the fields that are specified by the RecordPath, an attribute is added for each " +
"field. See Additional Details on the Usage page for more information and examples.")
@DynamicProperty(name="The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associated RecordPath.",
value="A RecordPath that points to a field in the Record.",
description="Each dynamic property represents a RecordPath that will be evaluated against each record in an incoming FlowFile. When the value of the RecordPath is determined "
+ "for a Record, an attribute is added to the outgoing FlowFile. The name of the attribute is the same as the name of this property. The value of the attribute is the same as "

View File

@@ -16,26 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import net.lingala.zip4j.io.inputstream.ZipInputStream;
import net.lingala.zip4j.model.LocalFileHeader;
import net.lingala.zip4j.model.enums.EncryptionMethod;
@@ -78,12 +58,32 @@ import org.apache.nifi.util.FlowFileUnpackagerV1;
import org.apache.nifi.util.FlowFileUnpackagerV2;
import org.apache.nifi.util.FlowFileUnpackagerV3;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Unpack", "un-merge", "tar", "zip", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many "
+ "FlowFiles for each input FlowFile")
+ "FlowFiles for each input FlowFile. Supported formats are TAR, ZIP, and FlowFile Stream packages.")
@ReadsAttribute(attribute = "mime.type", description = "If the <Packaging Format> property is set to use mime.type attribute, this attribute is used "
+ "to determine the FlowFile's MIME Type. In this case, if the attribute is set to application/tar, the TAR Packaging Format will be used. If "
+ "the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or "