diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml index 24dd2c2e1f..177839b7de 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml @@ -25,6 +25,24 @@ nifi-aws-abstract-processors + + software.amazon.awssdk + sdk-core + + + software.amazon.awssdk + apache-client + + + software.amazon.awssdk + sqs + + + software.amazon.awssdk + netty-nio-client + + + com.amazonaws aws-java-sdk-core @@ -48,14 +66,6 @@ - - software.amazon.awssdk - s3 - - - software.amazon.awssdk - apache-client - org.slf4j jcl-over-slf4j diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java similarity index 100% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/PropertiesCredentialsProvider.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java index 2d9841a7f3..3ab6729a11 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java @@ -20,14 +20,11 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.sqs.AmazonSQSClient; - -public abstract class AbstractSQSProcessor extends AbstractAWSCredentialsProviderProcessor { +public abstract class AbstractSQSProcessor extends AbstractAwsProcessor { public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") @@ -45,26 +42,9 @@ public abstract class AbstractSQSProcessor extends AbstractAWSCredentialsProvide .required(true) .build(); - /** - * Create client using credentials provider. This is the preferred way for creating clients - */ @Override - protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { - getLogger().info("Creating client using aws credentials provider "); - - return new AmazonSQSClient(credentialsProvider, config); - } - - /** - * Create client using AWSCredentials - * - * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead - */ - @Override - protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - getLogger().info("Creating client using aws credentials "); - - return new AmazonSQSClient(credentials, config); + protected SqsClientBuilder createClientBuilder(final ProcessContext context) { + return SqsClient.builder(); } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java new file mode 100644 index 0000000000..ceb05b9094 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.v2; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider; +import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxyConfigurationService; +import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.ssl.SSLContextService; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.SdkClient; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.FileStoreTlsKeyManagersProvider; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.TlsKeyManagersProvider; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; + +import javax.net.ssl.TrustManager; +import java.io.File; +import java.net.Proxy; +import java.net.URI; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Base class for aws processors using the AWS v2 SDK. + * + * @param client type + * + * @see AwsCredentialsProvider + */ +public abstract class AbstractAwsProcessor & AwsClientBuilder> + extends AbstractProcessor implements VerifiableProcessor, AwsClientProvider { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to success relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to failure relationship") + .build(); + + private static final Set relationships = Collections.unmodifiableSet( + new LinkedHashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)) + ); + + public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE; + + public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY; + + public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY; + + public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() + .name("Proxy Host") + .description("Proxy host name or IP") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder() + .name("Proxy Host Port") + .description("Proxy host port") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder() + .name("proxy-user-name") + .displayName("Proxy Username") + .description("Proxy username") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder() + .name("proxy-user-password") + .displayName("Proxy Password") + .description("Proxy password") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .name("Region") + .required(true) + .allowableValues(RegionUtil.getAvailableRegions()) + .defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue()) + .build(); + + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 secs") + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder() + .name("Endpoint Override URL") + .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " + + "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " + + "the selected endpoint URL, allowing use with other S3-compatible endpoints.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + /** + * AWS credentials provider service + * + * @see AWSCredentialsProvider + * @see AwsCredentialsProvider + */ + public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + .name("AWS Credentials Provider service") + .displayName("AWS Credentials Provider Service") + .description("The Controller Service that is used to obtain AWS credentials provider") + .required(false) + .identifiesControllerService(AWSCredentialsProviderService.class) + .build(); + + protected static final String DEFAULT_USER_AGENT = "NiFi"; + + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH}; + + public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); + + protected volatile T client; + + protected volatile Region region; + + private final AwsClientCache awsClientCache = new AwsClientCache<>(); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List validationResults = new ArrayList<>(super.customValidate(validationContext)); + + final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet(); + final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet(); + if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) { + validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build()); + } + + final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet(); + if ((secretKeySet || accessKeySet) && credentialsFileSet) { + validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build()); + } + + final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet(); + final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet(); + final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet(); + + if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { + validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build()); + } + + final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet(); + final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet(); + + if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) { + validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build()); + } + + if (proxyUserSet && !proxyHostSet) { + validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username or Proxy Password").build()); + } + + ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS); + + if (proxyHostSet && proxyConfigServiceSet) { + validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false) + .explanation("Either Proxy Username and Proxy Password must be set or Proxy Configuration Service but not both").build()); + } + + return validationResults; + } + + @OnShutdown + public void onShutDown() { + if (this.client != null) { + this.client.close(); + } + } + + @OnStopped + public void onStopped() { + this.awsClientCache.clearCache(); + } + + @Override + public List verify(final ProcessContext context, final ComponentLog verificationLogger, final Map attributes) { + final List results = new ArrayList<>(); + + try { + createClient(context); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.SUCCESSFUL) + .verificationStepName("Create Client") + .explanation("Successfully created AWS Client") + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to create AWS Client", e); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.FAILED) + .verificationStepName("Create Client") + .explanation("Failed to crete AWS Client: " + e.getMessage()) + .build()); + } + + return results; + } + + /** + * Creates the AWS SDK client. + * @param context The process context + * @return The created client + */ + @Override + public T createClient(final ProcessContext context) { + final U clientBuilder = createClientBuilder(context); + this.configureClientBuilder(clientBuilder, context); + return clientBuilder.build(); + } + + protected void configureClientBuilder(final U clientBuilder, final ProcessContext context) { + clientBuilder.overrideConfiguration(builder -> builder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, DEFAULT_USER_AGENT)); + clientBuilder.overrideConfiguration(builder -> builder.retryPolicy(RetryPolicy.none())); + clientBuilder.httpClient(createSdkHttpClient(context)); + + final Region region = getRegion(context); + if (region != null) { + clientBuilder.region(region); + } + configureEndpoint(context, clientBuilder); + + final AwsCredentialsProvider credentialsProvider = getCredentialsProvider(context); + clientBuilder.credentialsProvider(credentialsProvider); + } + + /** + * Creates an AWS service client from the context or returns an existing client from the cache + * @param context The process context + * @param awsClientDetails details of the AWS client + * @return The created client + */ + protected T getClient(final ProcessContext context, final AwsClientDetails awsClientDetails) { + return this.awsClientCache.getOrCreateClient(context, awsClientDetails, this); + } + + protected T getClient(final ProcessContext context) { + final AwsClientDetails awsClientDetails = new AwsClientDetails(getRegion(context)); + return getClient(context, awsClientDetails); + } + + /** + * Construct the AWS SDK client builder and perform any service-specific configuration of the builder. + * @param context The process context + * @return The SDK client builder + */ + protected abstract U createClientBuilder(final ProcessContext context); + + protected Region getRegion(final ProcessContext context) { + final Region region; + // if the processor supports REGION, get the configured region. + if (getSupportedPropertyDescriptors().contains(REGION)) { + final String regionValue = context.getProperty(REGION).getValue(); + if (regionValue != null) { + region = Region.of(regionValue); + } else { + region = null; + } + } else { + region = null; + } + return region; + } + + protected void configureEndpoint(final ProcessContext context, final U clientBuilder) { + // if the endpoint override has been configured, set the endpoint. + // (per Amazon docs this should only be configured at client creation) + if (getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) { + final String endpointOverride = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()); + + if (!endpointOverride.isEmpty()) { + getLogger().info("Overriding endpoint with {}", endpointOverride); + + clientBuilder.endpointOverride(URI.create(endpointOverride)); + } + } + } + + /** + * Get credentials provider using the {@link AwsCredentialsProvider} + * @param context the process context + * @return AwsCredentialsProvider the credential provider + */ + protected AwsCredentialsProvider getCredentialsProvider(final ProcessContext context) { + final AWSCredentialsProviderService awsCredentialsProviderService = + context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class); + + return awsCredentialsProviderService != null ? awsCredentialsProviderService.getAwsCredentialsProvider() : createStaticCredentialsProvider(context); + + } + + protected AwsCredentialsProvider createStaticCredentialsProvider(final PropertyContext context) { + final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue(); + final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue(); + + final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue(); + + if (credentialsFile != null) { + return new PropertiesCredentialsProvider(new File(credentialsFile)); + } + + if (accessKey != null && secretKey != null) { + return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)); + } + + return AnonymousCredentialsProvider.create(); + } + + private SdkHttpClient createSdkHttpClient(final ProcessContext context) { + final ApacheHttpClient.Builder builder = ApacheHttpClient.builder(); + + final int communicationsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + builder.connectionTimeout(Duration.ofMillis(communicationsTimeout)); + builder.socketTimeout(Duration.ofMillis(communicationsTimeout)); + builder.maxConnections(context.getMaxConcurrentTasks()); + + if (this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) { + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + final TrustManager[] trustManagers = new TrustManager[] { sslContextService.createTrustManager() }; + final TlsKeyManagersProvider keyManagersProvider = FileStoreTlsKeyManagersProvider + .create(Path.of(sslContextService.getKeyStoreFile()), sslContextService.getKeyStoreType(), sslContextService.getKeyStorePassword()); + builder.tlsTrustManagersProvider(() -> trustManagers); + builder.tlsKeyManagersProvider(keyManagersProvider); + } + } + + final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> { + if (context.getProperty(PROXY_HOST).isSet()) { + final ProxyConfiguration componentProxyConfig = new ProxyConfiguration(); + final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); + final Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger(); + final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue(); + final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue(); + componentProxyConfig.setProxyType(Proxy.Type.HTTP); + componentProxyConfig.setProxyServerHost(proxyHost); + componentProxyConfig.setProxyServerPort(proxyPort); + componentProxyConfig.setProxyUserName(proxyUsername); + componentProxyConfig.setProxyUserPassword(proxyPassword); + return componentProxyConfig; + } else if (context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet()) { + final ProxyConfigurationService configurationService = context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class); + return configurationService.getConfiguration(); + } + return ProxyConfiguration.DIRECT_CONFIGURATION; + }); + + if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) { + final software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyConfigBuilder = software.amazon.awssdk.http.apache.ProxyConfiguration.builder() + .endpoint(URI.create(String.format("%s:%s", proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()))); + + if (proxyConfig.hasCredential()) { + proxyConfigBuilder.username(proxyConfig.getProxyUserName()); + proxyConfigBuilder.password(proxyConfig.getProxyUserPassword()); + } + builder.proxyConfiguration(proxyConfigBuilder.build()); + } + + return builder.build(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java new file mode 100644 index 0000000000..4d98583f55 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.v2; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.processor.ProcessContext; +import software.amazon.awssdk.core.SdkClient; + +public class AwsClientCache { + + private final Cache clientCache = Caffeine.newBuilder().build(); + + public T getOrCreateClient(final ProcessContext context, final AwsClientDetails clientDetails, final AwsClientProvider provider) { + return clientCache.get(clientDetails, ignored -> provider.createClient(context)); + } + + public void clearCache() { + clientCache.invalidateAll(); + clientCache.cleanUp(); + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientDetails.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientDetails.java new file mode 100644 index 0000000000..b8148d6093 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientDetails.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.v2; + +import software.amazon.awssdk.regions.Region; + +import java.util.Objects; + +/** + * This class contains the AWS client details used to distinguish between the various AWS clients stored in the cache. + * The class acts as a cache key for @link AwsClientCache. + * AwsClientDetails contains the region only, since actually the region value may come from the FlowFile attributes. + */ +public class AwsClientDetails { + private Region region; + + public AwsClientDetails(Region region) { + this.region = region; + } + + public Region getRegion() { + return region; + } + + public void setRegion(final Region region) { + this.region = region; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AwsClientDetails that = (AwsClientDetails) o; + return Objects.equals(region, that.region); + } + + @Override + public int hashCode() { + return Objects.hash(region); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientProvider.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientProvider.java new file mode 100644 index 0000000000..bea56727f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientProvider.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.v2; + +import org.apache.nifi.processor.ProcessContext; +import software.amazon.awssdk.core.SdkClient; + +public interface AwsClientProvider { + + /** + * Creates an AWS client using process context and AWS client details. + * + * @param context process context + * @return AWS client + */ + T createClient(final ProcessContext context); +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java new file mode 100644 index 0000000000..45814f5c93 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.v2; + +import org.apache.nifi.components.AllowableValue; +import software.amazon.awssdk.regions.Region; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * Utility class for AWS region methods. + * + */ +public abstract class RegionUtil { + + /** + * Creates an AllowableValue from a Region. + * @param region An AWS region + * @return An AllowableValue for the region + */ + public static AllowableValue createAllowableValue(final Region region) { + final String description = region.metadata() != null ? region.metadata().description() : region.id(); + return new AllowableValue(region.id(), description, "AWS Region Code : " + region.id()); + } + + /** + * + * @return All available regions as AllowableValues. + */ + public static AllowableValue[] getAvailableRegions() { + final List values = new ArrayList<>(); + for (final Region region : Region.regions()) { + values.add(createAllowableValue(region)); + } + Collections.sort(values, Comparator.comparing(AllowableValue::getDisplayName)); + return values.toArray(new AllowableValue[0]); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index 6bc858cfb8..e5e3cc87b3 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -16,11 +16,6 @@ */ package org.apache.nifi.processors.aws.sqs; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -35,11 +30,14 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; @SupportsBatching @SeeAlso({GetSQS.class, PutSQS.class}) @@ -75,25 +73,26 @@ public class DeleteSQS extends AbstractSQSProcessor { final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue(); - final AmazonSQSClient client = getClient(context); - final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(); - request.setQueueUrl(queueUrl); + final SqsClient client = getClient(context); - final List entries = new ArrayList<>(); - final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); - String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue(); - entry.setReceiptHandle(receiptHandle); - String entryId = flowFile.getAttribute(CoreAttributes.UUID.key()); - entry.setId(entryId); - entries.add(entry); - request.setEntries(entries); + final String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue(); + final String entryId = flowFile.getAttribute(CoreAttributes.UUID.key()); + final DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder() + .receiptHandle(receiptHandle) + .id(entryId) + .build(); + + final DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(entry) + .build(); try { - DeleteMessageBatchResult response = client.deleteMessageBatch(request); + DeleteMessageBatchResponse response = client.deleteMessageBatch(request); // check for errors - if (!response.getFailed().isEmpty()) { - throw new ProcessException(response.getFailed().get(0).toString()); + if (!response.failed().isEmpty()) { + throw new ProcessException(response.failed().get(0).toString()); } getLogger().info("Successfully deleted message from SQS for {}", new Object[] { flowFile }); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index 1ffb68d023..e6f197829b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -16,13 +16,7 @@ */ package org.apache.nifi.processors.aws.sqs; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -38,11 +32,17 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import java.io.IOException; -import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -136,28 +136,29 @@ public class GetSQS extends AbstractSQSProcessor { final String queueUrl = context.getProperty(DYNAMIC_QUEUE_URL).evaluateAttributeExpressions() .getValue(); - final AmazonSQSClient client = getClient(context); + final SqsClient client = getClient(context); - final ReceiveMessageRequest request = new ReceiveMessageRequest(); - request.setAttributeNames(Collections.singleton("All")); - request.setMessageAttributeNames(Collections.singleton("All")); - request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()); - request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()); - request.setQueueUrl(queueUrl); - request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue()); + final ReceiveMessageRequest request = ReceiveMessageRequest.builder() + .attributeNames(QueueAttributeName.ALL) + .messageAttributeNames("All") + .maxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()) + .visibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()) + .queueUrl(queueUrl) + .waitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue()) + .build(); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); - final ReceiveMessageResult result; + final ReceiveMessageResponse response; try { - result = client.receiveMessage(request); + response = client.receiveMessage(request); } catch (final Exception e) { getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e}); context.yield(); return; } - final List messages = result.getMessages(); + final List messages = response.messages(); if (messages.isEmpty()) { context.yield(); return; @@ -169,26 +170,21 @@ public class GetSQS extends AbstractSQSProcessor { FlowFile flowFile = session.create(); final Map attributes = new HashMap<>(); - for (final Map.Entry entry : message.getAttributes().entrySet()) { + for (final Map.Entry entry : message.attributes().entrySet()) { attributes.put("sqs." + entry.getKey(), entry.getValue()); } - for (final Map.Entry entry : message.getMessageAttributes().entrySet()) { - attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue()); + for (final Map.Entry entry : message.messageAttributes().entrySet()) { + attributes.put("sqs." + entry.getKey(), entry.getValue().stringValue()); } - attributes.put("hash.value", message.getMD5OfBody()); + attributes.put("hash.value", message.md5OfBody()); attributes.put("hash.algorithm", "md5"); - attributes.put("sqs.message.id", message.getMessageId()); - attributes.put("sqs.receipt.handle", message.getReceiptHandle()); + attributes.put("sqs.message.id", message.messageId()); + attributes.put("sqs.receipt.handle", message.receiptHandle()); flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(message.getBody().getBytes(charset)); - } - }); + flowFile = session.write(flowFile, out -> out.write(message.body().getBytes(charset))); session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().receive(flowFile, queueUrl); @@ -203,18 +199,19 @@ public class GetSQS extends AbstractSQSProcessor { } } - private void deleteMessages(final AmazonSQSClient client, final String queueUrl, final List messages) { - final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest(); - deleteRequest.setQueueUrl(queueUrl); + private void deleteMessages(final SqsClient client, final String queueUrl, final List messages) { final List deleteRequestEntries = new ArrayList<>(); for (final Message message : messages) { - final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); - entry.setId(message.getMessageId()); - entry.setReceiptHandle(message.getReceiptHandle()); + final DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder() + .id(message.messageId()) + .receiptHandle(message.receiptHandle()) + .build(); deleteRequestEntries.add(entry); } - - deleteRequest.setEntries(deleteRequestEntries); + final DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(deleteRequestEntries) + .build(); try { client.deleteMessageBatch(deleteRequest); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java index a72d1b222b..6667673cbd 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.processors.aws.sqs; -import java.io.ByteArrayOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -38,16 +27,25 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.SendMessageBatchResult; +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; @SupportsBatching @SeeAlso({ GetSQS.class, DeleteSQS.class }) @@ -58,6 +56,7 @@ import com.amazonaws.services.sqs.model.SendMessageBatchResult; description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of " + "the Message Attribute and value will become the value of the Message Attribute", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) public class PutSQS extends AbstractSQSProcessor { + private static final String STRING_DATA_TYPE = "String"; public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder() .name("Delay") @@ -127,53 +126,43 @@ public class PutSQS extends AbstractSQSProcessor { } final long startNanos = System.nanoTime(); - final AmazonSQSClient client = getClient(context); - final SendMessageBatchRequest request = new SendMessageBatchRequest(); + final SqsClient client = getClient(context); final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue(); - request.setQueueUrl(queueUrl); - final Set entries = new HashSet<>(); - - final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); session.exportTo(flowFile, baos); final String flowFileContent = baos.toString(); - entry.setMessageBody(flowFileContent); - entry.setId(flowFile.getAttribute("uuid")); - - if (context.getProperty(MESSAGEGROUPID).isSet()) { - entry.setMessageGroupId(context.getProperty(MESSAGEGROUPID) - .evaluateAttributeExpressions(flowFile) - .getValue()); - } - - if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) { - entry.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID) - .evaluateAttributeExpressions(flowFile) - .getValue()); - } final Map messageAttributes = new HashMap<>(); for (final PropertyDescriptor descriptor : userDefinedProperties) { - final MessageAttributeValue mav = new MessageAttributeValue(); - mav.setDataType("String"); - mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()); + final MessageAttributeValue mav = MessageAttributeValue.builder() + .dataType(STRING_DATA_TYPE) + .stringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()) + .build(); messageAttributes.put(descriptor.getName(), mav); } - entry.setMessageAttributes(messageAttributes); - entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue()); - entries.add(entry); + final SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder() + .messageBody(flowFileContent) + .messageGroupId(context.getProperty(MESSAGEGROUPID).evaluateAttributeExpressions(flowFile).getValue()) + .messageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID).evaluateAttributeExpressions(flowFile).getValue()) + .id(flowFile.getAttribute(CoreAttributes.UUID.key())) + .messageAttributes(messageAttributes) + .delaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue()) + .build(); - request.setEntries(entries); + final SendMessageBatchRequest request = SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(entry) + .build(); try { - SendMessageBatchResult response = client.sendMessageBatch(request); + SendMessageBatchResponse response = client.sendMessageBatch(request); // check for errors - if (!response.getFailed().isEmpty()) { - throw new ProcessException(response.getFailed().get(0).toString()); + if (!response.failed().isEmpty()) { + throw new ProcessException(response.failed().get(0).toString()); } } catch (final Exception e) { getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e}); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java index 301222488c..c8752e9eb1 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java @@ -16,16 +16,18 @@ */ package org.apache.nifi.processors.aws.sqs; -import com.amazonaws.auth.PropertiesCredentials; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; -import com.amazonaws.services.sqs.model.SendMessageResult; +import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import java.io.File; import java.io.IOException; @@ -39,26 +41,34 @@ public class ITDeleteSQS { private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; private final String TEST_QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/123456789012/nifi-test-queue"; private final String TEST_REGION = "us-west-2"; - AmazonSQSClient sqsClient = null; + SqsClient sqsClient = null; @BeforeEach public void setUp() throws IOException { - PropertiesCredentials credentials = new PropertiesCredentials(new File(CREDENTIALS_FILE)); - sqsClient = new AmazonSQSClient(credentials); - sqsClient.withRegion(Regions.fromName(TEST_REGION)); + sqsClient = SqsClient.builder() + .region(Region.of(TEST_REGION)) + .credentialsProvider(() -> new PropertiesCredentialsProvider(new File(CREDENTIALS_FILE)).resolveCredentials()) + .build(); } @Test public void testSimpleDelete() { // Setup - put one message in queue - SendMessageResult sendMessageResult = sqsClient.sendMessage(TEST_QUEUE_URL, "Test message"); - assertEquals(200, sendMessageResult.getSdkHttpMetadata().getHttpStatusCode()); + final SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(TEST_QUEUE_URL) + .messageBody("Test message") + .build(); + SendMessageResponse sendMessageResult = sqsClient.sendMessage(sendMessageRequest); + assertEquals(200, sendMessageResult.sdkHttpResponse().statusCode()); // Setup - receive message to get receipt handle - ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(TEST_QUEUE_URL); - assertEquals(200, receiveMessageResult.getSdkHttpMetadata().getHttpStatusCode()); - Message deleteMessage = receiveMessageResult.getMessages().get(0); - String receiptHandle = deleteMessage.getReceiptHandle(); + final ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() + .queueUrl(TEST_QUEUE_URL) + .build(); + ReceiveMessageResponse receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); + assertEquals(200, receiveMessageResult.sdkHttpResponse().statusCode()); + Message deleteMessage = receiveMessageResult.messages().get(0); + String receiptHandle = deleteMessage.receiptHandle(); // Test - delete message with DeleteSQS final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS()); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java index 2b54350c43..d63a8605ad 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.processors.aws.sqs; -import com.amazonaws.regions.Regions; import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; import java.io.IOException; import java.nio.file.Paths; @@ -34,7 +34,7 @@ public class ITPutSQS { private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; private final String QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"; - private final String REGION = Regions.US_WEST_2.getName(); + private final String REGION = Region.US_WEST_2.id(); private final String VPCE_QUEUE_URL = "https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com/123456789012/test-queue"; private final String VPCE_ENDPOINT_OVERRIDE = "https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com"; @@ -91,7 +91,7 @@ public class ITPutSQS { final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); runner.setProperty(PutSQS.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); - runner.setProperty(PutSQS.REGION, Regions.US_WEST_2.getName()); + runner.setProperty(PutSQS.REGION, Region.US_WEST_2.id()); runner.setProperty(PutSQS.QUEUE_URL, VPCE_QUEUE_URL); runner.setProperty(PutSQS.ENDPOINT_OVERRIDE, VPCE_ENDPOINT_OVERRIDE); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java index 744f47876d..0ee3b13da0 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.processors.aws.sqs; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.AmazonSQSException; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -27,8 +23,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -39,18 +38,19 @@ public class TestDeleteSQS { private TestRunner runner = null; private DeleteSQS mockDeleteSQS = null; - private AmazonSQSClient mockSQSClient = null; + private SqsClient mockSQSClient = null; @BeforeEach public void setUp() { - mockSQSClient = Mockito.mock(AmazonSQSClient.class); - DeleteMessageBatchResult mockResponse = Mockito.mock(DeleteMessageBatchResult.class); - Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any())).thenReturn(mockResponse); - Mockito.when(mockResponse.getFailed()).thenReturn(new ArrayList<>()); + mockSQSClient = Mockito.mock(SqsClient.class); + DeleteMessageBatchResponse mockResponse = DeleteMessageBatchResponse.builder() + .failed(Collections.emptyList()) + .build(); + Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class))).thenReturn(mockResponse); mockDeleteSQS = new DeleteSQS() { @Override - protected AmazonSQSClient getClient(ProcessContext context) { + protected SqsClient getClient(ProcessContext context) { return mockSQSClient; } }; @@ -71,8 +71,8 @@ public class TestDeleteSQS { ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl()); - assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.queueUrl()); + assertEquals("test-receipt-handle-1", deleteRequest.entries().get(0).receiptHandle()); runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1); } @@ -92,7 +92,7 @@ public class TestDeleteSQS { ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); - assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle()); + assertEquals("test-receipt-handle-1", deleteRequest.entries().get(0).receiptHandle()); runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1); } @@ -105,7 +105,7 @@ public class TestDeleteSQS { ff1Attributes.put("sqs.receipt.handle", "test-receipt-handle-1"); runner.enqueue("TestMessageBody1", ff1Attributes); Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class))) - .thenThrow(new AmazonSQSException("TestFail")); + .thenThrow(new RuntimeException()); runner.assertValid(); runner.run(1); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java index edef28a62c..908caf18b8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.processors.aws.sqs; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -30,8 +24,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -40,14 +43,14 @@ public class TestGetSQS { private TestRunner runner = null; private GetSQS mockGetSQS = null; - private AmazonSQSClient mockSQSClient = null; + private SqsClient mockSQSClient = null; @BeforeEach public void setUp() { - mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockSQSClient = Mockito.mock(SqsClient.class); mockGetSQS = new GetSQS() { @Override - protected AmazonSQSClient getClient(ProcessContext context) { + protected SqsClient getClient(ProcessContext context) { return mockSQSClient; } }; @@ -59,17 +62,24 @@ public class TestGetSQS { runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); runner.setProperty(GetSQS.AUTO_DELETE, "false"); - Message message1 = new Message(); - message1.setBody("TestMessage1"); - message1.addAttributesEntry("attrib-key-1", "attrib-value-1"); - MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); - messageAttributeValue.setStringValue("msg-attrib-value-1"); - message1.addMessageAttributesEntry("msg-attrib-key-1", messageAttributeValue); - message1.setMD5OfBody("test-md5-hash-1"); - message1.setMessageId("test-message-id-1"); - message1.setReceiptHandle("test-receipt-handle-1"); - ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult() - .withMessages(message1); + final Map attributes = new HashMap<>(); + final Map messageAttributes = new HashMap<>(); + MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder() + .stringValue("msg-attrib-value-1").build(); + messageAttributes.put("msg-attrib-key-1", messageAttributeValue); + attributes.put("attrib-key-1", "attrib-value-1"); // This attribute is no longer valid in SDK v2 + attributes.put(MessageSystemAttributeName.MESSAGE_GROUP_ID.toString(), "attrib-value-1"); // However, this one is allowed + Message message1 = Message.builder() + .body("TestMessage1") + .attributesWithStrings(attributes) + .messageAttributes(messageAttributes) + .md5OfBody("test-md5-hash-1") + .messageId("test-message-id-1") + .receiptHandle("test-receipt-handle-1") + .build(); + ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder() + .messages(message1) + .build(); Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); runner.run(1); @@ -77,13 +87,14 @@ public class TestGetSQS { ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture()); ReceiveMessageRequest request = captureRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl()); Mockito.verify(mockSQSClient, Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class)); runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1); List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); - ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1"); + ff0.assertAttributeNotExists("sqs.attrib-key-1"); + ff0.assertAttributeEquals("sqs.MessageGroupId", "attrib-value-1"); ff0.assertAttributeEquals("sqs.msg-attrib-key-1", "msg-attrib-value-1"); ff0.assertAttributeEquals("hash.value", "test-md5-hash-1"); ff0.assertAttributeEquals("hash.algorithm", "md5"); @@ -94,7 +105,7 @@ public class TestGetSQS { @Test public void testGetNoMessages() { runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); - ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); + ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder().build(); Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); runner.run(1); @@ -102,7 +113,7 @@ public class TestGetSQS { ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture()); ReceiveMessageRequest request = captureRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl()); runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0); } @@ -112,16 +123,19 @@ public class TestGetSQS { runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); runner.setProperty(GetSQS.AUTO_DELETE, "true"); - Message message1 = new Message(); - message1.setBody("TestMessage1"); - message1.setMessageId("test-message-id-1"); - message1.setReceiptHandle("test-receipt-handle-1"); - Message message2 = new Message(); - message2.setBody("TestMessage2"); - message2.setMessageId("test-message-id-2"); - message2.setReceiptHandle("test-receipt-handle-2"); - ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult() - .withMessages(message1, message2); + Message message1 = Message.builder() + .body("TestMessage1") + .messageId("test-message-id-1") + .receiptHandle("test-receipt-handle-1") + .build(); + Message message2 = Message.builder() + .body("TestMessage2") + .messageId("test-message-id-2") + .receiptHandle("test-receipt-handle-2") + .build(); + ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder() + .messages(message1, message2) + .build(); Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); runner.run(1); @@ -129,14 +143,14 @@ public class TestGetSQS { ArgumentCaptor captureReceiveRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureReceiveRequest.capture()); ReceiveMessageRequest receiveRequest = captureReceiveRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.getQueueUrl()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.queueUrl()); ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl()); - assertEquals("test-message-id-1", deleteRequest.getEntries().get(0).getId()); - assertEquals("test-message-id-2", deleteRequest.getEntries().get(1).getId()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.queueUrl()); + assertEquals("test-message-id-1", deleteRequest.entries().get(0).id()); + assertEquals("test-message-id-2", deleteRequest.entries().get(1).id()); runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2); List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java index 9f23423e1c..882745bc79 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.processors.aws.sqs; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.AmazonSQSException; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -27,6 +23,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import java.util.HashMap; import java.util.Map; @@ -39,14 +38,14 @@ public class TestPutSQS { private TestRunner runner = null; private PutSQS mockPutSQS = null; - private AmazonSQSClient mockSQSClient = null; + private SqsClient mockSQSClient = null; @BeforeEach public void setUp() { - mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockSQSClient = Mockito.mock(SqsClient.class); mockPutSQS = new PutSQS() { @Override - protected AmazonSQSClient getClient(ProcessContext context) { + protected SqsClient getClient(ProcessContext context) { return mockSQSClient; } }; @@ -62,7 +61,7 @@ public class TestPutSQS { attrs.put("filename", "1.txt"); runner.enqueue("TestMessageBody", attrs); - SendMessageBatchResult batchResult = new SendMessageBatchResult(); + SendMessageBatchResponse batchResult = SendMessageBatchResponse.builder().build(); Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult); runner.run(1); @@ -70,9 +69,9 @@ public class TestPutSQS { ArgumentCaptor captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); SendMessageBatchRequest request = captureRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); - assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue()); - assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl()); + assertEquals("hello", request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue()); + assertEquals("TestMessageBody", request.entries().get(0).messageBody()); runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); } @@ -85,15 +84,15 @@ public class TestPutSQS { attrs.put("filename", "1.txt"); runner.enqueue("TestMessageBody", attrs); - Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new AmazonSQSException("TestFail")); + Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new RuntimeException()); runner.run(1); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); SendMessageBatchRequest request = captureRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); - assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl()); + assertEquals("TestMessageBody", request.entries().get(0).messageBody()); runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1); } @@ -110,7 +109,7 @@ public class TestPutSQS { attrs.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236"); runner.enqueue("TestMessageBody", attrs); - SendMessageBatchResult batchResult = new SendMessageBatchResult(); + final SendMessageBatchResponse batchResult = SendMessageBatchResponse.builder().build(); Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult); runner.run(1); @@ -118,11 +117,11 @@ public class TestPutSQS { ArgumentCaptor captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); SendMessageBatchRequest request = captureRequest.getValue(); - assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); - assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue()); - assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); - assertEquals("test1234", request.getEntries().get(0).getMessageGroupId()); - assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getEntries().get(0).getMessageDeduplicationId()); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl()); + assertEquals("hello", request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue()); + assertEquals("TestMessageBody", request.entries().get(0).messageBody()); + assertEquals("test1234", request.entries().get(0).messageGroupId()); + assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.entries().get(0).messageDeduplicationId()); runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml index cc8968f6e0..553fce66ec 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/pom.xml @@ -32,6 +32,10 @@ software.amazon.awssdk auth + + software.amazon.awssdk + apache-client + com.amazonaws aws-java-sdk-s3