From 40d9750bb3dab405bd0ba7df14737837d9c3021c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bal=C3=A1zs=20Gerner?= Date: Tue, 6 Feb 2024 14:56:23 +0100 Subject: [PATCH] NIFI-12671 Added S3FileResourceService This closes #8368. Signed-off-by: Peter Turcsanyi --- ...stractAWSCredentialsProviderProcessor.java | 24 +-- .../aws/s3/AbstractS3Processor.java | 50 +----- .../processors/aws/util/RegionUtilV1.java | 94 ++++++++++ .../aws/v2/AbstractAwsProcessor.java | 4 +- .../v2/{RegionUtil.java => RegionUtilV2.java} | 7 +- .../processors/aws/s3/DeleteS3Object.java | 2 + .../nifi/processors/aws/s3/FetchS3Object.java | 2 + .../apache/nifi/processors/aws/s3/ListS3.java | 14 +- .../nifi/processors/aws/s3/PutS3Object.java | 1 + .../nifi/processors/aws/s3/TagS3Object.java | 2 + .../StandardS3EncryptionService.java | 6 +- .../aws/s3/service/S3FileResourceService.java | 154 ++++++++++++++++ .../aws/wag/InvokeAWSGatewayApi.java | 2 + ...g.apache.nifi.controller.ControllerService | 1 + .../nifi/processors/aws/s3/AbstractS3IT.java | 3 +- .../processors/aws/s3/ITFetchS3Object.java | 3 +- .../nifi/processors/aws/s3/ITPutS3Object.java | 5 +- .../processors/aws/s3/TestDeleteS3Object.java | 11 +- .../processors/aws/s3/TestFetchS3Object.java | 19 +- .../nifi/processors/aws/s3/TestListS3.java | 31 ++-- .../processors/aws/s3/TestPutS3Object.java | 5 +- .../processors/aws/s3/TestTagS3Object.java | 21 +-- .../s3/service/S3FileResourceServiceTest.java | 170 ++++++++++++++++++ .../processors/aws/sqs/AbstractSQSIT.java | 6 +- .../wag/TestInvokeAWSGatewayApiCommon.java | 3 +- .../wag/TestInvokeAmazonGatewayApiMock.java | 3 +- 26 files changed, 513 insertions(+), 130 deletions(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/{RegionUtil.java => RegionUtilV2.java} (93%) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java index f9a81ad208..5215186e36 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java @@ -29,7 +29,6 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; @@ -58,6 +57,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION; + /** * Base class for AWS processors that uses AWSCredentialsProvider interface for creating AWS clients. * @@ -92,14 +93,6 @@ public abstract class AbstractAWSCredentialsProviderProcessor values = new ArrayList<>(); - for (final Regions region : Regions.values()) { - values.add(createAllowableValue(region)); - } - return values.toArray(new AllowableValue[0]); - } - - @Override public void migrateProperties(final PropertyConfiguration config) { migrateAuthenticationProperties(config); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index 9d8f643c44..d449218d4f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -22,7 +22,6 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.Signer; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Builder; import com.amazonaws.services.s3.AmazonS3Client; @@ -35,10 +34,8 @@ import com.amazonaws.services.s3.model.EmailAddressGrantee; import com.amazonaws.services.s3.model.Grantee; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; @@ -49,7 +46,6 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil; @@ -62,11 +58,13 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; -import static java.lang.String.format; import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER; import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER; import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER; import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.ATTRIBUTE_DEFINED_REGION; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region; public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor { @@ -182,16 +180,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider .dynamicallyModifiesClasspath(true) .build(); - public static final String S3_REGION_ATTRIBUTE = "s3.region" ; - static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region", - "Use '" + S3_REGION_ATTRIBUTE + "' Attribute", - "Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region."); - - public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(REGION) - .allowableValues(getAvailableS3Regions()) - .build(); - public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder() .name("encryption-service") .displayName("Encryption Service") @@ -292,7 +280,7 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider * @return The created S3 client */ protected AmazonS3Client getS3Client(final ProcessContext context, final Map attributes) { - final Region region = resolveRegion(context, attributes); + final Region region = resolveS3Region(context, attributes); return getClient(context, region); } @@ -303,7 +291,7 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider * @return The newly created S3 client */ protected AmazonS3Client createClient(final ProcessContext context, final Map attributes) { - final Region region = resolveRegion(context, attributes); + final Region region = resolveS3Region(context, attributes); return createClient(context, region); } @@ -451,36 +439,8 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider return cannedAcl; } - private Region parseRegionValue(String regionValue) { - if (regionValue == null) { - throw new ProcessException(format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE)); - } - - try { - return Region.getRegion(Regions.fromName(regionValue)); - } catch (Exception e) { - throw new ProcessException(format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e); - } - } - - private Region resolveRegion(final ProcessContext context, final Map attributes) { - String regionValue = context.getProperty(S3_REGION).getValue(); - - if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) { - regionValue = attributes.get(S3_REGION_ATTRIBUTE); - } - - return parseRegionValue(regionValue); - } - private boolean isAttributeDefinedRegion(final ProcessContext context) { String regionValue = context.getProperty(S3_REGION).getValue(); return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue); } - - private static AllowableValue[] getAvailableS3Regions() { - final AllowableValue[] availableRegions = getAvailableRegions(); - return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION); - } - } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java new file mode 100644 index 0000000000..5bb534a577 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.util; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Arrays; +import java.util.Map; + +/** + * Utility class for AWS region methods. This class uses AWS SDK v1. + * + */ +public final class RegionUtilV1 { + + private RegionUtilV1() { + } + + public static final String S3_REGION_ATTRIBUTE = "s3.region" ; + public static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region", + "Use '" + S3_REGION_ATTRIBUTE + "' Attribute", + "Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region."); + + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .name("Region") + .description("The AWS Region to connect to.") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + + public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(REGION) + .allowableValues(getAvailableS3Regions()) + .build(); + + public static Region resolveS3Region(final PropertyContext context, final Map attributes) { + String regionValue = context.getProperty(S3_REGION).getValue(); + + if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) { + regionValue = attributes.get(S3_REGION_ATTRIBUTE); + } + + return parseS3RegionValue(regionValue); + } + + public static AllowableValue[] getAvailableS3Regions() { + final AllowableValue[] availableRegions = getAvailableRegions(); + return ArrayUtils.add(availableRegions, ATTRIBUTE_DEFINED_REGION); + } + + public static AllowableValue createAllowableValue(final Regions region) { + return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName()); + } + + public static AllowableValue[] getAvailableRegions() { + return Arrays.stream(Regions.values()) + .map(RegionUtilV1::createAllowableValue) + .toArray(AllowableValue[]::new); + } + + private static Region parseS3RegionValue(String regionValue) { + if (regionValue == null) { + throw new ProcessException(String.format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE)); + } + + try { + return Region.getRegion(Regions.fromName(regionValue)); + } catch (Exception e) { + throw new ProcessException(String.format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e); + } + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java index a6a2a29bb1..7a0b6d9bda 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java @@ -109,8 +109,8 @@ public abstract class AbstractAwsProcessor extends Abstract public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() .name("Region") .required(true) - .allowableValues(RegionUtil.getAvailableRegions()) - .defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue()) + .allowableValues(RegionUtilV2.getAvailableRegions()) + .defaultValue(RegionUtilV2.createAllowableValue(Region.US_WEST_2).getValue()) .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtilV2.java similarity index 93% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtilV2.java index 45814f5c93..40be091198 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtilV2.java @@ -25,10 +25,13 @@ import java.util.Comparator; import java.util.List; /** - * Utility class for AWS region methods. + * Utility class for AWS region methods. This class uses AWS SDK v2. * */ -public abstract class RegionUtil { +public final class RegionUtilV2 { + + private RegionUtilV2() { + } /** * Creates an AllowableValue from a Region. diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index 4c4ca71589..7e4694ed79 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -38,6 +38,8 @@ import org.apache.nifi.processor.util.StandardValidators; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + @SupportsBatching @WritesAttributes({ diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index c0cae164b8..21f6371c61 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -62,6 +62,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + @SupportsBatching @SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class}) @InputRequirement(Requirement.INPUT_REQUIRED) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index b1b085ca38..b703b14c26 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -99,6 +99,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION; + @PrimaryNodeOnly @TriggerSerially @TriggerWhenEmpty @@ -518,7 +520,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { if (writerFactory == null) { writer = new AttributeObjectWriter(session); } else { - writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue()); + writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue()); } try { @@ -540,7 +542,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary); // Write the entity to the listing - writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue()); + writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue()); listCount++; } @@ -605,7 +607,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { if (writerFactory == null) { writer = new AttributeObjectWriter(session); } else { - writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue()); + writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue()); } try { @@ -626,7 +628,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { // Write the entity to the listing final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary); final ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary); - writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue()); + writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue()); // Track the latest lastModified timestamp and keys having that timestamp. // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps. @@ -736,7 +738,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { if (writerFactory == null) { writer = new AttributeObjectWriter(session); } else { - writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue()); + writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(REGION).getValue()); } try { @@ -750,7 +752,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { final AmazonS3Client s3Client = getClient(context); final GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary); final ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary); - writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue()); + writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(REGION).getValue()); listCount++; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 26c759417a..9237f77774 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -89,6 +89,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java index cb06bf735d..8665c6378b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java @@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + @SupportsBatching @WritesAttributes({ diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java index c47911abd5..254435299c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java @@ -36,8 +36,8 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processors.aws.s3.AbstractS3Processor; import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; @@ -108,8 +108,8 @@ public class StandardS3EncryptionService extends AbstractControllerService imple .displayName("KMS Region") .description("The Region of the AWS Key Management Service. Only used in case of Client-side KMS.") .required(false) - .allowableValues(AbstractS3Processor.getAvailableRegions()) - .defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .allowableValues(RegionUtilV1.getAvailableRegions()) + .defaultValue(RegionUtilV1.createAllowableValue(Regions.DEFAULT_REGION).getValue()) .build(); private String keyValue = ""; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java new file mode 100644 index 0000000000..c797d854d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.service; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.S3Object; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.processors.aws.s3.AbstractS3Processor; +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.processors.aws.util.RegionUtilV1; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region; +import static org.apache.nifi.util.StringUtils.isBlank; + +@Tags({"Amazon", "S3", "AWS", "file", "resource"}) +@SeeAlso({FetchS3Object.class}) +@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.") +@UseCase( + description = "Fetch a specific file from S3. " + + "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.", + configuration = """ + "Bucket" = "${s3.bucket}" + "Object Key" = "${filename}" + + The "Region" property must be set to denote the S3 region that the Bucket resides in. + + The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket. + """ +) +public class S3FileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE) + .build(); + + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.KEY) + .build(); + + public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(RegionUtilV1.S3_REGION) + .build(); + + private static final List PROPERTIES = List.of( + BUCKET_WITH_DEFAULT_VALUE, + KEY, + S3_REGION, + AWS_CREDENTIALS_PROVIDER_SERVICE); + + private final Cache clientCache = Caffeine.newBuilder().build(); + + private volatile PropertyContext context; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.context = context; + } + + @OnDisabled + public void onDisabled() { + this.context = null; + clientCache.asMap().values().forEach(AmazonS3::shutdown); + clientCache.invalidateAll(); + clientCache.cleanUp(); + } + + @Override + public FileResource getFileResource(Map attributes) { + final AWSCredentialsProviderService awsCredentialsProviderService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE) + .asControllerService(AWSCredentialsProviderService.class); + final AmazonS3 client = getS3Client(attributes, awsCredentialsProviderService.getCredentialsProvider()); + + try { + return fetchObject(client, attributes); + } catch (final ProcessException | SdkClientException e) { + throw new ProcessException("Failed to fetch s3 object", e); + } + } + + /** + * Fetches s3 object from the provided bucket and returns it as FileResource + * + * @param client amazon s3 client + * @param attributes configuration attributes + * @return fetched s3 object as FileResource + * @throws ProcessException if the object 'bucketName/key' does not exist + */ + private FileResource fetchObject(final AmazonS3 client, final Map attributes) throws ProcessException, + SdkClientException { + final String bucketName = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(attributes).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue(); + + if (isBlank(bucketName) || isBlank(key)) { + throw new ProcessException("Bucket name or key value is missing"); + } + + if (!client.doesObjectExist(bucketName, key)) { + throw new ProcessException(String.format("Object '%s/%s' does not exist in s3", bucketName, key)); + } + + final S3Object object = client.getObject(bucketName, key); + return new FileResource(object.getObjectContent(), object.getObjectMetadata().getContentLength()); + } + + protected AmazonS3 getS3Client(Map attributes, AWSCredentialsProvider credentialsProvider) { + final Region region = resolveS3Region(context, attributes); + return clientCache.get(region, ignored -> AmazonS3Client.builder() + .withRegion(region.getName()) + .withCredentials(credentialsProvider) + .build()); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java index 7fab520043..20311eb977 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java @@ -56,6 +56,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION; + @SupportsBatching @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"Amazon", "AWS", "Client", "Gateway-API", "Rest", "http", "https"}) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 390d4c908b..ca338c3972 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -15,3 +15,4 @@ org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService +org.apache.nifi.processors.aws.s3.service.S3FileResourceService diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index f8eb6dd5b2..3dff94a4fa 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -37,6 +37,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -251,7 +252,7 @@ public abstract class AbstractS3IT { Assertions.fail("Could not set security properties"); } - runner.setProperty(AbstractS3Processor.S3_REGION, getRegion()); + runner.setProperty(RegionUtilV1.S3_REGION, getRegion()); runner.setProperty(AbstractS3Processor.ENDPOINT_OVERRIDE, getEndpointOverride()); runner.setProperty(AbstractS3Processor.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java index 265de4f45f..7d57ff007b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.aws.s3; import com.amazonaws.services.s3.model.ObjectMetadata; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -91,7 +92,7 @@ public class ITFetchS3Object extends AbstractS3IT { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); setSecureProperties(runner); - runner.setProperty(FetchS3Object.S3_REGION, getRegion()); + runner.setProperty(RegionUtilV1.S3_REGION, getRegion()); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME); final Map attrs = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 43f936c9f0..cf2b660977 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -34,6 +34,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -447,7 +448,7 @@ public class ITPutS3Object extends AbstractS3IT { final TestRunner runner = initTestRunner(); setSecureProperties(runner); - runner.setProperty(PutS3Object.S3_REGION, getRegion()); + runner.setProperty(RegionUtilV1.S3_REGION, getRegion()); runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME); runner.setProperty(PutS3Object.KEY, "${filename}"); @@ -764,7 +765,7 @@ public class ITPutS3Object extends AbstractS3IT { final TestRunner runner = initTestRunner(); setSecureProperties(runner); - runner.setProperty(PutS3Object.S3_REGION, getRegion()); + runner.setProperty(RegionUtilV1.S3_REGION, getRegion()); runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME); runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, TEST_PARTSIZE_STRING); runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java index c434c012fb..58fb0c2d1b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java @@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteVersionRequest; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; @@ -56,7 +57,7 @@ public class TestDeleteS3Object { @Test public void testDeleteObjectSimple() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "delete-key"); @@ -75,7 +76,7 @@ public class TestDeleteS3Object { @Test public void testDeleteObjectSimpleRegionFromFlowFileAttribute() { - runner.setProperty(DeleteS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "delete-key"); @@ -89,7 +90,7 @@ public class TestDeleteS3Object { @Test public void testDeleteObjectS3Exception() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "delete-key"); @@ -105,7 +106,7 @@ public class TestDeleteS3Object { @Test public void testDeleteVersionSimple() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(DeleteS3Object.VERSION_ID, "test-version"); final Map attrs = new HashMap<>(); @@ -126,7 +127,7 @@ public class TestDeleteS3Object { @Test public void testDeleteVersionFromExpressions() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "${s3.bucket}"); runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}"); final Map attrs = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index db1406a0d5..9d30ac0770 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -32,6 +32,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -80,7 +81,7 @@ public class TestFetchS3Object { @Test public void testGetObject() throws IOException { - runner.setProperty(FetchS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -147,7 +148,7 @@ public class TestFetchS3Object { @Test public void testGetObjectWithRequesterPays() throws IOException { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket"); runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true"); final Map attrs = new HashMap<>(); @@ -205,7 +206,7 @@ public class TestFetchS3Object { @Test public void testGetObjectVersion() throws IOException { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket"); runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}"); final Map attrs = new HashMap<>(); @@ -245,7 +246,7 @@ public class TestFetchS3Object { @Test public void testGetObjectExceptionGoesToFailure() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -259,7 +260,7 @@ public class TestFetchS3Object { @Test public void testFetchObject_FailAdditionalAttributesBucketName() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket-bad-name"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -288,7 +289,7 @@ public class TestFetchS3Object { @Test public void testFetchObject_FailAdditionalAttributesAuthentication() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket-bad-name"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -312,7 +313,7 @@ public class TestFetchS3Object { @Test public void testFetchObject_FailAdditionalAttributesNetworkFailure() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket-bad-name"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -330,7 +331,7 @@ public class TestFetchS3Object { @Test public void testGetObjectReturnsNull() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -344,7 +345,7 @@ public class TestFetchS3Object { @Test public void testFlowFileAccessExceptionGoesToFailure() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index e5655ebdf0..52cf531f44 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -40,6 +40,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.VerifiableProcessor; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.state.MockStateManager; @@ -102,7 +103,7 @@ public class TestListS3 { @Test public void testList() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); Date lastModified = new Date(); @@ -154,7 +155,7 @@ public class TestListS3 { @Test public void testListWithRecords() throws InitializationException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); final MockRecordWriter recordWriter = new MockRecordWriter(null, false); @@ -205,7 +206,7 @@ public class TestListS3 { @Test public void testListWithRequesterPays() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.REQUESTER_PAYS, "true"); @@ -251,7 +252,7 @@ public class TestListS3 { @Test public void testListWithRequesterPays_invalid() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays cannot be used with versions runner.setProperty(ListS3.REQUESTER_PAYS, "true"); @@ -261,7 +262,7 @@ public class TestListS3 { @Test public void testListVersion2() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.LIST_TYPE, "2"); @@ -307,7 +308,7 @@ public class TestListS3 { @Test public void testListVersion2WithRequesterPays() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.REQUESTER_PAYS, "true"); runner.setProperty(ListS3.LIST_TYPE, "2"); @@ -354,7 +355,7 @@ public class TestListS3 { @Test public void testListVersions() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.USE_VERSIONS, "true"); @@ -398,7 +399,7 @@ public class TestListS3 { @Test public void testListObjectsNothingNew() throws IOException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); Calendar calendar = Calendar.getInstance(); @@ -434,7 +435,7 @@ public class TestListS3 { @Test public void testListIgnoreByMinAge() throws IOException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.MIN_AGE, "30 sec"); @@ -485,7 +486,7 @@ public class TestListS3 { @Test public void testListIgnoreByMaxAge() throws IOException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.MAX_AGE, "30 sec"); Date lastModifiedNow = new Date(); @@ -533,7 +534,7 @@ public class TestListS3 { @Test public void testWriteObjectTags() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true"); @@ -560,7 +561,7 @@ public class TestListS3 { @Test public void testWriteUserMetadata() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.WRITE_USER_METADATA, "true"); @@ -588,7 +589,7 @@ public class TestListS3 { @Test public void testNoTrackingList() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING); @@ -676,7 +677,7 @@ public class TestListS3 { assertEquals(TEST_TIMESTAMP, listS3.getListingSnapshot().getTimestamp()); - runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName()); + runner.setProperty(RegionUtilV1.REGION, Regions.EU_CENTRAL_1.getName()); assertTrue(listS3.isResetTracking()); @@ -765,7 +766,7 @@ public class TestListS3 { assertNotNull(listS3.getListedEntityTracker()); - runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName()); + runner.setProperty(RegionUtilV1.REGION, Regions.EU_CENTRAL_1.getName()); assertTrue(listS3.isResetTracking()); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index ab7bf59fb4..de42e4cc20 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -43,6 +43,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.signer.AwsSignerType; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -245,7 +246,7 @@ public class TestPutS3Object { } private void prepareTest(String filename) { - runner.setProperty(PutS3Object.S3_REGION, "ap-northeast-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "ap-northeast-1"); runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.assertValid(); @@ -258,7 +259,7 @@ public class TestPutS3Object { } private void prepareTestWithRegionInAttributes(String filename, String region) { - runner.setProperty(PutS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.assertValid(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java index 263214f584..0968fbceba 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java @@ -27,6 +27,7 @@ import com.amazonaws.services.s3.model.SetObjectTaggingRequest; import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -70,7 +71,7 @@ public class TestTagS3Object { public void testTagObjectSimple() { final String tagKey = "k"; final String tagVal = "v"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -98,7 +99,7 @@ public class TestTagS3Object { @Test public void testTagObjectSimpleRegionFromFlowFileAttribute() { - runner.setProperty(TagS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, "k"); runner.setProperty(TagS3Object.TAG_VALUE, "v"); @@ -118,7 +119,7 @@ public class TestTagS3Object { public void testTagObjectVersion() { final String tagKey = "k"; final String tagVal = "v"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.VERSION_ID, "test-version"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); @@ -148,7 +149,7 @@ public class TestTagS3Object { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -183,7 +184,7 @@ public class TestTagS3Object { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -212,7 +213,7 @@ public class TestTagS3Object { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -247,7 +248,7 @@ public class TestTagS3Object { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -263,7 +264,7 @@ public class TestTagS3Object { @Test public void testBucketEvaluatedAsBlank() { - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "${not.existant.attribute}"); runner.setProperty(TagS3Object.TAG_KEY, "key"); runner.setProperty(TagS3Object.TAG_VALUE, "val"); @@ -278,7 +279,7 @@ public class TestTagS3Object { @Test public void testTagKeyEvaluatedAsBlank() { - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}"); runner.setProperty(TagS3Object.TAG_VALUE, "val"); @@ -293,7 +294,7 @@ public class TestTagS3Object { @Test public void testTagValEvaluatedAsBlank() { - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, "tagKey"); runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java new file mode 100644 index 0000000000..674bd5504a --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.s3.service; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class S3FileResourceServiceTest { + private static final String CONTROLLER_SERVICE = "AWSCredentialsService"; + private static final String BUCKET_NAME = "test-bucket"; + private static final String KEY = "key"; + private static final long CONTENT_LENGTH = 10L; + + @Mock + private AmazonS3 client; + + @Mock + private S3Object s3Object; + + @Mock + private ObjectMetadata metadata; + + @Mock + private S3ObjectInputStream inputStream; + + @InjectMocks + private TestS3FileResourceService service; + private TestRunner runner; + + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.addControllerService("S3FileResourceService", service); + } + + @Test + void testGetFileResourceHappyPath() throws InitializationException { + setupS3Client(); + setupService(); + + FileResource fileResource = service.getFileResource(Map.of()); + assertFileResource(fileResource); + } + + @Test + void testNonExistingObject() throws InitializationException { + when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(false); + setupService(); + + assertThrows(ProcessException.class, () -> service.getFileResource(Map.of()), "Failed to fetch s3 object"); + verify(client).doesObjectExist(BUCKET_NAME, KEY); + verifyNoMoreInteractions(client); + } + + @Test + void testValidBlobUsingELButMissingAttribute() throws InitializationException { + setupService("${s3.bucket}", "${key}"); + + assertThrows(ProcessException.class, + () -> service.getFileResource(Map.of()), "Bucket name or key value is missing"); + verifyNoInteractions(client); + } + + @Test + void testValidBlobUsingEL() throws InitializationException { + String bucketProperty = "s3.bucket"; + String keyProperty = "key"; + setupService("${" + bucketProperty + "}", "${" + keyProperty + "}"); + setupS3Client(); + + FileResource fileResource = service.getFileResource(Map.of( + bucketProperty, BUCKET_NAME, + keyProperty, KEY)); + assertFileResource(fileResource); + } + + private void assertFileResource(FileResource fileResource) { + assertNotNull(fileResource); + assertEquals(fileResource.getInputStream(), inputStream); + assertEquals(fileResource.getSize(), CONTENT_LENGTH); + verify(client).doesObjectExist(BUCKET_NAME, KEY); + verify(client).getObject(BUCKET_NAME, KEY); + verify(s3Object).getObjectMetadata(); + verify(metadata).getContentLength(); + verify(s3Object).getObjectContent(); + } + + private void setupService() throws InitializationException { + setupService(BUCKET_NAME, KEY); + } + + private void setupService(String bucket, String key) throws InitializationException { + final AWSCredentialsProviderService credentialsService = new AWSCredentialsProviderControllerService(); + + runner.addControllerService(CONTROLLER_SERVICE, credentialsService); + runner.enableControllerService(credentialsService); + + runner.setProperty(service, AWS_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(service, S3FileResourceService.KEY, key); + runner.setProperty(service, S3FileResourceService.BUCKET_WITH_DEFAULT_VALUE, bucket); + + runner.enableControllerService(service); + } + + private void setupS3Client() { + when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(true); + when(client.getObject(BUCKET_NAME, KEY)).thenReturn(s3Object); + when(s3Object.getObjectContent()).thenReturn(inputStream); + when(s3Object.getObjectMetadata()).thenReturn(metadata); + when(metadata.getContentLength()).thenReturn(CONTENT_LENGTH); + } + + private static class TestS3FileResourceService extends S3FileResourceService { + + private final AmazonS3 client; + + private TestS3FileResourceService(AmazonS3 client) { + this.client = client; + } + + @Override + protected AmazonS3 getS3Client(Map attributes, AWSCredentialsProvider credentialsProvider) { + return client; + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java index c0c7981b79..378b38a07f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java @@ -18,8 +18,8 @@ package org.apache.nifi.processors.aws.sqs; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processors.aws.s3.AbstractS3Processor; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.AfterAll; @@ -84,8 +84,8 @@ public abstract class AbstractSQSIT { TestRunner runner = TestRunners.newTestRunner(processorClass); AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey()); - runner.setProperty(AbstractS3Processor.S3_REGION, localstack.getRegion()); - runner.setProperty(AbstractS3Processor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()); + runner.setProperty(AbstractAwsProcessor.REGION, localstack.getRegion()); + runner.setProperty(AbstractAwsProcessor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()); runner.setProperty("Queue URL", queueUrl); return runner; } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java index 2506524d67..4d4adf5bde 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java @@ -22,6 +22,7 @@ import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.proxy.StandardProxyConfigurationService; @@ -66,7 +67,7 @@ public abstract class TestInvokeAWSGatewayApiCommon { } public void setupEndpointAndRegion() { - runner.setProperty(InvokeAWSGatewayApi.REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.REGION, "us-east-1"); runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd"); runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, mockWebServer.url("/").toString()); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java index 8a62496efc..d10a7346c8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java @@ -30,6 +30,7 @@ import org.apache.http.message.BasicStatusLine; import org.apache.http.protocol.HttpContext; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -64,7 +65,7 @@ public class TestInvokeAmazonGatewayApiMock { AuthUtils.enableAccessKey(runner, "awsAccessKey", "awsSecretKey"); - runner.setProperty(InvokeAWSGatewayApi.REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.REGION, "us-east-1"); runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd"); runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/TEST"); runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, "https://foobar.execute-api.us-east-1.amazonaws.com");