NIFI-8662: Using the configured AWS region when parsing from VPCE endpoint URL fails in AbstractAWSProcessor

This closes #5140.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
Peter Turcsanyi 2021-06-07 10:21:14 +02:00 committed by Tamas Palfy
parent 90f44d9d62
commit 820b2cff29
2 changed files with 49 additions and 24 deletions

View File

@ -154,6 +154,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
protected volatile ClientType client; protected volatile ClientType client;
protected volatile Region region; protected volatile Region region;
private static final String VPCE_ENDPOINT_SUFFIX = ".vpce.amazonaws.com";
private static final Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+[vpce-][a-z0-9-]+\\.)?([a-z0-9-]+)$"); private static final Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+[vpce-][a-z0-9-]+\\.)?([a-z0-9-]+)$");
// If protocol is changed to be a property, ensure other uses are also changed // If protocol is changed to be a property, ensure other uses are also changed
@ -318,10 +319,15 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
if (!urlstr.isEmpty()) { if (!urlstr.isEmpty()) {
getLogger().info("Overriding endpoint with {}", urlstr); getLogger().info("Overriding endpoint with {}", urlstr);
if (urlstr.endsWith(".vpce.amazonaws.com")) { if (urlstr.endsWith(VPCE_ENDPOINT_SUFFIX)) {
String region = parseRegionForVPCE(urlstr); // handling vpce endpoints
// falling back to the configured region if the parse fails
// e.g. in case of https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
String region = parseRegionForVPCE(urlstr, this.region.getName());
this.client.setEndpoint(urlstr, this.client.getServiceName(), region); this.client.setEndpoint(urlstr, this.client.getServiceName(), region);
} else { } else {
// handling non-vpce custom endpoints where the AWS library can parse the region out
// e.g. https://sqs.{region}.***.***.***.gov
this.client.setEndpoint(urlstr); this.client.setEndpoint(urlstr);
} }
} }
@ -335,18 +341,18 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
is an AWS PrivateLink so this method does the job of parsing the region name and is an AWS PrivateLink so this method does the job of parsing the region name and
returning it. returning it.
Refer NIFI-5456 & NIFI-5893 Refer NIFI-5456, NIFI-5893 & NIFI-8662
*/ */
private String parseRegionForVPCE(String url) { private String parseRegionForVPCE(String url, String configuredRegion) {
int index = url.length() - ".vpce.amazonaws.com".length(); int index = url.length() - VPCE_ENDPOINT_SUFFIX.length();
Matcher matcher = VPCE_ENDPOINT_PATTERN.matcher(url.substring(0, index)); Matcher matcher = VPCE_ENDPOINT_PATTERN.matcher(url.substring(0, index));
if (matcher.matches()) { if (matcher.matches()) {
return matcher.group(1); return matcher.group(1);
} else { } else {
getLogger().warn("Unable to get a match with the VPCE endpoint pattern; defaulting the region to us-east-1..."); getLogger().info("Unable to get a match with the VPCE endpoint pattern; using the configured region: " + configuredRegion);
return "us-east-1"; return configuredRegion;
} }
} }

View File

@ -19,30 +19,32 @@ package org.apache.nifi.processors.aws.sqs;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.processors.aws.AbstractAWSProcessor; import com.amazonaws.regions.Regions;
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") @Ignore("For local testing only - interacts with SQS so the credentials file must be configured and all necessary queues created")
public class ITPutSQS { public class ITPutSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
private final String QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"; private final String QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000";
private final String REGION = Regions.US_WEST_2.getName();
private final String VPCE_QUEUE_URL = "https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com/123456789012/test-queue";
private final String VPCE_ENDPOINT_OVERRIDE = "https://vpce-1234567890abcdefg-12345678.sqs.us-west-2.vpce.amazonaws.com";
@Test @Test
public void testSimplePut() throws IOException { public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutSQS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSQS.TIMEOUT, "30 secs"); runner.setProperty(PutSQS.REGION, REGION);
runner.setProperty(PutSQS.QUEUE_URL, QUEUE_URL); runner.setProperty(PutSQS.QUEUE_URL, QUEUE_URL);
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
@ -58,28 +60,45 @@ public class ITPutSQS {
public void testSimplePutUsingCredentialsProviderService() throws Throwable { public void testSimplePutUsingCredentialsProviderService() throws Throwable {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSQS.TIMEOUT, "30 secs"); runner.setProperty(PutSQS.REGION, REGION);
runner.setProperty(PutSQS.QUEUE_URL, QUEUE_URL); runner.setProperty(PutSQS.QUEUE_URL, QUEUE_URL);
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
runner.addControllerService("awsCredentialsProvider", serviceImpl); runner.addControllerService("awsCredentialsProvider", serviceImpl);
runner.setProperty(serviceImpl, CredentialPropertyDescriptors.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.enableControllerService(serviceImpl); runner.enableControllerService(serviceImpl);
runner.assertValid(serviceImpl); runner.assertValid(serviceImpl);
runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.assertValid();
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt"); attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.run(1); runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
} }
@Test
public void testVpceEndpoint() throws IOException {
// additional AWS environment setup for testing VPCE endpoints:
// - create an Interface Endpoint in your VPC for SQS (https://docs.aws.amazon.com/vpc/latest/privatelink/vpce-interface.html#create-interface-endpoint)
// - create a Client VPN Endpoint in your VPC (https://docs.aws.amazon.com/vpn/latest/clientvpn-admin/cvpn-getting-started.html)
// and connect your local machine (running the test) to your VPC via VPN
// - alternatively, the test can be run on an EC2 instance located on the VPC
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSQS.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
runner.setProperty(PutSQS.REGION, Regions.US_WEST_2.getName());
runner.setProperty(PutSQS.QUEUE_URL, VPCE_QUEUE_URL);
runner.setProperty(PutSQS.ENDPOINT_OVERRIDE, VPCE_ENDPOINT_OVERRIDE);
runner.enqueue(Paths.get("src/test/resources/hello.txt"));
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
} }
} }