NIFI-8287 Upgraded SQS Processors to use AWS SDK 2

This closes #7211

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Joe Gresock 2023-04-24 16:10:47 -04:00 committed by exceptionfactory
parent 7a10ba6165
commit 3ef4439879
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
17 changed files with 861 additions and 236 deletions

View File

@ -25,6 +25,24 @@
<artifactId>nifi-aws-abstract-processors</artifactId>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
@ -48,14 +66,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>

View File

@ -20,14 +20,11 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQSClient;
public abstract class AbstractSQSProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonSQSClient> {
public abstract class AbstractSQSProcessor extends AbstractAwsProcessor<SqsClient, SqsClientBuilder> {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
@ -45,26 +42,9 @@ public abstract class AbstractSQSProcessor extends AbstractAWSCredentialsProvide
.required(true)
.build();
/**
* Create client using credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials provider ");
return new AmazonSQSClient(credentialsProvider, config);
}
/**
* Create client using AWSCredentials
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials ");
return new AmazonSQSClient(credentials, config);
protected SqsClientBuilder createClientBuilder(final ProcessContext context) {
return SqsClient.builder();
}
}

View File

@ -0,0 +1,440 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.v2;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
import software.amazon.awssdk.core.SdkClient;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.FileStoreTlsKeyManagersProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.TlsKeyManagersProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import javax.net.ssl.TrustManager;
import java.io.File;
import java.net.Proxy;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Base class for aws processors using the AWS v2 SDK.
*
* @param <T> client type
*
* @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
*/
public abstract class AbstractAwsProcessor<T extends SdkClient, U extends AwsSyncClientBuilder<U, T> & AwsClientBuilder<U, T>>
extends AbstractProcessor implements VerifiableProcessor, AwsClientProvider<T> {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to failure relationship")
.build();
private static final Set<Relationship> relationships = Collections.unmodifiableSet(
new LinkedHashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))
);
public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("Proxy host name or IP")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
.name("Proxy Host Port")
.description("Proxy host port")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("proxy-user-name")
.displayName("Proxy Username")
.description("Proxy username")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("proxy-user-password")
.displayName("Proxy Password")
.description("Proxy password")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.required(true)
.allowableValues(RegionUtil.getAvailableRegions())
.defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue())
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
.name("Endpoint Override URL")
.description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
"The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
"the selected endpoint URL, allowing use with other S3-compatible endpoints.")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
/**
* AWS credentials provider service
*
* @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
* @see <a href="https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
*/
public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("AWS Credentials Provider service")
.displayName("AWS Credentials Provider Service")
.description("The Controller Service that is used to obtain AWS credentials provider")
.required(false)
.identifiesControllerService(AWSCredentialsProviderService.class)
.build();
protected static final String DEFAULT_USER_AGENT = "NiFi";
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
protected volatile T client;
protected volatile Region region;
private final AwsClientCache<T> awsClientCache = new AwsClientCache<>();
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
}
final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
if ((secretKeySet || accessKeySet) && credentialsFileSet) {
validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
}
final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
}
final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
}
if (proxyUserSet && !proxyHostSet) {
validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username or Proxy Password").build());
}
ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS);
if (proxyHostSet && proxyConfigServiceSet) {
validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
.explanation("Either Proxy Username and Proxy Password must be set or Proxy Configuration Service but not both").build());
}
return validationResults;
}
@OnShutdown
public void onShutDown() {
if (this.client != null) {
this.client.close();
}
}
@OnStopped
public void onStopped() {
this.awsClientCache.clearCache();
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
try {
createClient(context);
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Create Client")
.explanation("Successfully created AWS Client")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to create AWS Client", e);
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.FAILED)
.verificationStepName("Create Client")
.explanation("Failed to crete AWS Client: " + e.getMessage())
.build());
}
return results;
}
/**
* Creates the AWS SDK client.
* @param context The process context
* @return The created client
*/
@Override
public T createClient(final ProcessContext context) {
final U clientBuilder = createClientBuilder(context);
this.configureClientBuilder(clientBuilder, context);
return clientBuilder.build();
}
protected void configureClientBuilder(final U clientBuilder, final ProcessContext context) {
clientBuilder.overrideConfiguration(builder -> builder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, DEFAULT_USER_AGENT));
clientBuilder.overrideConfiguration(builder -> builder.retryPolicy(RetryPolicy.none()));
clientBuilder.httpClient(createSdkHttpClient(context));
final Region region = getRegion(context);
if (region != null) {
clientBuilder.region(region);
}
configureEndpoint(context, clientBuilder);
final AwsCredentialsProvider credentialsProvider = getCredentialsProvider(context);
clientBuilder.credentialsProvider(credentialsProvider);
}
/**
* Creates an AWS service client from the context or returns an existing client from the cache
* @param context The process context
* @param awsClientDetails details of the AWS client
* @return The created client
*/
protected T getClient(final ProcessContext context, final AwsClientDetails awsClientDetails) {
return this.awsClientCache.getOrCreateClient(context, awsClientDetails, this);
}
protected T getClient(final ProcessContext context) {
final AwsClientDetails awsClientDetails = new AwsClientDetails(getRegion(context));
return getClient(context, awsClientDetails);
}
/**
* Construct the AWS SDK client builder and perform any service-specific configuration of the builder.
* @param context The process context
* @return The SDK client builder
*/
protected abstract U createClientBuilder(final ProcessContext context);
protected Region getRegion(final ProcessContext context) {
final Region region;
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String regionValue = context.getProperty(REGION).getValue();
if (regionValue != null) {
region = Region.of(regionValue);
} else {
region = null;
}
} else {
region = null;
}
return region;
}
protected void configureEndpoint(final ProcessContext context, final U clientBuilder) {
// if the endpoint override has been configured, set the endpoint.
// (per Amazon docs this should only be configured at client creation)
if (getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
final String endpointOverride = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
if (!endpointOverride.isEmpty()) {
getLogger().info("Overriding endpoint with {}", endpointOverride);
clientBuilder.endpointOverride(URI.create(endpointOverride));
}
}
}
/**
* Get credentials provider using the {@link AwsCredentialsProvider}
* @param context the process context
* @return AwsCredentialsProvider the credential provider
*/
protected AwsCredentialsProvider getCredentialsProvider(final ProcessContext context) {
final AWSCredentialsProviderService awsCredentialsProviderService =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class);
return awsCredentialsProviderService != null ? awsCredentialsProviderService.getAwsCredentialsProvider() : createStaticCredentialsProvider(context);
}
protected AwsCredentialsProvider createStaticCredentialsProvider(final PropertyContext context) {
final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
if (credentialsFile != null) {
return new PropertiesCredentialsProvider(new File(credentialsFile));
}
if (accessKey != null && secretKey != null) {
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
}
return AnonymousCredentialsProvider.create();
}
private SdkHttpClient createSdkHttpClient(final ProcessContext context) {
final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
final int communicationsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
builder.connectionTimeout(Duration.ofMillis(communicationsTimeout));
builder.socketTimeout(Duration.ofMillis(communicationsTimeout));
builder.maxConnections(context.getMaxConcurrentTasks());
if (this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
final TrustManager[] trustManagers = new TrustManager[] { sslContextService.createTrustManager() };
final TlsKeyManagersProvider keyManagersProvider = FileStoreTlsKeyManagersProvider
.create(Path.of(sslContextService.getKeyStoreFile()), sslContextService.getKeyStoreType(), sslContextService.getKeyStorePassword());
builder.tlsTrustManagersProvider(() -> trustManagers);
builder.tlsKeyManagersProvider(keyManagersProvider);
}
}
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
if (context.getProperty(PROXY_HOST).isSet()) {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
final Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
componentProxyConfig.setProxyServerHost(proxyHost);
componentProxyConfig.setProxyServerPort(proxyPort);
componentProxyConfig.setProxyUserName(proxyUsername);
componentProxyConfig.setProxyUserPassword(proxyPassword);
return componentProxyConfig;
} else if (context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet()) {
final ProxyConfigurationService configurationService = context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
return configurationService.getConfiguration();
}
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
final software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyConfigBuilder = software.amazon.awssdk.http.apache.ProxyConfiguration.builder()
.endpoint(URI.create(String.format("%s:%s", proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort())));
if (proxyConfig.hasCredential()) {
proxyConfigBuilder.username(proxyConfig.getProxyUserName());
proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
}
builder.proxyConfiguration(proxyConfigBuilder.build());
}
return builder.build();
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.v2;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.processor.ProcessContext;
import software.amazon.awssdk.core.SdkClient;
public class AwsClientCache<T extends SdkClient> {
private final Cache<AwsClientDetails, T> clientCache = Caffeine.newBuilder().build();
public T getOrCreateClient(final ProcessContext context, final AwsClientDetails clientDetails, final AwsClientProvider<T> provider) {
return clientCache.get(clientDetails, ignored -> provider.createClient(context));
}
public void clearCache() {
clientCache.invalidateAll();
clientCache.cleanUp();
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.v2;
import software.amazon.awssdk.regions.Region;
import java.util.Objects;
/**
* This class contains the AWS client details used to distinguish between the various AWS clients stored in the cache.
* The class acts as a cache key for @link AwsClientCache.
* AwsClientDetails contains the region only, since actually the region value may come from the FlowFile attributes.
*/
public class AwsClientDetails {
private Region region;
public AwsClientDetails(Region region) {
this.region = region;
}
public Region getRegion() {
return region;
}
public void setRegion(final Region region) {
this.region = region;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AwsClientDetails that = (AwsClientDetails) o;
return Objects.equals(region, that.region);
}
@Override
public int hashCode() {
return Objects.hash(region);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.v2;
import org.apache.nifi.processor.ProcessContext;
import software.amazon.awssdk.core.SdkClient;
public interface AwsClientProvider<T extends SdkClient> {
/**
* Creates an AWS client using process context and AWS client details.
*
* @param context process context
* @return AWS client
*/
T createClient(final ProcessContext context);
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.v2;
import org.apache.nifi.components.AllowableValue;
import software.amazon.awssdk.regions.Region;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
* Utility class for AWS region methods.
*
*/
public abstract class RegionUtil {
/**
* Creates an AllowableValue from a Region.
* @param region An AWS region
* @return An AllowableValue for the region
*/
public static AllowableValue createAllowableValue(final Region region) {
final String description = region.metadata() != null ? region.metadata().description() : region.id();
return new AllowableValue(region.id(), description, "AWS Region Code : " + region.id());
}
/**
*
* @return All available regions as AllowableValues.
*/
public static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for (final Region region : Region.regions()) {
values.add(createAllowableValue(region));
}
Collections.sort(values, Comparator.comparing(AllowableValue::getDisplayName));
return values.toArray(new AllowableValue[0]);
}
}

View File

@ -16,11 +16,6 @@
*/
package org.apache.nifi.processors.aws.sqs;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -35,11 +30,14 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@SupportsBatching
@SeeAlso({GetSQS.class, PutSQS.class})
@ -75,25 +73,26 @@ public class DeleteSQS extends AbstractSQSProcessor {
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
final AmazonSQSClient client = getClient(context);
final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
request.setQueueUrl(queueUrl);
final SqsClient client = getClient(context);
final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
entry.setReceiptHandle(receiptHandle);
String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
entry.setId(entryId);
entries.add(entry);
request.setEntries(entries);
final String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
final String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
final DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder()
.receiptHandle(receiptHandle)
.id(entryId)
.build();
final DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entry)
.build();
try {
DeleteMessageBatchResult response = client.deleteMessageBatch(request);
DeleteMessageBatchResponse response = client.deleteMessageBatch(request);
// check for errors
if (!response.getFailed().isEmpty()) {
throw new ProcessException(response.getFailed().get(0).toString());
if (!response.failed().isEmpty()) {
throw new ProcessException(response.failed().get(0).toString());
}
getLogger().info("Successfully deleted message from SQS for {}", new Object[] { flowFile });

View File

@ -16,13 +16,7 @@
*/
package org.apache.nifi.processors.aws.sqs;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -38,11 +32,17 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@ -136,28 +136,29 @@ public class GetSQS extends AbstractSQSProcessor {
final String queueUrl = context.getProperty(DYNAMIC_QUEUE_URL).evaluateAttributeExpressions()
.getValue();
final AmazonSQSClient client = getClient(context);
final SqsClient client = getClient(context);
final ReceiveMessageRequest request = new ReceiveMessageRequest();
request.setAttributeNames(Collections.singleton("All"));
request.setMessageAttributeNames(Collections.singleton("All"));
request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
request.setQueueUrl(queueUrl);
request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
final ReceiveMessageRequest request = ReceiveMessageRequest.builder()
.attributeNames(QueueAttributeName.ALL)
.messageAttributeNames("All")
.maxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger())
.visibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue())
.queueUrl(queueUrl)
.waitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue())
.build();
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final ReceiveMessageResult result;
final ReceiveMessageResponse response;
try {
result = client.receiveMessage(request);
response = client.receiveMessage(request);
} catch (final Exception e) {
getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
context.yield();
return;
}
final List<Message> messages = result.getMessages();
final List<Message> messages = response.messages();
if (messages.isEmpty()) {
context.yield();
return;
@ -169,26 +170,21 @@ public class GetSQS extends AbstractSQSProcessor {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
for (final Map.Entry<MessageSystemAttributeName, String> entry : message.attributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue());
}
for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
for (final Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue().stringValue());
}
attributes.put("hash.value", message.getMD5OfBody());
attributes.put("hash.value", message.md5OfBody());
attributes.put("hash.algorithm", "md5");
attributes.put("sqs.message.id", message.getMessageId());
attributes.put("sqs.receipt.handle", message.getReceiptHandle());
attributes.put("sqs.message.id", message.messageId());
attributes.put("sqs.receipt.handle", message.receiptHandle());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(message.getBody().getBytes(charset));
}
});
flowFile = session.write(flowFile, out -> out.write(message.body().getBytes(charset)));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, queueUrl);
@ -203,18 +199,19 @@ public class GetSQS extends AbstractSQSProcessor {
}
}
private void deleteMessages(final AmazonSQSClient client, final String queueUrl, final List<Message> messages) {
final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
deleteRequest.setQueueUrl(queueUrl);
private void deleteMessages(final SqsClient client, final String queueUrl, final List<Message> messages) {
final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
for (final Message message : messages) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setId(message.getMessageId());
entry.setReceiptHandle(message.getReceiptHandle());
final DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder()
.id(message.messageId())
.receiptHandle(message.receiptHandle())
.build();
deleteRequestEntries.add(entry);
}
deleteRequest.setEntries(deleteRequestEntries);
final DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(deleteRequestEntries)
.build();
try {
client.deleteMessageBatch(deleteRequest);

View File

@ -16,17 +16,6 @@
*/
package org.apache.nifi.processors.aws.sqs;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -38,16 +27,25 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@SupportsBatching
@SeeAlso({ GetSQS.class, DeleteSQS.class })
@ -58,6 +56,7 @@ import com.amazonaws.services.sqs.model.SendMessageBatchResult;
description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+ "the Message Attribute and value will become the value of the Message Attribute", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
public class PutSQS extends AbstractSQSProcessor {
private static final String STRING_DATA_TYPE = "String";
public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
.name("Delay")
@ -127,53 +126,43 @@ public class PutSQS extends AbstractSQSProcessor {
}
final long startNanos = System.nanoTime();
final AmazonSQSClient client = getClient(context);
final SendMessageBatchRequest request = new SendMessageBatchRequest();
final SqsClient client = getClient(context);
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
request.setQueueUrl(queueUrl);
final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String flowFileContent = baos.toString();
entry.setMessageBody(flowFileContent);
entry.setId(flowFile.getAttribute("uuid"));
if (context.getProperty(MESSAGEGROUPID).isSet()) {
entry.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
entry.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
for (final PropertyDescriptor descriptor : userDefinedProperties) {
final MessageAttributeValue mav = new MessageAttributeValue();
mav.setDataType("String");
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
final MessageAttributeValue mav = MessageAttributeValue.builder()
.dataType(STRING_DATA_TYPE)
.stringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue())
.build();
messageAttributes.put(descriptor.getName(), mav);
}
entry.setMessageAttributes(messageAttributes);
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
entries.add(entry);
final SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder()
.messageBody(flowFileContent)
.messageGroupId(context.getProperty(MESSAGEGROUPID).evaluateAttributeExpressions(flowFile).getValue())
.messageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID).evaluateAttributeExpressions(flowFile).getValue())
.id(flowFile.getAttribute(CoreAttributes.UUID.key()))
.messageAttributes(messageAttributes)
.delaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue())
.build();
request.setEntries(entries);
final SendMessageBatchRequest request = SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entry)
.build();
try {
SendMessageBatchResult response = client.sendMessageBatch(request);
SendMessageBatchResponse response = client.sendMessageBatch(request);
// check for errors
if (!response.getFailed().isEmpty()) {
throw new ProcessException(response.getFailed().get(0).toString());
if (!response.failed().isEmpty()) {
throw new ProcessException(response.failed().get(0).toString());
}
} catch (final Exception e) {
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});

View File

@ -16,16 +16,18 @@
*/
package org.apache.nifi.processors.aws.sqs;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageResult;
import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import java.io.File;
import java.io.IOException;
@ -39,26 +41,34 @@ public class ITDeleteSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
private final String TEST_QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/123456789012/nifi-test-queue";
private final String TEST_REGION = "us-west-2";
AmazonSQSClient sqsClient = null;
SqsClient sqsClient = null;
@BeforeEach
public void setUp() throws IOException {
PropertiesCredentials credentials = new PropertiesCredentials(new File(CREDENTIALS_FILE));
sqsClient = new AmazonSQSClient(credentials);
sqsClient.withRegion(Regions.fromName(TEST_REGION));
sqsClient = SqsClient.builder()
.region(Region.of(TEST_REGION))
.credentialsProvider(() -> new PropertiesCredentialsProvider(new File(CREDENTIALS_FILE)).resolveCredentials())
.build();
}
@Test
public void testSimpleDelete() {
// Setup - put one message in queue
SendMessageResult sendMessageResult = sqsClient.sendMessage(TEST_QUEUE_URL, "Test message");
assertEquals(200, sendMessageResult.getSdkHttpMetadata().getHttpStatusCode());
final SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl(TEST_QUEUE_URL)
.messageBody("Test message")
.build();
SendMessageResponse sendMessageResult = sqsClient.sendMessage(sendMessageRequest);
assertEquals(200, sendMessageResult.sdkHttpResponse().statusCode());
// Setup - receive message to get receipt handle
ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(TEST_QUEUE_URL);
assertEquals(200, receiveMessageResult.getSdkHttpMetadata().getHttpStatusCode());
Message deleteMessage = receiveMessageResult.getMessages().get(0);
String receiptHandle = deleteMessage.getReceiptHandle();
final ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(TEST_QUEUE_URL)
.build();
ReceiveMessageResponse receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
assertEquals(200, receiveMessageResult.sdkHttpResponse().statusCode());
Message deleteMessage = receiveMessageResult.messages().get(0);
String receiptHandle = deleteMessage.receiptHandle();
// Test - delete message with DeleteSQS
final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS());

View File

@ -16,12 +16,12 @@
*/
package org.apache.nifi.processors.aws.sqs;
import com.amazonaws.regions.Regions;
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;
import java.io.IOException;
import java.nio.file.Paths;
@ -34,7 +34,7 @@ public class ITPutSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
private final String QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000";
private final String REGION = Regions.US_WEST_2.getName();
private final String REGION = Region.US_WEST_2.id();
private final String VPCE_QUEUE_URL = "https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com/123456789012/test-queue";
private final String VPCE_ENDPOINT_OVERRIDE = "https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com";
@ -91,7 +91,7 @@ public class ITPutSQS {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSQS.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
runner.setProperty(PutSQS.REGION, Regions.US_WEST_2.getName());
runner.setProperty(PutSQS.REGION, Region.US_WEST_2.id());
runner.setProperty(PutSQS.QUEUE_URL, VPCE_QUEUE_URL);
runner.setProperty(PutSQS.ENDPOINT_OVERRIDE, VPCE_ENDPOINT_OVERRIDE);

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.processors.aws.sqs;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -27,8 +23,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -39,18 +38,19 @@ public class TestDeleteSQS {
private TestRunner runner = null;
private DeleteSQS mockDeleteSQS = null;
private AmazonSQSClient mockSQSClient = null;
private SqsClient mockSQSClient = null;
@BeforeEach
public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
DeleteMessageBatchResult mockResponse = Mockito.mock(DeleteMessageBatchResult.class);
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any())).thenReturn(mockResponse);
Mockito.when(mockResponse.getFailed()).thenReturn(new ArrayList<>());
mockSQSClient = Mockito.mock(SqsClient.class);
DeleteMessageBatchResponse mockResponse = DeleteMessageBatchResponse.builder()
.failed(Collections.emptyList())
.build();
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class))).thenReturn(mockResponse);
mockDeleteSQS = new DeleteSQS() {
@Override
protected AmazonSQSClient getClient(ProcessContext context) {
protected SqsClient getClient(ProcessContext context) {
return mockSQSClient;
}
};
@ -71,8 +71,8 @@ public class TestDeleteSQS {
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl());
assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.queueUrl());
assertEquals("test-receipt-handle-1", deleteRequest.entries().get(0).receiptHandle());
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
}
@ -92,7 +92,7 @@ public class TestDeleteSQS {
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue();
assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle());
assertEquals("test-receipt-handle-1", deleteRequest.entries().get(0).receiptHandle());
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
}
@ -105,7 +105,7 @@ public class TestDeleteSQS {
ff1Attributes.put("sqs.receipt.handle", "test-receipt-handle-1");
runner.enqueue("TestMessageBody1", ff1Attributes);
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class)))
.thenThrow(new AmazonSQSException("TestFail"));
.thenThrow(new RuntimeException());
runner.assertValid();
runner.run(1);

View File

@ -16,12 +16,6 @@
*/
package org.apache.nifi.processors.aws.sqs;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -30,8 +24,17 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -40,14 +43,14 @@ public class TestGetSQS {
private TestRunner runner = null;
private GetSQS mockGetSQS = null;
private AmazonSQSClient mockSQSClient = null;
private SqsClient mockSQSClient = null;
@BeforeEach
public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
mockSQSClient = Mockito.mock(SqsClient.class);
mockGetSQS = new GetSQS() {
@Override
protected AmazonSQSClient getClient(ProcessContext context) {
protected SqsClient getClient(ProcessContext context) {
return mockSQSClient;
}
};
@ -59,17 +62,24 @@ public class TestGetSQS {
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
runner.setProperty(GetSQS.AUTO_DELETE, "false");
Message message1 = new Message();
message1.setBody("TestMessage1");
message1.addAttributesEntry("attrib-key-1", "attrib-value-1");
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setStringValue("msg-attrib-value-1");
message1.addMessageAttributesEntry("msg-attrib-key-1", messageAttributeValue);
message1.setMD5OfBody("test-md5-hash-1");
message1.setMessageId("test-message-id-1");
message1.setReceiptHandle("test-receipt-handle-1");
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
.withMessages(message1);
final Map<String, String> attributes = new HashMap<>();
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder()
.stringValue("msg-attrib-value-1").build();
messageAttributes.put("msg-attrib-key-1", messageAttributeValue);
attributes.put("attrib-key-1", "attrib-value-1"); // This attribute is no longer valid in SDK v2
attributes.put(MessageSystemAttributeName.MESSAGE_GROUP_ID.toString(), "attrib-value-1"); // However, this one is allowed
Message message1 = Message.builder()
.body("TestMessage1")
.attributesWithStrings(attributes)
.messageAttributes(messageAttributes)
.md5OfBody("test-md5-hash-1")
.messageId("test-message-id-1")
.receiptHandle("test-receipt-handle-1")
.build();
ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder()
.messages(message1)
.build();
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
runner.run(1);
@ -77,13 +87,14 @@ public class TestGetSQS {
ArgumentCaptor<ReceiveMessageRequest> captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture());
ReceiveMessageRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl());
Mockito.verify(mockSQSClient, Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class));
runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1");
ff0.assertAttributeNotExists("sqs.attrib-key-1");
ff0.assertAttributeEquals("sqs.MessageGroupId", "attrib-value-1");
ff0.assertAttributeEquals("sqs.msg-attrib-key-1", "msg-attrib-value-1");
ff0.assertAttributeEquals("hash.value", "test-md5-hash-1");
ff0.assertAttributeEquals("hash.algorithm", "md5");
@ -94,7 +105,7 @@ public class TestGetSQS {
@Test
public void testGetNoMessages() {
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder().build();
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
runner.run(1);
@ -102,7 +113,7 @@ public class TestGetSQS {
ArgumentCaptor<ReceiveMessageRequest> captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture());
ReceiveMessageRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl());
runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0);
}
@ -112,16 +123,19 @@ public class TestGetSQS {
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
runner.setProperty(GetSQS.AUTO_DELETE, "true");
Message message1 = new Message();
message1.setBody("TestMessage1");
message1.setMessageId("test-message-id-1");
message1.setReceiptHandle("test-receipt-handle-1");
Message message2 = new Message();
message2.setBody("TestMessage2");
message2.setMessageId("test-message-id-2");
message2.setReceiptHandle("test-receipt-handle-2");
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
.withMessages(message1, message2);
Message message1 = Message.builder()
.body("TestMessage1")
.messageId("test-message-id-1")
.receiptHandle("test-receipt-handle-1")
.build();
Message message2 = Message.builder()
.body("TestMessage2")
.messageId("test-message-id-2")
.receiptHandle("test-receipt-handle-2")
.build();
ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder()
.messages(message1, message2)
.build();
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
runner.run(1);
@ -129,14 +143,14 @@ public class TestGetSQS {
ArgumentCaptor<ReceiveMessageRequest> captureReceiveRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureReceiveRequest.capture());
ReceiveMessageRequest receiveRequest = captureReceiveRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.getQueueUrl());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.queueUrl());
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl());
assertEquals("test-message-id-1", deleteRequest.getEntries().get(0).getId());
assertEquals("test-message-id-2", deleteRequest.getEntries().get(1).getId());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.queueUrl());
assertEquals("test-message-id-1", deleteRequest.entries().get(0).id());
assertEquals("test-message-id-2", deleteRequest.entries().get(1).id());
runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.processors.aws.sqs;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -27,6 +23,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import java.util.HashMap;
import java.util.Map;
@ -39,14 +38,14 @@ public class TestPutSQS {
private TestRunner runner = null;
private PutSQS mockPutSQS = null;
private AmazonSQSClient mockSQSClient = null;
private SqsClient mockSQSClient = null;
@BeforeEach
public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
mockSQSClient = Mockito.mock(SqsClient.class);
mockPutSQS = new PutSQS() {
@Override
protected AmazonSQSClient getClient(ProcessContext context) {
protected SqsClient getClient(ProcessContext context) {
return mockSQSClient;
}
};
@ -62,7 +61,7 @@ public class TestPutSQS {
attrs.put("filename", "1.txt");
runner.enqueue("TestMessageBody", attrs);
SendMessageBatchResult batchResult = new SendMessageBatchResult();
SendMessageBatchResponse batchResult = SendMessageBatchResponse.builder().build();
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
runner.run(1);
@ -70,9 +69,9 @@ public class TestPutSQS {
ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl());
assertEquals("hello", request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue());
assertEquals("TestMessageBody", request.entries().get(0).messageBody());
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
@ -85,15 +84,15 @@ public class TestPutSQS {
attrs.put("filename", "1.txt");
runner.enqueue("TestMessageBody", attrs);
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new AmazonSQSException("TestFail"));
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new RuntimeException());
runner.run(1);
ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl());
assertEquals("TestMessageBody", request.entries().get(0).messageBody());
runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1);
}
@ -110,7 +109,7 @@ public class TestPutSQS {
attrs.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
runner.enqueue("TestMessageBody", attrs);
SendMessageBatchResult batchResult = new SendMessageBatchResult();
final SendMessageBatchResponse batchResult = SendMessageBatchResponse.builder().build();
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
runner.run(1);
@ -118,11 +117,11 @@ public class TestPutSQS {
ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody());
assertEquals("test1234", request.getEntries().get(0).getMessageGroupId());
assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getEntries().get(0).getMessageDeduplicationId());
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.queueUrl());
assertEquals("hello", request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue());
assertEquals("TestMessageBody", request.entries().get(0).messageBody());
assertEquals("test1234", request.entries().get(0).messageGroupId());
assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.entries().get(0).messageDeduplicationId());
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}

View File

@ -32,6 +32,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>