NIFI-12671 Added S3FileResourceService

This closes #8368.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Balázs Gerner 2024-02-06 14:56:23 +01:00 committed by Peter Turcsanyi
parent 03bba7049a
commit 40d9750bb3
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
26 changed files with 513 additions and 130 deletions

View File

@ -29,7 +29,6 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
@ -58,6 +57,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
/**
* Base class for AWS processors that uses AWSCredentialsProvider interface for creating AWS clients.
*
@ -92,14 +93,6 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
// Property Descriptors
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.description("The AWS Region to connect to.")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait in order to establish a connection to AWS or receive data from AWS before timing out.")
@ -177,19 +170,6 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
this.clientCache.cleanUp();
}
public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
}
public static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for (final Regions region : Regions.values()) {
values.add(createAllowableValue(region));
}
return values.toArray(new AllowableValue[0]);
}
@Override
public void migrateProperties(final PropertyConfiguration config) {
migrateAuthenticationProperties(config);

View File

@ -22,7 +22,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.Signer;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
@ -35,10 +34,8 @@ import com.amazonaws.services.s3.model.EmailAddressGrantee;
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
@ -49,7 +46,6 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
@ -62,11 +58,13 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static java.lang.String.format;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.ATTRIBUTE_DEFINED_REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region;
public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {
@ -182,16 +180,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
.dynamicallyModifiesClasspath(true)
.build();
public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");
public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(REGION)
.allowableValues(getAvailableS3Regions())
.build();
public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
.name("encryption-service")
.displayName("Encryption Service")
@ -292,7 +280,7 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
* @return The created S3 client
*/
protected AmazonS3Client getS3Client(final ProcessContext context, final Map<String, String> attributes) {
final Region region = resolveRegion(context, attributes);
final Region region = resolveS3Region(context, attributes);
return getClient(context, region);
}
@ -303,7 +291,7 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
* @return The newly created S3 client
*/
protected AmazonS3Client createClient(final ProcessContext context, final Map<String, String> attributes) {
final Region region = resolveRegion(context, attributes);
final Region region = resolveS3Region(context, attributes);
return createClient(context, region);
}
@ -451,36 +439,8 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
return cannedAcl;
}
private Region parseRegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}
try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}
private Region resolveRegion(final ProcessContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();
if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}
return parseRegionValue(regionValue);
}
private boolean isAttributeDefinedRegion(final ProcessContext context) {
String regionValue = context.getProperty(S3_REGION).getValue();
return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue);
}
private static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION);
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.util;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.Arrays;
import java.util.Map;
/**
* Utility class for AWS region methods. This class uses AWS SDK v1.
*
*/
public final class RegionUtilV1 {
private RegionUtilV1() {
}
public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
public static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.description("The AWS Region to connect to.")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();
public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(REGION)
.allowableValues(getAvailableS3Regions())
.build();
public static Region resolveS3Region(final PropertyContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();
if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}
return parseS3RegionValue(regionValue);
}
public static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.add(availableRegions, ATTRIBUTE_DEFINED_REGION);
}
public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
}
public static AllowableValue[] getAvailableRegions() {
return Arrays.stream(Regions.values())
.map(RegionUtilV1::createAllowableValue)
.toArray(AllowableValue[]::new);
}
private static Region parseS3RegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(String.format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}
try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(String.format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}
}

View File

@ -109,8 +109,8 @@ public abstract class AbstractAwsProcessor<T extends SdkClient> extends Abstract
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.required(true)
.allowableValues(RegionUtil.getAvailableRegions())
.defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue())
.allowableValues(RegionUtilV2.getAvailableRegions())
.defaultValue(RegionUtilV2.createAllowableValue(Region.US_WEST_2).getValue())
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()

View File

@ -25,10 +25,13 @@ import java.util.Comparator;
import java.util.List;
/**
* Utility class for AWS region methods.
* Utility class for AWS region methods. This class uses AWS SDK v2.
*
*/
public abstract class RegionUtil {
public final class RegionUtilV2 {
private RegionUtilV2() {
}
/**
* Creates an AllowableValue from a Region.

View File

@ -38,6 +38,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@SupportsBatching
@WritesAttributes({

View File

@ -62,6 +62,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@SupportsBatching
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)

View File

@ -99,6 +99,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@ -518,7 +520,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue());
}
try {
@ -540,7 +542,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
// Write the entity to the listing
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue());
listCount++;
}
@ -605,7 +607,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue());
}
try {
@ -626,7 +628,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
// Write the entity to the listing
final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue());
// Track the latest lastModified timestamp and keys having that timestamp.
// NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
@ -736,7 +738,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue());
}
try {
@ -750,7 +752,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
final AmazonS3Client s3Client = getClient(context);
final GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue());
listCount++;

View File

@ -89,6 +89,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;

View File

@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@SupportsBatching
@WritesAttributes({

View File

@ -36,8 +36,8 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
@ -108,8 +108,8 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
.displayName("KMS Region")
.description("The Region of the AWS Key Management Service. Only used in case of Client-side KMS.")
.required(false)
.allowableValues(AbstractS3Processor.getAvailableRegions())
.defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
.allowableValues(RegionUtilV1.getAvailableRegions())
.defaultValue(RegionUtilV1.createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();
private String keyValue = "";

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3.service;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.S3Object;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
import org.apache.nifi.processors.aws.s3.FetchS3Object;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region;
import static org.apache.nifi.util.StringUtils.isBlank;
@Tags({"Amazon", "S3", "AWS", "file", "resource"})
@SeeAlso({FetchS3Object.class})
@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.")
@UseCase(
description = "Fetch a specific file from S3. " +
"The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
configuration = """
"Bucket" = "${s3.bucket}"
"Object Key" = "${filename}"
The "Region" property must be set to denote the S3 region that the Bucket resides in.
The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket.
"""
)
public class S3FileResourceService extends AbstractControllerService implements FileResourceService {
public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractS3Processor.KEY)
.build();
public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(RegionUtilV1.S3_REGION)
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
BUCKET_WITH_DEFAULT_VALUE,
KEY,
S3_REGION,
AWS_CREDENTIALS_PROVIDER_SERVICE);
private final Cache<Region, AmazonS3> clientCache = Caffeine.newBuilder().build();
private volatile PropertyContext context;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.context = context;
}
@OnDisabled
public void onDisabled() {
this.context = null;
clientCache.asMap().values().forEach(AmazonS3::shutdown);
clientCache.invalidateAll();
clientCache.cleanUp();
}
@Override
public FileResource getFileResource(Map<String, String> attributes) {
final AWSCredentialsProviderService awsCredentialsProviderService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
.asControllerService(AWSCredentialsProviderService.class);
final AmazonS3 client = getS3Client(attributes, awsCredentialsProviderService.getCredentialsProvider());
try {
return fetchObject(client, attributes);
} catch (final ProcessException | SdkClientException e) {
throw new ProcessException("Failed to fetch s3 object", e);
}
}
/**
* Fetches s3 object from the provided bucket and returns it as FileResource
*
* @param client amazon s3 client
* @param attributes configuration attributes
* @return fetched s3 object as FileResource
* @throws ProcessException if the object 'bucketName/key' does not exist
*/
private FileResource fetchObject(final AmazonS3 client, final Map<String, String> attributes) throws ProcessException,
SdkClientException {
final String bucketName = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(attributes).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
if (isBlank(bucketName) || isBlank(key)) {
throw new ProcessException("Bucket name or key value is missing");
}
if (!client.doesObjectExist(bucketName, key)) {
throw new ProcessException(String.format("Object '%s/%s' does not exist in s3", bucketName, key));
}
final S3Object object = client.getObject(bucketName, key);
return new FileResource(object.getObjectContent(), object.getObjectMetadata().getContentLength());
}
protected AmazonS3 getS3Client(Map<String, String> attributes, AWSCredentialsProvider credentialsProvider) {
final Region region = resolveS3Region(context, attributes);
return clientCache.get(region, ignored -> AmazonS3Client.builder()
.withRegion(region.getName())
.withCredentials(credentialsProvider)
.build());
}
}

View File

@ -56,6 +56,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
@SupportsBatching
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"Amazon", "AWS", "Client", "Gateway-API", "Rest", "http", "https"})

View File

@ -15,3 +15,4 @@
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService
org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService
org.apache.nifi.processors.aws.s3.service.S3FileResourceService

View File

@ -37,6 +37,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.Tag;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -251,7 +252,7 @@ public abstract class AbstractS3IT {
Assertions.fail("Could not set security properties");
}
runner.setProperty(AbstractS3Processor.S3_REGION, getRegion());
runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(AbstractS3Processor.ENDPOINT_OVERRIDE, getEndpointOverride());
runner.setProperty(AbstractS3Processor.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -91,7 +92,7 @@ public class ITFetchS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
setSecureProperties(runner);
runner.setProperty(FetchS3Object.S3_REGION, getRegion());
runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();

View File

@ -34,6 +34,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -447,7 +448,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = initTestRunner();
setSecureProperties(runner);
runner.setProperty(PutS3Object.S3_REGION, getRegion());
runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME);
runner.setProperty(PutS3Object.KEY, "${filename}");
@ -764,7 +765,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = initTestRunner();
setSecureProperties(runner);
runner.setProperty(PutS3Object.S3_REGION, getRegion());
runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, TEST_PARTSIZE_STRING);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);

View File

@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@ -56,7 +57,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectSimple() {
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@ -75,7 +76,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectSimpleRegionFromFlowFileAttribute() {
runner.setProperty(DeleteS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@ -89,7 +90,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectS3Exception() {
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@ -105,7 +106,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteVersionSimple() {
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(DeleteS3Object.VERSION_ID, "test-version");
final Map<String, String> attrs = new HashMap<>();
@ -126,7 +127,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteVersionFromExpressions() {
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "${s3.bucket}");
runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();

View File

@ -32,6 +32,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -80,7 +81,7 @@ public class TestFetchS3Object {
@Test
public void testGetObject() throws IOException {
runner.setProperty(FetchS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -147,7 +148,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectWithRequesterPays() throws IOException {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket");
runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");
final Map<String, String> attrs = new HashMap<>();
@ -205,7 +206,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectVersion() throws IOException {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket");
runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
@ -245,7 +246,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectExceptionGoesToFailure() {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -259,7 +260,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesBucketName() {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -288,7 +289,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesAuthentication() {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -312,7 +313,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -330,7 +331,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectReturnsNull() {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -344,7 +345,7 @@ public class TestFetchS3Object {
@Test
public void testFlowFileAccessExceptionGoesToFailure() {
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");

View File

@ -40,6 +40,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.state.MockStateManager;
@ -102,7 +103,7 @@ public class TestListS3 {
@Test
public void testList() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
Date lastModified = new Date();
@ -154,7 +155,7 @@ public class TestListS3 {
@Test
public void testListWithRecords() throws InitializationException {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
final MockRecordWriter recordWriter = new MockRecordWriter(null, false);
@ -205,7 +206,7 @@ public class TestListS3 {
@Test
public void testListWithRequesterPays() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
@ -251,7 +252,7 @@ public class TestListS3 {
@Test
public void testListWithRequesterPays_invalid() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays cannot be used with versions
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
@ -261,7 +262,7 @@ public class TestListS3 {
@Test
public void testListVersion2() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.LIST_TYPE, "2");
@ -307,7 +308,7 @@ public class TestListS3 {
@Test
public void testListVersion2WithRequesterPays() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
runner.setProperty(ListS3.LIST_TYPE, "2");
@ -354,7 +355,7 @@ public class TestListS3 {
@Test
public void testListVersions() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.USE_VERSIONS, "true");
@ -398,7 +399,7 @@ public class TestListS3 {
@Test
public void testListObjectsNothingNew() throws IOException {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
Calendar calendar = Calendar.getInstance();
@ -434,7 +435,7 @@ public class TestListS3 {
@Test
public void testListIgnoreByMinAge() throws IOException {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.MIN_AGE, "30 sec");
@ -485,7 +486,7 @@ public class TestListS3 {
@Test
public void testListIgnoreByMaxAge() throws IOException {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.MAX_AGE, "30 sec");
Date lastModifiedNow = new Date();
@ -533,7 +534,7 @@ public class TestListS3 {
@Test
public void testWriteObjectTags() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
@ -560,7 +561,7 @@ public class TestListS3 {
@Test
public void testWriteUserMetadata() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
@ -588,7 +589,7 @@ public class TestListS3 {
@Test
public void testNoTrackingList() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
@ -676,7 +677,7 @@ public class TestListS3 {
assertEquals(TEST_TIMESTAMP, listS3.getListingSnapshot().getTimestamp());
runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
runner.setProperty(RegionUtilV1.REGION, Regions.EU_CENTRAL_1.getName());
assertTrue(listS3.isResetTracking());
@ -765,7 +766,7 @@ public class TestListS3 {
assertNotNull(listS3.getListedEntityTracker());
runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
runner.setProperty(RegionUtilV1.REGION, Regions.EU_CENTRAL_1.getName());
assertTrue(listS3.isResetTracking());

View File

@ -43,6 +43,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -245,7 +246,7 @@ public class TestPutS3Object {
}
private void prepareTest(String filename) {
runner.setProperty(PutS3Object.S3_REGION, "ap-northeast-1");
runner.setProperty(RegionUtilV1.S3_REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.assertValid();
@ -258,7 +259,7 @@ public class TestPutS3Object {
}
private void prepareTestWithRegionInAttributes(String filename, String region) {
runner.setProperty(PutS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.assertValid();

View File

@ -27,6 +27,7 @@ import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
import com.amazonaws.services.s3.model.Tag;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -70,7 +71,7 @@ public class TestTagS3Object {
public void testTagObjectSimple() {
final String tagKey = "k";
final String tagVal = "v";
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -98,7 +99,7 @@ public class TestTagS3Object {
@Test
public void testTagObjectSimpleRegionFromFlowFileAttribute() {
runner.setProperty(TagS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "k");
runner.setProperty(TagS3Object.TAG_VALUE, "v");
@ -118,7 +119,7 @@ public class TestTagS3Object {
public void testTagObjectVersion() {
final String tagKey = "k";
final String tagVal = "v";
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.VERSION_ID, "test-version");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
@ -148,7 +149,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -183,7 +184,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -212,7 +213,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -247,7 +248,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -263,7 +264,7 @@ public class TestTagS3Object {
@Test
public void testBucketEvaluatedAsBlank() {
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "${not.existant.attribute}");
runner.setProperty(TagS3Object.TAG_KEY, "key");
runner.setProperty(TagS3Object.TAG_VALUE, "val");
@ -278,7 +279,7 @@ public class TestTagS3Object {
@Test
public void testTagKeyEvaluatedAsBlank() {
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}");
runner.setProperty(TagS3Object.TAG_VALUE, "val");
@ -293,7 +294,7 @@ public class TestTagS3Object {
@Test
public void testTagValEvaluatedAsBlank() {
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "tagKey");
runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}");

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3.service;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Map;
import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class S3FileResourceServiceTest {
private static final String CONTROLLER_SERVICE = "AWSCredentialsService";
private static final String BUCKET_NAME = "test-bucket";
private static final String KEY = "key";
private static final long CONTENT_LENGTH = 10L;
@Mock
private AmazonS3 client;
@Mock
private S3Object s3Object;
@Mock
private ObjectMetadata metadata;
@Mock
private S3ObjectInputStream inputStream;
@InjectMocks
private TestS3FileResourceService service;
private TestRunner runner;
@BeforeEach
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService("S3FileResourceService", service);
}
@Test
void testGetFileResourceHappyPath() throws InitializationException {
setupS3Client();
setupService();
FileResource fileResource = service.getFileResource(Map.of());
assertFileResource(fileResource);
}
@Test
void testNonExistingObject() throws InitializationException {
when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(false);
setupService();
assertThrows(ProcessException.class, () -> service.getFileResource(Map.of()), "Failed to fetch s3 object");
verify(client).doesObjectExist(BUCKET_NAME, KEY);
verifyNoMoreInteractions(client);
}
@Test
void testValidBlobUsingELButMissingAttribute() throws InitializationException {
setupService("${s3.bucket}", "${key}");
assertThrows(ProcessException.class,
() -> service.getFileResource(Map.of()), "Bucket name or key value is missing");
verifyNoInteractions(client);
}
@Test
void testValidBlobUsingEL() throws InitializationException {
String bucketProperty = "s3.bucket";
String keyProperty = "key";
setupService("${" + bucketProperty + "}", "${" + keyProperty + "}");
setupS3Client();
FileResource fileResource = service.getFileResource(Map.of(
bucketProperty, BUCKET_NAME,
keyProperty, KEY));
assertFileResource(fileResource);
}
private void assertFileResource(FileResource fileResource) {
assertNotNull(fileResource);
assertEquals(fileResource.getInputStream(), inputStream);
assertEquals(fileResource.getSize(), CONTENT_LENGTH);
verify(client).doesObjectExist(BUCKET_NAME, KEY);
verify(client).getObject(BUCKET_NAME, KEY);
verify(s3Object).getObjectMetadata();
verify(metadata).getContentLength();
verify(s3Object).getObjectContent();
}
private void setupService() throws InitializationException {
setupService(BUCKET_NAME, KEY);
}
private void setupService(String bucket, String key) throws InitializationException {
final AWSCredentialsProviderService credentialsService = new AWSCredentialsProviderControllerService();
runner.addControllerService(CONTROLLER_SERVICE, credentialsService);
runner.enableControllerService(credentialsService);
runner.setProperty(service, AWS_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(service, S3FileResourceService.KEY, key);
runner.setProperty(service, S3FileResourceService.BUCKET_WITH_DEFAULT_VALUE, bucket);
runner.enableControllerService(service);
}
private void setupS3Client() {
when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(true);
when(client.getObject(BUCKET_NAME, KEY)).thenReturn(s3Object);
when(s3Object.getObjectContent()).thenReturn(inputStream);
when(s3Object.getObjectMetadata()).thenReturn(metadata);
when(metadata.getContentLength()).thenReturn(CONTENT_LENGTH);
}
private static class TestS3FileResourceService extends S3FileResourceService {
private final AmazonS3 client;
private TestS3FileResourceService(AmazonS3 client) {
this.client = client;
}
@Override
protected AmazonS3 getS3Client(Map<String, String> attributes, AWSCredentialsProvider credentialsProvider) {
return client;
}
}
}

View File

@ -18,8 +18,8 @@
package org.apache.nifi.processors.aws.sqs;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
@ -84,8 +84,8 @@ public abstract class AbstractSQSIT {
TestRunner runner = TestRunners.newTestRunner(processorClass);
AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey());
runner.setProperty(AbstractS3Processor.S3_REGION, localstack.getRegion());
runner.setProperty(AbstractS3Processor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString());
runner.setProperty(AbstractAwsProcessor.REGION, localstack.getRegion());
runner.setProperty(AbstractAwsProcessor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString());
runner.setProperty("Queue URL", queueUrl);
return runner;
}

View File

@ -22,6 +22,7 @@ import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.proxy.StandardProxyConfigurationService;
@ -66,7 +67,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
public void setupEndpointAndRegion() {
runner.setProperty(InvokeAWSGatewayApi.REGION, "us-east-1");
runner.setProperty(RegionUtilV1.REGION, "us-east-1");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, mockWebServer.url("/").toString());
}

View File

@ -30,6 +30,7 @@ import org.apache.http.message.BasicStatusLine;
import org.apache.http.protocol.HttpContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -64,7 +65,7 @@ public class TestInvokeAmazonGatewayApiMock {
AuthUtils.enableAccessKey(runner, "awsAccessKey", "awsSecretKey");
runner.setProperty(InvokeAWSGatewayApi.REGION, "us-east-1");
runner.setProperty(RegionUtilV1.REGION, "us-east-1");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd");
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/TEST");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, "https://foobar.execute-api.us-east-1.amazonaws.com");