NIFI-11255 Allowable value for 'Use s3.region Attribute'

This closes #7051.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
krisztina-zsihovszki 2023-03-07 09:51:50 +01:00 committed by Peter Turcsanyi
parent 51f7d6d747
commit 9a002d9a43
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
64 changed files with 835 additions and 404 deletions

View File

@ -107,6 +107,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.aws;
import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
@ -34,7 +33,7 @@ import java.util.List;
import java.util.Map;
/**
* Base class for aws processors that uses AWSCredentialsProvider interface for creating aws clients.
* Base class for AWS processors that uses AWSCredentialsProvider interface for creating AWS clients.
*
* @param <ClientType> client type
*
@ -51,7 +50,7 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("AWS Credentials Provider service")
.displayName("AWS Credentials Provider Service")
.description("The Controller Service that is used to obtain aws credentials provider")
.description("The Controller Service that is used to obtain AWS credentials provider")
.required(false)
.identifiesControllerService(AWSCredentialsProviderService.class)
.build();
@ -61,30 +60,29 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
* @param context The process context
* @return The created client
*/
protected ClientType createClient(final ProcessContext context) {
@Override
public ClientType createClient(final ProcessContext context, AwsClientDetails awsClientDetails) {
final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
if (service != null) {
getLogger().debug("Using aws credentials provider service for creating client");
return createClient(context, getCredentialsProvider(context), createConfiguration(context));
getLogger().debug("Using AWS credentials provider service for creating client");
final AWSCredentialsProvider credentialsProvider = getCredentialsProvider(context);
final ClientConfiguration configuration = createConfiguration(context);
final ClientType createdClient = createClient(context, credentialsProvider, configuration);
setRegionAndInitializeEndpoint(awsClientDetails.getRegion(), context, createdClient);
return createdClient;
} else {
getLogger().debug("Using aws credentials for creating client");
return super.createClient(context);
getLogger().debug("Using AWS credentials for creating client");
return super.createClient(context, awsClientDetails);
}
}
@OnShutdown
public void onShutDown() {
if ( this.client != null ) {
this.client.shutdown();
}
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
try {
getConfiguration(context);
createClient(context);
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Create Client and Configure Region")
@ -114,15 +112,14 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class);
return awsCredentialsProviderService.getCredentialsProvider();
}
/**
* Abstract method to create aws client using credentials provider. This is the preferred method
* for creating aws clients
* Abstract method to create AWS client using credentials provider. This is the preferred method
* for creating AWS clients
* @param context process context
* @param credentialsProvider aws credentials provider
* @param config aws client configuration
* @param credentialsProvider AWS credentials provider
* @param config AWS client configuration
* @return ClientType the client
*/
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config);

View File

@ -30,7 +30,7 @@ import com.amazonaws.regions.Regions;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -66,14 +66,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Abstract base class for aws processors. This class uses aws credentials for creating aws clients
* Abstract base class for AWS processors. This class uses AWS credentials for creating AWS clients
*
* @deprecated use {@link AbstractAWSCredentialsProviderProcessor} instead which uses credentials providers or creating aws clients
* @deprecated use {@link AbstractAWSCredentialsProviderProcessor} instead which uses credentials providers or creating AWS clients
* @see AbstractAWSCredentialsProviderProcessor
*
*/
@Deprecated
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractSessionFactoryProcessor {
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractSessionFactoryProcessor implements AwsClientProvider<ClientType> {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to success relationship").build();
@ -153,9 +153,6 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
protected volatile ClientType client;
protected volatile Region region;
protected static final String VPCE_ENDPOINT_SUFFIX = ".vpce.amazonaws.com";
protected static final Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+[vpce-][a-z0-9-]+\\.)?([a-z0-9-]+)$");
@ -166,6 +163,8 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
private final AwsClientCache<ClientType> awsClientCache = new AwsClientCache<>();
public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
}
@ -226,7 +225,6 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
return validationResults;
}
protected ClientConfiguration createConfiguration(final ProcessContext context) {
return createConfiguration(context, context.getMaxConcurrentTasks());
}
@ -287,7 +285,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
@OnScheduled
public void onScheduled(final ProcessContext context) {
setClientAndRegion(context);
getClient(context);
}
/*
@ -312,21 +310,23 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
*/
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
protected Region getRegionAndInitializeEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
final Region region;
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String regionValue = context.getProperty(REGION).getValue();
if (regionValue != null) {
region = Region.getRegion(Regions.fromName(regionValue));
if (client != null) {
client.setRegion(region);
}
} else {
region = null;
}
} else {
region = null;
public ClientType createClient(final ProcessContext context, AwsClientDetails awsClientDetails) {
final AWSCredentials credentials = getCredentials(context);
final ClientConfiguration configuration = createConfiguration(context);
final ClientType createdClient = createClient(context, credentials, configuration);
setRegionAndInitializeEndpoint(awsClientDetails.getRegion(), context, createdClient);
return createdClient;
}
protected ClientType createClient(ProcessContext context) {
return createClient(context, new AwsClientDetails(getRegion(context)));
}
protected void setRegionAndInitializeEndpoint(final Region region, final ProcessContext context, final AmazonWebServiceClient client) {
if (region!= null && client != null) {
client.setRegion(region);
}
// if the endpoint override has been configured, set the endpoint.
@ -353,7 +353,17 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
}
}
}
return region;
}
protected Region getRegion(final ProcessContext context) {
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String regionValue = context.getProperty(REGION).getValue();
if (regionValue != null) {
return Region.getRegion(Regions.fromName(regionValue));
}
}
return null;
}
protected boolean isCustomSignerConfigured(final ProcessContext context) {
@ -382,26 +392,6 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
}
}
/**
* Create client from the arguments
* @param context process context
* @param credentials static aws credentials
* @param config aws client configuration
* @return ClientType aws client
*
* @deprecated use {@link AbstractAWSCredentialsProviderProcessor#createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)}
*/
@Deprecated
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
protected ClientType getClient() {
return client;
}
protected Region getRegion() {
return region;
}
protected AWSCredentials getCredentials(final PropertyContext context) {
final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
@ -423,55 +413,35 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
return new AnonymousAWSCredentials();
}
@OnShutdown
public void onShutdown() {
if ( getClient() != null ) {
getClient().shutdown();
}
}
protected void setClientAndRegion(final ProcessContext context) {
final AWSConfiguration awsConfiguration = getConfiguration(context);
this.client = awsConfiguration.getClient();
this.region = awsConfiguration.getRegion();
@OnStopped
public void onStopped() {
this.awsClientCache.clearCache();
}
/**
* Creates an AWS service client from the context.
* Creates an AWS service client from the context or returns an existing client from the cache
* @param context The process context
* @param awsClientDetails details of the AWS client
* @return The created client
*/
protected ClientType createClient(final ProcessContext context) {
return createClient(context, getCredentials(context), createConfiguration(context));
protected ClientType getClient(final ProcessContext context, AwsClientDetails awsClientDetails) {
return this.awsClientCache.getOrCreateClient(context, awsClientDetails, this);
}
protected ClientType getClient(final ProcessContext context) {
final AwsClientDetails awsClientDetails = new AwsClientDetails(getRegion(context));
return getClient(context, awsClientDetails);
}
/**
* Parses and configures the client and region from the context.
* @param context The process context
* @return The parsed configuration
* Create client from the arguments
* @param context process context
* @param credentials static aws credentials
* @param config aws client configuration
* @return ClientType aws client
*
* @deprecated use {@link AbstractAWSCredentialsProviderProcessor#createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)}
*/
protected AWSConfiguration getConfiguration(final ProcessContext context) {
final ClientType client = createClient(context);
final Region region = getRegionAndInitializeEndpoint(context, client);
return new AWSConfiguration(client, region);
}
public class AWSConfiguration {
final ClientType client;
final Region region;
public AWSConfiguration(final ClientType client, final Region region) {
this.client = client;
this.region = region;
}
public ClientType getClient() {
return client;
}
public Region getRegion() {
return region;
}
}
@Deprecated
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
import com.amazonaws.AmazonWebServiceClient;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.processor.ProcessContext;
public class AwsClientCache<ClientType extends AmazonWebServiceClient> {
private static final int MAXIMUM_CACHE_SIZE = 10;
private final Cache<AwsClientDetails, ClientType> clientCache = Caffeine.newBuilder()
.maximumSize(MAXIMUM_CACHE_SIZE)
.build();
public ClientType getOrCreateClient(final ProcessContext context, final AwsClientDetails clientDetails, final AwsClientProvider<ClientType> provider) {
return clientCache.get(clientDetails, ignored -> provider.createClient(context, clientDetails));
}
public void clearCache() {
clientCache.invalidateAll();
clientCache.cleanUp();
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
import com.amazonaws.regions.Region;
import java.util.Objects;
/**
* This class contains the AWS client details used to distinguish between the various AWS clients stored in the cache.
* The class acts as a cache key for @link AwsClientCache.
* AwsClientDetails contains the region only, since actually the region value may come from the FlowFile attributes.
*/
public class AwsClientDetails {
private Region region;
public AwsClientDetails(Region region) {
this.region = region;
}
public Region getRegion() {
return region;
}
public void setRegion(final Region region) {
this.region = region;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AwsClientDetails that = (AwsClientDetails) o;
return Objects.equals(region, that.region);
}
@Override
public int hashCode() {
return Objects.hash(region);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
import com.amazonaws.AmazonWebServiceClient;
import org.apache.nifi.processor.ProcessContext;
public interface AwsClientProvider<ClientType extends AmazonWebServiceClient> {
/**
* Creates an AWS client using process context and AWS client details.
*
* @param context process context
* @param awsClientDetails AWS client details
* @return AWS client
*/
ClientType createClient(final ProcessContext context, final AwsClientDetails awsClientDetails);
}

View File

@ -179,10 +179,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
@Override
protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().debug("Creating client with credentials provider");
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentialsProvider, config);
return client;
return new AmazonDynamoDBClient(credentialsProvider, config);
}
/**
@ -190,13 +187,11 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Deprecated
@Override
protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().debug("Creating client with aws credentials");
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentials, config);
return client;
return new AmazonDynamoDBClient(credentials, config);
}
protected Object getValue(final ProcessContext context, final PropertyDescriptor type, final PropertyDescriptor value, final Map<String, String> attributes) {
@ -221,9 +216,9 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
return new DynamoDB(client);
}
protected synchronized DynamoDB getDynamoDB() {
protected synchronized DynamoDB getDynamoDB(ProcessContext context) {
if (dynamoDB == null) {
dynamoDB = getDynamoDB(client);
dynamoDB = getDynamoDB(getClient(context));
}
return dynamoDB;
}
@ -275,8 +270,6 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() );
attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.getStatusCode()) );
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
}

View File

@ -55,6 +55,7 @@ public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesis
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
@Deprecated
protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials");

View File

@ -22,6 +22,7 @@ import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AccessControlList;
@ -32,14 +33,24 @@ import com.amazonaws.services.s3.model.EmailAddressGrantee;
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AwsClientDetails;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.AwsPropertyDescriptors;
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
@ -49,6 +60,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import static java.lang.String.format;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
@ -150,6 +162,16 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
.dependsOn(SIGNER_OVERRIDE, CUSTOM_SIGNER)
.build();
public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");
public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractAWSProcessor.REGION)
.allowableValues(getAvailableS3Regions())
.build();
public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
.name("encryption-service")
.displayName("Encryption Service")
@ -198,17 +220,82 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
return s3;
}
/**
* Create client using AWSCredentials
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Deprecated
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client with AWS credentials");
return createClient(context, new AWSStaticCredentialsProvider(credentials), config);
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
try {
createClient(context, attributes);
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Create S3 Client")
.explanation("Successfully created S3 Client")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to create S3 Client", e);
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.FAILED)
.verificationStepName("Create S3 Client")
.explanation("Failed to crete S3 Client: " + e.getMessage())
.build());
}
return results;
}
/**
* Creates and configures the client from the context and FlowFile attributes or returns an existing client from cache
* @param context the process context
* @param attributes FlowFile attributes
* @return The created S3 client
*/
protected AmazonS3Client getS3Client(final ProcessContext context, final Map<String, String> attributes) {
final AwsClientDetails clientDetails = getAwsClientDetails(context, attributes);
return getClient(context, clientDetails);
}
/**
* Creates the client from the context and FlowFile attributes
* @param context the process context
* @param attributes FlowFile attributes
* @return The newly created S3 client
*/
protected AmazonS3Client createClient(final ProcessContext context, final Map<String, String> attributes) {
final AwsClientDetails clientDetails = getAwsClientDetails(context, attributes);
return createClient(context, clientDetails);
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
if (!isAttributeDefinedRegion(context)) {
getClient(context);
}
}
private void configureClientOptions(final ProcessContext context, final AmazonS3Client s3) {
S3ClientOptions.Builder builder = S3ClientOptions.builder();
final S3ClientOptions.Builder builder = S3ClientOptions.builder();
// disable chunked encoding if "Use Chunked Encoding" has been set to false, otherwise use the default (not disabled)
Boolean useChunkedEncoding = context.getProperty(USE_CHUNKED_ENCODING).asBoolean();
final Boolean useChunkedEncoding = context.getProperty(USE_CHUNKED_ENCODING).asBoolean();
if (useChunkedEncoding != null && !useChunkedEncoding) {
builder.disableChunkedEncoding();
}
// use PathStyleAccess if "Use Path Style Access" has been set to true, otherwise use the default (false)
Boolean usePathStyleAccess = context.getProperty(USE_PATH_STYLE_ACCESS).asBoolean();
final Boolean usePathStyleAccess = context.getProperty(USE_PATH_STYLE_ACCESS).asBoolean();
if (usePathStyleAccess != null && usePathStyleAccess) {
builder.setPathStyleAccess(true);
}
@ -241,17 +328,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
return signerType == CUSTOM_SIGNER;
}
/**
* Create client using AWSCredentials
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client with AWS credentials");
return createClient(context, new AWSStaticCredentialsProvider(credentials), config);
}
protected Grantee createGrantee(final String value) {
if (StringUtils.isEmpty(value)) {
return null;
@ -281,17 +357,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
return grantees;
}
protected String getUrlForObject(final String bucket, final String key) {
Region region = getRegion();
if (region == null) {
return DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + bucket + "/" + key;
} else {
final String endpoint = region.getServiceEndpoint("s3");
return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + bucket + "/" + key;
}
}
/**
* Create AccessControlList if appropriate properties are configured.
*
@ -386,4 +451,41 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
return cannedAcl;
}
private Region parseRegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}
try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}
private Region resolveRegion(final ProcessContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();
if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}
return parseRegionValue(regionValue);
}
private boolean isAttributeDefinedRegion(final ProcessContext context) {
String regionValue = context.getProperty(S3_REGION).getValue();
return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue);
}
private static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION);
}
private AwsClientDetails getAwsClientDetails(final ProcessContext context, final Map<String, String> attributes) {
final Region region = resolveRegion(context, attributes);
return new AwsClientDetails(region);
}
}

View File

@ -119,4 +119,9 @@ The following binary components are provided under the Apache Software License v
Validate.java
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/../wag/
RequestMatcher.java
GetAWSGatewayApiTest.java
GetAWSGatewayApiTest.java
(ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:2.9.2 - https://github.com/ben-manes/caffeine)
The following NOTICE information applies:
Caffeine (caching library)
Copyright Ben Manes

View File

@ -265,7 +265,6 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
/**
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentialsProvider awsCredentialsProvider, ClientConfiguration clientConfiguration) {
getLogger().info("Creating client using aws credentials provider");
@ -278,6 +277,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
@Deprecated
protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentials awsCredentials, ClientConfiguration clientConfiguration) {
getLogger().debug("Creating client with aws credentials");
return new AmazonCloudWatchClient(awsCredentials, clientConfiguration);
@ -332,7 +332,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
.withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue())
.withMetricData(datum);
putMetricData(metricDataRequest);
putMetricData(context, metricDataRequest);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile});
} catch (final Exception e) {
@ -343,8 +343,8 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
}
protected PutMetricDataResult putMetricData(PutMetricDataRequest metricDataRequest) throws AmazonClientException {
final AmazonCloudWatchClient client = getClient();
protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException {
final AmazonCloudWatchClient client = getClient(context);
final PutMetricDataResult result = client.putMetricData(metricDataRequest);
return result;
}

View File

@ -125,7 +125,7 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
return;
}
final DynamoDB dynamoDB = getDynamoDB();
final DynamoDB dynamoDB = getDynamoDB(context);
try {
BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);

View File

@ -137,7 +137,7 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
.build());
} else {
try {
final DynamoDB dynamoDB = getDynamoDB(getConfiguration(context).getClient());
final DynamoDB dynamoDB = getDynamoDB(getClient(context));
int totalCount = 0;
int jsonDocumentCount = 0;
@ -199,7 +199,7 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
return;
}
final DynamoDB dynamoDB = getDynamoDB();
final DynamoDB dynamoDB = getDynamoDB(context);
try {
BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes);

View File

@ -152,7 +152,7 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
return;
}
final DynamoDB dynamoDB = getDynamoDB();
final DynamoDB dynamoDB = getDynamoDB(context);
try {
BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);

View File

@ -200,7 +200,7 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.parseInt(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(context), context, flowFile.getAttributes(), getLogger());
final SplitRecordSetHandler.RecordHandlerResult result;
try (

View File

@ -94,7 +94,7 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
HashMap<String, List<Record>> recordHash = new HashMap<String, List<Record>>();
final AmazonKinesisFirehoseClient client = getClient();
final AmazonKinesisFirehoseClient client = getClient(context);
try {
List<FlowFile> failedFlowFiles = new ArrayList<>();

View File

@ -548,14 +548,14 @@ public class ConsumeKinesisStream extends AbstractKinesisStreamProcessor {
return () -> {
if (isRecordReaderSet && isRecordWriterSet) {
return new KinesisRecordProcessorRecord(
sessionFactory, getLogger(), getStreamName(context), getClient().getEndpointPrefix(),
sessionFactory, getLogger(), getStreamName(context), getClient(context).getEndpointPrefix(),
getKinesisEndpoint(context).orElse(null), getCheckpointIntervalMillis(context),
getRetryWaitMillis(context), getNumRetries(context), getDateTimeFormatter(context),
getReaderFactory(context), getWriterFactory(context)
);
} else {
return new KinesisRecordProcessorRaw(
sessionFactory, getLogger(), getStreamName(context), getClient().getEndpointPrefix(),
sessionFactory, getLogger(), getStreamName(context), getClient(context).getEndpointPrefix(),
getKinesisEndpoint(context).orElse(null), getCheckpointIntervalMillis(context),
getRetryWaitMillis(context), getNumRetries(context), getDateTimeFormatter(context)
);
@ -575,8 +575,8 @@ public class ConsumeKinesisStream extends AbstractKinesisStreamProcessor {
getCredentialsProvider(context),
workerId
)
.withCommonClientConfig(getClient().getClientConfiguration())
.withRegionName(getRegion().getName())
.withCommonClientConfig(getClient(context).getClientConfiguration())
.withRegionName(getRegion(context).getName())
.withFailoverTimeMillis(getFailoverTimeMillis(context))
.withShutdownGraceMillis(getGracefulShutdownMillis(context));
@ -613,7 +613,7 @@ public class ConsumeKinesisStream extends AbstractKinesisStreamProcessor {
final IRecordProcessorFactory factory) {
final Worker.Builder workerBuilder = new Worker.Builder()
.config(kinesisClientLibConfiguration)
.kinesisClient(getClient())
.kinesisClient(getClient(context))
.workerStateChangeListener(workerState::set)
.recordProcessorFactory(factory);

View File

@ -131,7 +131,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
HashMap<String, List<PutRecordsRequestEntry>> recordHash = new HashMap<String, List<PutRecordsRequestEntry>>();
final AmazonKinesisClient client = getClient();
final AmazonKinesisClient client = getClient(context);
try {

View File

@ -155,7 +155,7 @@ public class PutLambda extends AbstractAWSLambdaProcessor {
return;
}
final AWSLambdaClient client = getClient();
final AWSLambdaClient client = getClient(context);
try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();

View File

@ -110,6 +110,6 @@ public class GetAwsPollyJobStatus extends AwsMachineLearningJobStatusProcessor<A
private GetSpeechSynthesisTaskResult getSynthesisTask(ProcessContext context, FlowFile flowFile) {
String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
GetSpeechSynthesisTaskRequest request = new GetSpeechSynthesisTaskRequest().withTaskId(taskId);
return getClient().getSpeechSynthesisTask(request);
return getClient(context).getSpeechSynthesisTask(request);
}
}

View File

@ -44,7 +44,7 @@ public class StartAwsPollyJob extends AwsMachineLearningJobStarter<AmazonPollyCl
@Override
protected StartSpeechSynthesisTaskResult sendRequest(StartSpeechSynthesisTaskRequest request, ProcessContext context, FlowFile flowFile) {
return getClient().startSpeechSynthesisTask(request);
return getClient(context).startSpeechSynthesisTask(request);
}
@Override

View File

@ -83,9 +83,9 @@ public class GetAwsTextractJobStatus extends AwsMachineLearningJobStatusProcesso
String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
try {
JobStatus jobStatus = getTaskStatus(TextractType.fromString(textractType), getClient(), awsTaskId);
JobStatus jobStatus = getTaskStatus(TextractType.fromString(textractType), getClient(context), awsTaskId);
if (JobStatus.SUCCEEDED == jobStatus) {
Object task = getTask(TextractType.fromString(textractType), getClient(), awsTaskId);
Object task = getTask(TextractType.fromString(textractType), getClient(context), awsTaskId);
writeToFlowFile(session, flowFile, task);
session.transfer(flowFile, REL_SUCCESS);
} else if (JobStatus.IN_PROGRESS == jobStatus) {

View File

@ -100,13 +100,13 @@ public class StartAwsTextractJob extends AwsMachineLearningJobStarter<AmazonText
AmazonWebServiceResult result;
switch (textractType) {
case DOCUMENT_ANALYSIS :
result = getClient().startDocumentAnalysis((StartDocumentAnalysisRequest) request);
result = getClient(context).startDocumentAnalysis((StartDocumentAnalysisRequest) request);
break;
case DOCUMENT_TEXT_DETECTION:
result = getClient().startDocumentTextDetection((StartDocumentTextDetectionRequest) request);
result = getClient(context).startDocumentTextDetection((StartDocumentTextDetectionRequest) request);
break;
case EXPENSE_ANALYSIS:
result = getClient().startExpenseAnalysis((StartExpenseAnalysisRequest) request);
result = getClient(context).startExpenseAnalysis((StartExpenseAnalysisRequest) request);
break;
default: throw new UnsupportedOperationException("Unsupported textract type: " + textractType);
}

View File

@ -86,6 +86,6 @@ public class GetAwsTranscribeJobStatus extends AwsMachineLearningJobStatusProces
private GetTranscriptionJobResult getJob(ProcessContext context, FlowFile flowFile) {
String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
GetTranscriptionJobRequest request = new GetTranscriptionJobRequest().withTranscriptionJobName(taskId);
return getClient().getTranscriptionJob(request);
return getClient(context).getTranscriptionJob(request);
}
}

View File

@ -49,7 +49,7 @@ public class StartAwsTranscribeJob extends AwsMachineLearningJobStarter<AmazonTr
@Override
protected StartTranscriptionJobResult sendRequest(StartTranscriptionJobRequest request, ProcessContext context, FlowFile flowFile) {
return getClient().startTranscriptionJob(request);
return getClient(context).startTranscriptionJob(request);
}
@Override

View File

@ -58,7 +58,7 @@ public class GetAwsTranslateJobStatus extends AwsMachineLearningJobStatusProcess
}
String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
try {
DescribeTextTranslationJobResult describeTextTranslationJobResult = getStatusString(awsTaskId);
DescribeTextTranslationJobResult describeTextTranslationJobResult = getStatusString(context, awsTaskId);
JobStatus status = JobStatus.fromValue(describeTextTranslationJobResult.getTextTranslationJobProperties().getJobStatus());
if (status == JobStatus.IN_PROGRESS || status == JobStatus.SUBMITTED) {
@ -84,9 +84,9 @@ public class GetAwsTranslateJobStatus extends AwsMachineLearningJobStatusProcess
}
}
private DescribeTextTranslationJobResult getStatusString(String awsTaskId) {
private DescribeTextTranslationJobResult getStatusString(ProcessContext context, String awsTaskId) {
DescribeTextTranslationJobRequest request = new DescribeTextTranslationJobRequest().withJobId(awsTaskId);
DescribeTextTranslationJobResult translationJobsResult = getClient().describeTextTranslationJob(request);
DescribeTextTranslationJobResult translationJobsResult = getClient(context).describeTextTranslationJob(request);
return translationJobsResult;
}
}

View File

@ -43,7 +43,7 @@ public class StartAwsTranslateJob extends AwsMachineLearningJobStarter<AmazonTra
@Override
protected StartTextTranslationJobResult sendRequest(StartTextTranslationJobRequest request, ProcessContext context, FlowFile flowFile) {
return getClient().startTextTranslationJob(request);
return getClient(context).startTextTranslationJob(request);
}
@Override

View File

@ -71,7 +71,7 @@ public class DeleteS3Object extends AbstractS3Processor {
SECRET_KEY,
CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE,
REGION,
S3_REGION,
TIMEOUT,
VERSION_ID,
FULL_CONTROL_USER_LIST,
@ -96,6 +96,7 @@ public class DeleteS3Object extends AbstractS3Processor {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
@ -103,14 +104,22 @@ public class DeleteS3Object extends AbstractS3Processor {
return;
}
final AmazonS3Client s3;
try {
s3 = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final AmazonS3Client s3 = getClient();
// Deletes a key on Amazon S3
try {
if (versionId == null) {

View File

@ -130,7 +130,7 @@ public class FetchS3Object extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
BUCKET,
KEY,
REGION,
S3_REGION,
ACCESS_KEY,
SECRET_KEY,
CREDENTIALS_FILE,
@ -185,7 +185,7 @@ public class FetchS3Object extends AbstractS3Processor {
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
final AmazonS3Client client = getConfiguration(context).getClient();
final AmazonS3Client client = createClient(context, attributes);
final GetObjectMetadataRequest request = createGetObjectMetadataRequest(context, attributes);
try {
@ -215,6 +215,16 @@ public class FetchS3Object extends AbstractS3Processor {
return;
}
final AmazonS3Client client;
try {
client = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final long startNanos = System.nanoTime();
final Map<String, String> attributes = new HashMap<>();
@ -226,7 +236,6 @@ public class FetchS3Object extends AbstractS3Processor {
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final AmazonS3Client client = getClient();
final GetObjectRequest request = createGetObjectRequest(context, flowFile.getAttributes());
try (final S3Object s3Object = client.getObject(request)) {

View File

@ -472,7 +472,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return;
}
final AmazonS3 client = getClient();
final AmazonS3 client = getClient(context);
S3BucketLister bucketLister = getS3BucketLister(context, client);
@ -583,7 +583,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
S3BucketLister bucketLister = getS3BucketLister(context, getClient());
S3BucketLister bucketLister = getS3BucketLister(context, getClient(context));
List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
.stream()
@ -644,8 +644,9 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
GetObjectTaggingResult taggingResult = getTaggingResult(context, getClient(), s3VersionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, getClient(), s3VersionSummary);
AmazonS3Client s3Client = getClient(context);
GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
@ -1129,7 +1130,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
final AmazonS3Client client = getConfiguration(context).getClient();
final AmazonS3Client client = createClient(context);
final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, logger, attributes));
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();

View File

@ -289,7 +289,7 @@ public class PutS3Object extends AbstractS3Processor {
OBJECT_TAGS_PREFIX,
REMOVE_TAG_PREFIX,
STORAGE_CLASS,
REGION,
S3_REGION,
TIMEOUT,
EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST,
@ -503,18 +503,29 @@ public class PutS3Object extends AbstractS3Processor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final AmazonS3Client s3;
try {
s3 = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;
final AmazonS3Client s3 = getClient();
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
@ -969,7 +980,7 @@ public class PutS3Object extends AbstractS3Processor {
final List<Tag> objectTags = new ArrayList<>();
final Map<String, String> attributesMap = flowFile.getAttributes();
attributesMap.entrySet().stream().sequential()
attributesMap.entrySet().stream()
.filter(attribute -> attribute.getKey().startsWith(prefix))
.forEach(attribute -> {
String tagKey = attribute.getKey();

View File

@ -117,7 +117,7 @@ public class TagS3Object extends AbstractS3Processor {
SECRET_KEY,
CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE,
REGION,
S3_REGION,
TIMEOUT,
SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE,
@ -135,6 +135,7 @@ public class TagS3Object extends AbstractS3Processor {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
@ -142,6 +143,16 @@ public class TagS3Object extends AbstractS3Processor {
return;
}
final AmazonS3Client s3;
try {
s3 = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
@ -171,7 +182,6 @@ public class TagS3Object extends AbstractS3Processor {
final String version = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final AmazonS3Client s3 = getClient();
SetObjectTaggingRequest r;
List<Tag> tags = new ArrayList<>();

View File

@ -137,7 +137,7 @@ public class PutSNS extends AbstractSNSProcessor {
session.exportTo(flowFile, baos);
final String message = new String(baos.toByteArray(), charset);
final AmazonSNSClient client = getClient();
final AmazonSNSClient client = getClient(context);
final PublishRequest request = new PublishRequest();
request.setMessage(message);

View File

@ -75,7 +75,7 @@ public class DeleteSQS extends AbstractSQSProcessor {
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
final AmazonSQSClient client = getClient();
final AmazonSQSClient client = getClient(context);
final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
request.setQueueUrl(queueUrl);

View File

@ -136,7 +136,7 @@ public class GetSQS extends AbstractSQSProcessor {
final String queueUrl = context.getProperty(DYNAMIC_QUEUE_URL).evaluateAttributeExpressions()
.getValue();
final AmazonSQSClient client = getClient();
final AmazonSQSClient client = getClient(context);
final ReceiveMessageRequest request = new ReceiveMessageRequest();
request.setAttributeNames(Collections.singleton("All"));

View File

@ -127,7 +127,7 @@ public class PutSQS extends AbstractSQSProcessor {
}
final long startNanos = System.nanoTime();
final AmazonSQSClient client = getClient();
final AmazonSQSClient client = getClient(context);
final SendMessageBatchRequest request = new SendMessageBatchRequest();
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
request.setQueueUrl(queueUrl);

View File

@ -182,7 +182,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
final String resourceName = context.getProperty(PROP_RESOURCE_NAME).getValue();
final GenericApiGatewayClient client = getClient();
final GenericApiGatewayClient client = getClient(context);
final long startNanos = System.nanoTime();
final Map<String, String> attributes = requestFlowFile == null ? Collections.emptyMap() : requestFlowFile.getAttributes();
@ -370,7 +370,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
final String endpoint = context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
final String resource = context.getProperty(PROP_RESOURCE_NAME).getValue();
try {
final GenericApiGatewayClient client = getConfiguration(context).getClient();
final GenericApiGatewayClient client = getClient(context);
final GatewayResponse gatewayResponse = invokeGateway(client, context, null, null, attributes, verificationLogger);

View File

@ -40,7 +40,6 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
/**
* Unit tests for AWS Credential specification based on {@link AbstractAWSProcessor} and
* [@link AbstractAWSCredentialsProviderProcessor}, without interaction with S3.
@ -51,7 +50,6 @@ public class TestAWSCredentials {
private AbstractAWSProcessor mockAwsProcessor = null;
private AWSCredentials awsCredentials = null;
private AWSCredentialsProvider awsCredentialsProvider = null;
private ClientConfiguration clientConfiguration = null;
@BeforeEach
public void setUp() {
@ -70,7 +68,6 @@ public class TestAWSCredentials {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
awsCredentials = credentials;
clientConfiguration = config;
final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
return s3;
}
@ -78,7 +75,6 @@ public class TestAWSCredentials {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
awsCredentialsProvider = credentialsProvider;
clientConfiguration = config;
final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config);
return s3;
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
import static com.amazonaws.regions.Regions.US_EAST_1;
import static com.amazonaws.regions.Regions.US_WEST_2;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.apache.nifi.processor.ProcessContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class TestAwsClientCache {
@Mock
private ProcessContext contextMock;
@Mock
private AwsClientProvider<AmazonWebServiceClient> awsClientProviderMock;
@Mock
private AmazonWebServiceClient awsClientMock1;
@Mock
private AmazonWebServiceClient awsClientMock2;
private AwsClientCache<AmazonWebServiceClient> clientCache;
@BeforeEach
public void setup() {
clientCache = new AwsClientCache<>();
}
@Test
public void testSameRegionUseExistingClientFromCache() {
final AwsClientDetails clientDetails = getClientDetails(US_WEST_2);
when(awsClientProviderMock.createClient(contextMock, clientDetails)).thenReturn(awsClientMock1);
final AmazonWebServiceClient client1 = clientCache.getOrCreateClient(contextMock, clientDetails, awsClientProviderMock);
final AwsClientDetails newClientDetails = getClientDetails(US_WEST_2);
final AmazonWebServiceClient client2 = clientCache.getOrCreateClient(contextMock, newClientDetails, awsClientProviderMock);
verify(awsClientProviderMock, times(1)).createClient(eq(contextMock), any(AwsClientDetails.class));
assertSame(client1, client2);
}
@Test
public void testRegionChangeNewClientIsCreated() {
final AwsClientDetails clientDetails = getClientDetails(US_WEST_2);
when(awsClientProviderMock.createClient(contextMock, clientDetails)).thenReturn(awsClientMock1);
final AmazonWebServiceClient client1 = clientCache.getOrCreateClient(contextMock, clientDetails, awsClientProviderMock);
final AwsClientDetails newClientDetails = getClientDetails(US_EAST_1);
when(awsClientProviderMock.createClient(contextMock, newClientDetails)).thenReturn(awsClientMock2);
final AmazonWebServiceClient client2 = clientCache.getOrCreateClient(contextMock, newClientDetails, awsClientProviderMock);
verify(awsClientProviderMock, times(2)).createClient(eq(contextMock), any(AwsClientDetails.class));
assertNotEquals(client1, client2);
}
@Test
public void testSameRegionClientCacheIsClearedNewClientIsCreated() {
final AwsClientDetails clientDetails = getClientDetails(US_WEST_2);
when(awsClientProviderMock.createClient(contextMock, clientDetails)).thenReturn(awsClientMock1);
final AmazonWebServiceClient client1 = clientCache.getOrCreateClient(contextMock, clientDetails, awsClientProviderMock);
clientCache.clearCache();
final AwsClientDetails newClientDetails = getClientDetails(US_WEST_2);
when(awsClientProviderMock.createClient(contextMock, clientDetails)).thenReturn(awsClientMock2);
final AmazonWebServiceClient client2 = clientCache.getOrCreateClient(contextMock, newClientDetails, awsClientProviderMock);
verify(awsClientProviderMock, times(2)).createClient(eq(contextMock), any(AwsClientDetails.class));
assertNotEquals(client1, client2);
}
private static AwsClientDetails getClientDetails(Regions region) {
return new AwsClientDetails(Region.getRegion(region));
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.junit.jupiter.api.Test;
public class TestAwsClientDetails {
@Test
public void clientDetailsEqual() {
final AwsClientDetails details1 = getDefaultClientDetails(Regions.US_WEST_2);
final AwsClientDetails details2 = getDefaultClientDetails(Regions.US_WEST_2);
assertEquals(details1, details2);
assertEquals(details1.hashCode(), details2.hashCode());
}
@Test
public void clientDetailsDifferInRegion() {
final AwsClientDetails details1 = getDefaultClientDetails(Regions.US_WEST_2);
final AwsClientDetails details2 = getDefaultClientDetails(Regions.US_EAST_1);
assertNotEquals(details1, details2);
assertNotEquals(details1.hashCode(), details2.hashCode());
}
private AwsClientDetails getDefaultClientDetails(Regions region) {
return new AwsClientDetails(Region.getRegion(region));
}
}

View File

@ -22,6 +22,7 @@ import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
import java.util.List;
import org.apache.nifi.processor.ProcessContext;
/**
@ -35,7 +36,8 @@ public class MockPutCloudWatchMetric extends PutCloudWatchMetric {
protected PutMetricDataResult result = new PutMetricDataResult();
protected int putMetricDataCallCount = 0;
protected PutMetricDataResult putMetricData(PutMetricDataRequest metricDataRequest) throws AmazonClientException {
protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException {
putMetricDataCallCount++;
actualNamespace = metricDataRequest.getNamespace();
actualMetricData = metricDataRequest.getMetricData();

View File

@ -95,8 +95,7 @@ public class MockAWSProcessor extends AbstractAWSCredentialsProviderProcessor<Am
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client with credentials provider");
final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config);
return s3;
return new AmazonS3Client(credentialsProvider, config);
}
/**
@ -107,10 +106,7 @@ public class MockAWSProcessor extends AbstractAWSCredentialsProviderProcessor<Am
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client with awd credentials");
final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
return s3;
return new AmazonS3Client(credentials, config);
}
}

View File

@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.DeleteRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -63,7 +64,7 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
@ -79,7 +80,7 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDb;
}
};
@ -277,7 +278,7 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
@ -316,7 +317,7 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
@ -354,7 +355,7 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};

View File

@ -29,7 +29,6 @@ import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -525,7 +524,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
private GetDynamoDB mockDynamoDB(final DynamoDB mockDynamoDB) {
return new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
@Override
@ -534,9 +533,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
}
@Override
protected AbstractAWSProcessor<AmazonDynamoDBClient>.AWSConfiguration getConfiguration(final ProcessContext context) {
final AmazonDynamoDBClient client = Mockito.mock(AmazonDynamoDBClient.class);
return new AWSConfiguration(client, region);
protected AmazonDynamoDBClient getClient(final ProcessContext context) {
return Mockito.mock(AmazonDynamoDBClient.class);
}
};
}

View File

@ -24,6 +24,7 @@ import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@ -81,7 +82,7 @@ public class PutDynamoDBRecordTest {
testSubject = new PutDynamoDBRecord() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};

View File

@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -63,7 +64,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
@ -79,7 +80,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDb;
}
};
@ -333,7 +334,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
@ -373,7 +374,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
@ -412,7 +413,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};

View File

@ -23,6 +23,7 @@ import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.TooManyRequestsException;
import com.amazonaws.util.Base64;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -42,15 +43,15 @@ public class TestPutLambda {
private TestRunner runner = null;
private PutLambda mockPutLambda = null;
private AWSLambdaClient actualLambdaClient = null;
private AWSLambdaClient mockLambdaClient = null;
@BeforeEach
public void setUp() {
mockLambdaClient = Mockito.mock(AWSLambdaClient.class);
mockPutLambda = new PutLambda() {
protected AWSLambdaClient getClient() {
actualLambdaClient = client;
@Override
protected AWSLambdaClient getClient(ProcessContext context) {
return mockLambdaClient;
}
};
@ -58,7 +59,7 @@ public class TestPutLambda {
}
@Test
public void testSizeGreaterThan6MB() throws Exception {
public void testSizeGreaterThan6MB() {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
runner.assertValid();

View File

@ -28,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.polly.AmazonPollyClient;
import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
@ -62,21 +62,18 @@ public class GetAwsPollyStatusTest {
@BeforeEach
public void setUp() throws InitializationException {
when(mockAwsCredentialsProvider.getIdentifier()).thenReturn("awsCredetialProvider");
final GetAwsPollyJobStatus mockGetAwsPollyStatus = new GetAwsPollyJobStatus() {
protected AmazonPollyClient getClient() {
return mockPollyClient;
}
when(mockAwsCredentialsProvider.getIdentifier()).thenReturn("awsCredentialProvider");
final GetAwsPollyJobStatus mockGetAwsPollyStatus = new GetAwsPollyJobStatus() {
@Override
protected AmazonPollyClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
return mockPollyClient;
}
};
runner = TestRunners.newTestRunner(mockGetAwsPollyStatus);
runner.addControllerService("awsCredetialProvider", mockAwsCredentialsProvider);
runner.addControllerService("awsCredentialProvider", mockAwsCredentialsProvider);
runner.enableControllerService(mockAwsCredentialsProvider);
runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredetialProvider");
runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialProvider");
}
@Test
@ -90,7 +87,7 @@ public class GetAwsPollyStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_RUNNING);
assertEquals(requestCaptor.getValue().getTaskId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTaskId());
}
@Test
@ -107,7 +104,7 @@ public class GetAwsPollyStatusTest {
runner.assertTransferCount(REL_SUCCESS, 1);
runner.assertTransferCount(REL_ORIGINAL, 1);
runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, AWS_TASK_OUTPUT_LOCATION);
assertEquals(requestCaptor.getValue().getTaskId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTaskId());
}
@ -123,6 +120,6 @@ public class GetAwsPollyStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE);
assertEquals(requestCaptor.getValue().getTaskId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTaskId());
}
}

View File

@ -18,16 +18,16 @@
package org.apache.nifi.processors.aws.ml.textract;
import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE;
import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_FAILURE;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
import static org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob.TEXTRACT_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.textract.AmazonTextractClient;
import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
import com.amazonaws.services.textract.model.GetDocumentAnalysisResult;
@ -47,7 +47,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class GetAwsTranslateJobStatusTest {
public class GetAwsTextractJobStatusTest {
private static final String TEST_TASK_ID = "testTaskId";
private TestRunner runner;
@Mock
@ -59,21 +59,17 @@ public class GetAwsTranslateJobStatusTest {
@BeforeEach
public void setUp() throws InitializationException {
when(mockAwsCredentialsProvider.getIdentifier()).thenReturn("awsCredetialProvider");
when(mockAwsCredentialsProvider.getIdentifier()).thenReturn("awsCredentialProvider");
final GetAwsTextractJobStatus awsTextractJobStatusGetter = new GetAwsTextractJobStatus() {
protected AmazonTextractClient getClient() {
return mockTextractClient;
}
@Override
protected AmazonTextractClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
return mockTextractClient;
}
};
runner = TestRunners.newTestRunner(awsTextractJobStatusGetter);
runner.addControllerService("awsCredetialProvider", mockAwsCredentialsProvider);
runner.addControllerService("awsCredentialProvider", mockAwsCredentialsProvider);
runner.enableControllerService(mockAwsCredentialsProvider);
runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredetialProvider");
runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialProvider");
}
@Test
@ -99,7 +95,7 @@ public class GetAwsTranslateJobStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
}
@Test
@ -112,6 +108,6 @@ public class GetAwsTranslateJobStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE);
assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
}
}

View File

@ -18,16 +18,16 @@
package org.apache.nifi.processors.aws.ml.transcribe;
import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE;
import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.FAILURE_REASON_ATTRIBUTE;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_FAILURE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.transcribe.AmazonTranscribeClient;
import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest;
import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult;
@ -67,12 +67,8 @@ public class GetAwsTranscribeJobStatusTest {
public void setUp() throws InitializationException {
when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(AWS_CREDENTIAL_PROVIDER_NAME);
final GetAwsTranscribeJobStatus mockPollyFetcher = new GetAwsTranscribeJobStatus() {
protected AmazonTranscribeClient getClient() {
return mockTranscribeClient;
}
@Override
protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentials, ClientConfiguration config) {
return mockTranscribeClient;
}
};
@ -93,7 +89,7 @@ public class GetAwsTranscribeJobStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_RUNNING);
assertEquals(requestCaptor.getValue().getTranscriptionJobName(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTranscriptionJobName());
}
@Test
@ -108,7 +104,7 @@ public class GetAwsTranscribeJobStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertEquals(requestCaptor.getValue().getTranscriptionJobName(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTranscriptionJobName());
}
@ -125,7 +121,7 @@ public class GetAwsTranscribeJobStatusTest {
runner.assertAllFlowFilesTransferred(REL_FAILURE);
runner.assertAllFlowFilesContainAttribute(FAILURE_REASON_ATTRIBUTE);
assertEquals(requestCaptor.getValue().getTranscriptionJobName(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTranscriptionJobName());
}
}

View File

@ -18,8 +18,8 @@
package org.apache.nifi.processors.aws.ml.translate;
import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE;
import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_FAILURE;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.AWS_TASK_OUTPUT_LOCATION;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.translate.AmazonTranslateClient;
import com.amazonaws.services.translate.model.DescribeTextTranslationJobRequest;
import com.amazonaws.services.translate.model.DescribeTextTranslationJobResult;
@ -67,12 +67,8 @@ public class GetAwsTranslateJobStatusTest {
public void setUp() throws InitializationException {
when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(AWS_CREDENTIALS_PROVIDER_NAME);
final GetAwsTranslateJobStatus mockPollyFetcher = new GetAwsTranslateJobStatus() {
protected AmazonTranslateClient getClient() {
return mockTranslateClient;
}
@Override
protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
return mockTranslateClient;
}
};
@ -93,7 +89,7 @@ public class GetAwsTranslateJobStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_RUNNING);
assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
}
@Test
@ -109,7 +105,7 @@ public class GetAwsTranslateJobStatusTest {
runner.assertAllFlowFilesTransferred(REL_SUCCESS);
runner.assertAllFlowFilesContainAttribute(AWS_TASK_OUTPUT_LOCATION);
assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
}
@Test
@ -123,7 +119,7 @@ public class GetAwsTranslateJobStatusTest {
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE);
assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID);
assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
}
}

View File

@ -40,7 +40,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.S3_REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -60,7 +60,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.S3_REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -88,7 +88,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
runner.assertValid(serviceImpl);
runner.setProperty(DeleteS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.S3_REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -108,7 +108,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.S3_REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(DeleteS3Object.KEY, "folder/delete-me");
@ -126,7 +126,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.S3_REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();

View File

@ -42,7 +42,7 @@ public class ITFetchS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -65,7 +65,7 @@ public class ITFetchS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -96,7 +96,7 @@ public class ITFetchS3Object extends AbstractS3IT {
runner.assertValid(serviceImpl);
runner.setProperty(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -110,11 +110,11 @@ public class ITFetchS3Object extends AbstractS3IT {
}
@Test
public void testTryToFetchNotExistingFile() throws IOException {
public void testTryToFetchNotExistingFile() {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
@ -134,7 +134,7 @@ public class ITFetchS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
@ -121,7 +120,7 @@ public class ITPutS3Object extends AbstractS3IT {
for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
attrs.put("filename", i + ".txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
}
runner.run(3);
@ -139,7 +138,7 @@ public class ITPutS3Object extends AbstractS3IT {
for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
attrs.put("filename", i + ".txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
}
runner.run(3);
@ -190,7 +189,7 @@ public class ITPutS3Object extends AbstractS3IT {
runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
runner.enqueue(new byte[0], attrs);
@ -235,14 +234,14 @@ public class ITPutS3Object extends AbstractS3IT {
runner.assertValid(serviceImpl);
runner.setProperty(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
attrs.put("filename", i + ".txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
}
runner.run(3);
@ -257,7 +256,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
PropertyDescriptor prop1 = processor.getSupportedDynamicPropertyDescriptor("TEST-PROP-1");
runner.setProperty(prop1, "TESTING-1-2-3");
@ -272,10 +271,8 @@ public class ITPutS3Object extends AbstractS3IT {
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
MockFlowFile ff1 = flowFiles.get(0);
for (Map.Entry attrib : ff1.getAttributes().entrySet()) {
System.out.println(attrib.getKey() + " = " + attrib.getValue());
}
MockFlowFile ff = flowFiles.get(0);
ff.assertAttributeExists("s3.usermetadata");
}
@Test
@ -321,13 +318,12 @@ public class ITPutS3Object extends AbstractS3IT {
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
// Fetch
runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.S3_REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
runner.enqueue(new byte[0], attrs);
@ -335,7 +331,7 @@ public class ITPutS3Object extends AbstractS3IT {
runner.run(1);
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
MockFlowFile ff = ffs.get(0);
ff.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
ff.assertAttributeNotExists(PutS3Object.S3_CONTENT_DISPOSITION);
@ -417,7 +413,7 @@ public class ITPutS3Object extends AbstractS3IT {
}
@Test
public void testStorageClassesMultipart() throws IOException {
public void testStorageClassesMultipart() {
TestRunner runner = initTestRunner();
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
@ -463,7 +459,7 @@ public class ITPutS3Object extends AbstractS3IT {
}
@Test
public void testDynamicProperty() throws IOException {
public void testDynamicProperty() {
final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp";
final String DYNAMIC_ATTRIB_VALUE = "${now():toNumber()}";
@ -471,7 +467,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
PropertyDescriptor testAttrib = processor.getSupportedDynamicPropertyDescriptor(DYNAMIC_ATTRIB_KEY);
@ -490,7 +486,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(1, successFiles.size());
MockFlowFile ff1 = successFiles.get(0);
Long now = System.currentTimeMillis();
long now = System.currentTimeMillis();
String millisNow = Long.toString(now);
String millisOneSecAgo = Long.toString(now - 1000L);
String usermeta = ff1.getAttribute(PutS3Object.S3_USERMETA_ATTR_KEY);
@ -502,14 +498,14 @@ public class ITPutS3Object extends AbstractS3IT {
}
@Test
public void testProvenance() throws InitializationException {
public void testProvenance() {
final String PROV1_FILE = "provfile1";
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.KEY, "${filename}");
@ -538,23 +534,23 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testStateDefaults() {
PutS3Object.MultipartState state1 = new PutS3Object.MultipartState();
assertEquals(state1.getUploadId(), "");
assertEquals(state1.getFilePosition(), (Long) 0L);
assertEquals(state1.getPartETags().size(), 0L);
assertEquals(state1.getPartSize(), (Long) 0L);
assertEquals(state1.getStorageClass().toString(), StorageClass.Standard.toString());
assertEquals(state1.getContentLength(), (Long) 0L);
assertEquals("", state1.getUploadId());
assertEquals((Long) 0L, state1.getFilePosition());
assertEquals(0L, state1.getPartETags().size());
assertEquals((Long) 0L, state1.getPartSize());
assertEquals(StorageClass.Standard.toString(), state1.getStorageClass().toString());
assertEquals((Long) 0L, state1.getContentLength());
}
@Test
public void testStateToString() throws IOException, InitializationException {
public void testStateToString() {
final String target = "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4#20002#REDUCED_REDUNDANCY#30003#8675309";
PutS3Object.MultipartState state2 = new PutS3Object.MultipartState();
state2.setUploadId("UID-test1234567890");
state2.setFilePosition(10001L);
state2.setTimestamp(8675309L);
for (Integer partNum = 1; partNum < 5; partNum++) {
state2.addPartETag(new PartETag(partNum, "PartETag-" + partNum.toString()));
for (int partNum = 1; partNum < 5; partNum++) {
state2.addPartETag(new PartETag(partNum, "PartETag-" + partNum));
}
state2.setPartSize(20002L);
state2.setStorageClass(StorageClass.ReducedRedundancy);
@ -581,19 +577,19 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(TESTKEY, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions(Collections.emptyMap()).toString());
assertEquals(TEST_ENDPOINT, context.getProperty(PutS3Object.ENDPOINT_OVERRIDE).toString());
String s3url = ((TestablePutS3Object)processor).testable_getClient().getResourceUrl(BUCKET_NAME, TESTKEY);
String s3url = ((TestablePutS3Object)processor).testable_getClient(context).getResourceUrl(BUCKET_NAME, TESTKEY);
assertEquals(TEST_ENDPOINT + "/" + BUCKET_NAME + "/" + TESTKEY, s3url);
}
@Test
public void testMultipartProperties() throws IOException {
public void testMultipartProperties() {
final PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
final ProcessContext context = runner.getProcessContext();
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,
"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.KEY, AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME);
@ -812,7 +808,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
@ -844,7 +840,7 @@ public class ITPutS3Object extends AbstractS3IT {
final Path tempFile = Files.createTempFile("s3mulitpart", "tmp");
final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
long tempByteCount = 0;
for ( ; tempByteCount < TEST_PARTSIZE_LONG + 1; ) {
while (tempByteCount < TEST_PARTSIZE_LONG + 1) {
tempOut.write(megabyte);
tempByteCount += megabyte.length;
}
@ -857,7 +853,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, TEST_PARTSIZE_STRING);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
@ -887,7 +883,7 @@ public class ITPutS3Object extends AbstractS3IT {
final String FILE1_NAME = "file1";
final byte[] megabyte = new byte[1024 * 1024];
final Path tempFile = Files.createTempFile("s3mulitpart", "tmp");
final Path tempFile = Files.createTempFile("s3multipart", "tmp");
final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
for (int i = 0; i < (S3_MAXIMUM_OBJECT_SIZE / 1024 / 1024 + 1); i++) {
tempOut.write(megabyte);
@ -898,7 +894,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
@ -928,7 +924,7 @@ public class ITPutS3Object extends AbstractS3IT {
final ProcessContext context = runner.getProcessContext();
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
// set check interval and age off to minimum values
@ -936,7 +932,7 @@ public class ITPutS3Object extends AbstractS3IT {
runner.setProperty(PutS3Object.MULTIPART_S3_MAX_AGE, "1 milli");
// create some dummy uploads
for (Integer i = 0; i < 3; i++) {
for (int i = 0; i < 3; i++) {
final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(
BUCKET_NAME, "file" + i + ".txt");
assertDoesNotThrow(() -> client.initiateMultipartUpload(initiateRequest));
@ -950,7 +946,7 @@ public class ITPutS3Object extends AbstractS3IT {
// call to circumvent what appears to be caching in the AWS library.
// The increments are 1000 millis because AWS returns upload
// initiation times in whole seconds.
Long now = System.currentTimeMillis();
long now = System.currentTimeMillis();
MultipartUploadListing uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now, BUCKET_NAME);
assertEquals(3, uploadList.getMultipartUploads().size());
@ -971,7 +967,7 @@ public class ITPutS3Object extends AbstractS3IT {
}
@Test
public void testObjectTags() throws IOException, InterruptedException {
public void testObjectTags() throws IOException {
TestRunner runner = initTestRunner();
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
@ -992,7 +988,7 @@ public class ITPutS3Object extends AbstractS3IT {
System.out.println("Tag Key : " + tag.getKey() + ", Tag Value : " + tag.getValue());
}
assertTrue(objectTags.size() == 1);
assertEquals(1, objectTags.size());
assertEquals("PII", objectTags.get(0).getKey());
assertEquals("true", objectTags.get(0).getValue());
}
@ -1023,7 +1019,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(1, flowFiles.size());
assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
assertEquals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
flowFile.assertContentEquals(data);
@ -1057,7 +1053,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(1, flowFiles.size());
assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
assertEquals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
flowFile.assertContentEquals(data);
@ -1091,7 +1087,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(1, flowFiles.size());
assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
assertEquals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
flowFile.assertContentEquals(data);
@ -1127,7 +1123,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(1, flowFiles.size());
assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
assertEquals(AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
flowFile.assertContentEquals(data);
@ -1161,7 +1157,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertEquals(1, flowFiles.size());
assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
assertEquals(AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
@ -1205,7 +1201,7 @@ public class ITPutS3Object extends AbstractS3IT {
final ConfigurationContext context = mock(ConfigurationContext.class);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
if (strategyName != null) {
@ -1236,14 +1232,14 @@ public class ITPutS3Object extends AbstractS3IT {
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest)
throws AmazonClientException, AmazonServiceException {
throws AmazonClientException {
return listing;
}
}
public class TestablePutS3Object extends PutS3Object {
public AmazonS3Client testable_getClient() {
return this.getClient();
public AmazonS3Client testable_getClient(ProcessContext context) {
return this.getClient(context);
}
}
@ -1269,7 +1265,7 @@ public class ITPutS3Object extends AbstractS3IT {
TestRunner runner = TestRunners.newTestRunner(PutS3Object.class);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.S3_REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
return runner;

View File

@ -50,7 +50,7 @@ public class ITTagS3Object extends AbstractS3IT {
// Set up processor
final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(TagS3Object.REGION, REGION);
runner.setProperty(TagS3Object.S3_REGION, REGION);
runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
@ -84,7 +84,7 @@ public class ITTagS3Object extends AbstractS3IT {
// Set up processor
final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(TagS3Object.REGION, REGION);
runner.setProperty(TagS3Object.S3_REGION, REGION);
runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
@ -119,7 +119,7 @@ public class ITTagS3Object extends AbstractS3IT {
// Set up processor
final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(TagS3Object.REGION, REGION);
runner.setProperty(TagS3Object.S3_REGION, REGION);
runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagValue);

View File

@ -21,6 +21,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -47,7 +48,8 @@ public class TestDeleteS3Object {
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockDeleteS3Object = new DeleteS3Object() {
protected AmazonS3Client getClient() {
@Override
protected AmazonS3Client getS3Client(final ProcessContext context, final Map<String, String> attributes) {
return mockS3Client;
}
};
@ -56,7 +58,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectSimple() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@ -73,9 +75,23 @@ public class TestDeleteS3Object {
Mockito.verify(mockS3Client, Mockito.never()).deleteVersion(Mockito.any(DeleteVersionRequest.class));
}
@Test
public void testDeleteObjectSimpleRegionFromFlowFileAttribute() {
runner.setProperty(DeleteS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
attrs.put("s3.region", "us-east-1");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}
@Test
public void testDeleteObjectS3Exception() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@ -91,7 +107,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteVersionSimple() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
runner.setProperty(DeleteS3Object.VERSION_ID, "test-version");
final Map<String, String> attrs = new HashMap<>();
@ -112,7 +128,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteVersionFromExpressions() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "${s3.bucket}");
runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
@ -148,7 +164,7 @@ public class TestDeleteS3Object {
assertTrue(pd.contains(processor.OWNER));
assertTrue(pd.contains(processor.READ_ACL_LIST));
assertTrue(pd.contains(processor.READ_USER_LIST));
assertTrue(pd.contains(processor.REGION));
assertTrue(pd.contains(processor.S3_REGION));
assertTrue(pd.contains(processor.SECRET_KEY));
assertTrue(pd.contains(processor.SIGNER_OVERRIDE));
assertTrue(pd.contains(processor.S3_CUSTOM_SIGNER_CLASS_NAME));

View File

@ -16,7 +16,9 @@
*/
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
@ -28,7 +30,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -66,14 +67,9 @@ public class TestFetchS3Object {
public void setUp() {
mockS3Client = mock(AmazonS3Client.class);
mockFetchS3Object = new FetchS3Object() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
@Override
protected AbstractAWSProcessor<AmazonS3Client>.AWSConfiguration getConfiguration(ProcessContext context) {
return new AWSConfiguration(mockS3Client, null);
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockFetchS3Object);
@ -81,10 +77,11 @@ public class TestFetchS3Object {
@Test
public void testGetObject() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
attrs.put("s3.region", "us-west-2");
runner.enqueue(new byte[0], attrs);
S3Object s3ObjectResponse = new S3Object();
@ -147,7 +144,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectWithRequesterPays() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");
final Map<String, String> attrs = new HashMap<>();
@ -205,7 +202,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectVersion() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
@ -245,7 +242,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectExceptionGoesToFailure() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -259,7 +256,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesBucketName() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -288,7 +285,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesAuthentication() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -312,7 +309,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -330,7 +327,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectReturnsNull() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -344,7 +341,7 @@ public class TestFetchS3Object {
@Test
public void testFlowFileAccessExceptionGoesToFailure() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@ -369,7 +366,7 @@ public class TestFetchS3Object {
assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE));
assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(FetchS3Object.KEY));
assertTrue(pd.contains(FetchS3Object.REGION));
assertTrue(pd.contains(FetchS3Object.S3_REGION));
assertTrue(pd.contains(FetchS3Object.SECRET_KEY));
assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(FetchS3Object.S3_CUSTOM_SIGNER_CLASS_NAME));

View File

@ -69,10 +69,6 @@ public class TestListS3 {
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
final ListS3 mockListS3 = new ListS3() {
protected AmazonS3Client getClient() {
return mockS3Client;
}
@Override
protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
return mockS3Client;

View File

@ -47,13 +47,13 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -73,7 +73,7 @@ public class TestPutS3Object {
mockS3Client = mock(AmazonS3Client.class);
putS3Object = new PutS3Object() {
@Override
protected AmazonS3Client getClient() {
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
return mockS3Client;
}
};
@ -174,7 +174,7 @@ public class TestPutS3Object {
}
@Test
public void testFilenameWithNationalCharacters() throws UnsupportedEncodingException {
public void testFilenameWithNationalCharacters() {
prepareTest("Iñtërnâtiônàližætiøn.txt");
runner.run(1);
@ -184,7 +184,18 @@ public class TestPutS3Object {
PutObjectRequest request = captureRequest.getValue();
ObjectMetadata objectMetadata = request.getMetadata();
assertEquals(URLEncoder.encode("Iñtërnâtiônàližætiøn.txt", "UTF-8"), objectMetadata.getContentDisposition());
assertEquals(URLEncoder.encode("Iñtërnâtiônàližætiøn.txt", UTF_8), objectMetadata.getContentDisposition());
}
@Test
public void testRegionFromFlowFileAttribute() {
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "false");
prepareTestWithRegionInAttributes("testfile.txt", "us-east-1");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
private void prepareTest() {
@ -192,7 +203,7 @@ public class TestPutS3Object {
}
private void prepareTest(String filename) {
runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.S3_REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
runner.assertValid();
@ -201,6 +212,24 @@ public class TestPutS3Object {
ffAttributes.put("tagS3PII", "true");
runner.enqueue("Test Content", ffAttributes);
initMocks();
}
private void prepareTestWithRegionInAttributes(String filename, String region) {
runner.setProperty(PutS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
runner.assertValid();
Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("s3.region", region);
ffAttributes.put("filename", filename);
ffAttributes.put("tagS3PII", "true");
runner.enqueue("Test Content", ffAttributes);
initMocks();
}
private void initMocks() {
PutObjectResult putObjectResult = new PutObjectResult();
putObjectResult.setExpirationTime(new Date());
putObjectResult.setMetadata(new ObjectMetadata());
@ -261,7 +290,7 @@ public class TestPutS3Object {
assertTrue(pd.contains(PutS3Object.OWNER));
assertTrue(pd.contains(PutS3Object.READ_ACL_LIST));
assertTrue(pd.contains(PutS3Object.READ_USER_LIST));
assertTrue(pd.contains(PutS3Object.REGION));
assertTrue(pd.contains(PutS3Object.S3_REGION));
assertTrue(pd.contains(PutS3Object.SECRET_KEY));
assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(PutS3Object.S3_CUSTOM_SIGNER_CLASS_NAME));

View File

@ -16,12 +16,15 @@
*/
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
import com.amazonaws.services.s3.model.Tag;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -30,8 +33,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -47,16 +48,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestTagS3Object {
private TestRunner runner = null;
private TagS3Object mockTagS3Object = null;
private AmazonS3Client actualS3Client = null;
private AmazonS3Client mockS3Client = null;
@BeforeEach
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockTagS3Object = new TagS3Object() {
protected AmazonS3Client getClient() {
actualS3Client = client;
TagS3Object mockTagS3Object = new TagS3Object() {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
return mockS3Client;
}
};
@ -64,14 +63,15 @@ public class TestTagS3Object {
}
@Test
public void testTagObjectSimple() throws IOException {
public void testTagObjectSimple() {
final String tagKey = "k";
final String tagVal = "v";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
runner.setProperty(TagS3Object.APPEND_TAG, "false");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "object-key");
runner.enqueue(new byte[0], attrs);
@ -93,10 +93,28 @@ public class TestTagS3Object {
}
@Test
public void testTagObjectVersion() throws IOException {
public void testTagObjectSimpleRegionFromFlowFileAttribute() {
runner.setProperty(TagS3Object.S3_REGION, "attribute-defined-region");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "k");
runner.setProperty(TagS3Object.TAG_VALUE, "v");
runner.setProperty(TagS3Object.APPEND_TAG, "false");
runner.assertValid();
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "object-key");
attrs.put("s3.region", "us-east-1");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS);
}
@Test
public void testTagObjectVersion() {
final String tagKey = "k";
final String tagVal = "v";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.VERSION_ID, "test-version");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
@ -119,14 +137,14 @@ public class TestTagS3Object {
}
@Test
public void testTagObjectAppendToExistingTags() throws IOException {
public void testTagObjectAppendToExistingTags() {
//set up existing tags on S3 object
Tag currentTag = new Tag("ck", "cv");
mockGetExistingTags(currentTag);
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -153,7 +171,7 @@ public class TestTagS3Object {
}
@Test
public void testTagObjectAppendUpdatesExistingTagValue() throws IOException {
public void testTagObjectAppendUpdatesExistingTagValue() {
//set up existing tags on S3 object
Tag currentTag1 = new Tag("ck", "cv");
Tag currentTag2 = new Tag("nk", "ov");
@ -161,7 +179,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -183,14 +201,14 @@ public class TestTagS3Object {
}
@Test
public void testTagObjectReplacesExistingTags() throws IOException {
public void testTagObjectReplacesExistingTags() {
//set up existing tags on S3 object
Tag currentTag = new Tag("ck", "cv");
mockGetExistingTags(currentTag);
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -225,7 +243,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@ -241,7 +259,7 @@ public class TestTagS3Object {
}
@Test
public void testGetPropertyDescriptors() throws Exception {
public void testGetPropertyDescriptors() {
TagS3Object processor = new TagS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals(22, pd.size(), "size should be eq");
@ -251,7 +269,7 @@ public class TestTagS3Object {
assertTrue(pd.contains(TagS3Object.CREDENTIALS_FILE));
assertTrue(pd.contains(TagS3Object.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(TagS3Object.KEY));
assertTrue(pd.contains(TagS3Object.REGION));
assertTrue(pd.contains(TagS3Object.S3_REGION));
assertTrue(pd.contains(TagS3Object.SECRET_KEY));
assertTrue(pd.contains(TagS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(TagS3Object.S3_CUSTOM_SIGNER_CLASS_NAME));
@ -271,7 +289,7 @@ public class TestTagS3Object {
@Test
public void testBucketEvaluatedAsBlank() {
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "${not.existant.attribute}");
runner.setProperty(TagS3Object.TAG_KEY, "key");
runner.setProperty(TagS3Object.TAG_VALUE, "val");
@ -286,7 +304,7 @@ public class TestTagS3Object {
@Test
public void testTagKeyEvaluatedAsBlank() {
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}");
runner.setProperty(TagS3Object.TAG_VALUE, "val");
@ -301,7 +319,7 @@ public class TestTagS3Object {
@Test
public void testTagValEvaluatedAsBlank() {
runner.setProperty(TagS3Object.REGION, "us-west-2");
runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET, "test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "tagKey");
runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}");
@ -318,4 +336,5 @@ public class TestTagS3Object {
List<Tag> currentTags = new ArrayList<>(Arrays.asList(currentTag));
Mockito.when(mockS3Client.getObjectTagging(Mockito.any())).thenReturn(new GetObjectTaggingResult(currentTags));
}
}

View File

@ -21,6 +21,7 @@ import com.amazonaws.services.sns.model.AmazonSNSException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -48,7 +49,7 @@ public class TestPutSNS {
mockSNSClient = Mockito.mock(AmazonSNSClient.class);
mockPutSNS = new PutSNS() {
@Override
protected AmazonSNSClient getClient() {
protected AmazonSNSClient getClient(ProcessContext context) {
return mockSNSClient;
}
};

View File

@ -20,6 +20,7 @@ import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@ -47,8 +48,9 @@ public class TestDeleteSQS {
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any())).thenReturn(mockResponse);
Mockito.when(mockResponse.getFailed()).thenReturn(new ArrayList<>());
mockDeleteSQS = new DeleteSQS() {
@Override
protected AmazonSQSClient getClient() {
protected AmazonSQSClient getClient(ProcessContext context) {
return mockSQSClient;
}
};

View File

@ -22,6 +22,7 @@ import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -45,7 +46,8 @@ public class TestGetSQS {
public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
mockGetSQS = new GetSQS() {
protected AmazonSQSClient getClient() {
@Override
protected AmazonSQSClient getClient(ProcessContext context) {
return mockSQSClient;
}
};

View File

@ -20,6 +20,7 @@ import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@ -38,7 +39,6 @@ public class TestPutSQS {
private TestRunner runner = null;
private PutSQS mockPutSQS = null;
private AmazonSQSClient actualSQSClient = null;
private AmazonSQSClient mockSQSClient = null;
@BeforeEach
@ -46,8 +46,7 @@ public class TestPutSQS {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
mockPutSQS = new PutSQS() {
@Override
protected AmazonSQSClient getClient() {
actualSQSClient = client;
protected AmazonSQSClient getClient(ProcessContext context) {
return mockSQSClient;
}
};