Adding support for using an HTTP proxy with AWS processors.

This closes #209

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
mans2singh 2016-02-08 11:43:03 -08:00 committed by Aldrin Piri
parent ef0018cf66
commit 36f1caad33
9 changed files with 110 additions and 7 deletions

View File

@ -77,6 +77,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.required(false) .required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder() public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
.name("Access Key") .name("Access Key")
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
@ -84,6 +85,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true) .sensitive(true)
.build(); .build();
public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder() public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
.name("Secret Key") .name("Secret Key")
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
@ -91,6 +93,23 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true) .sensitive(true)
.build(); .build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("Proxy host name or IP")
.expressionLanguageSupported(true)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
.name("Proxy Host Port")
.description("Proxy host port")
.expressionLanguageSupported(true)
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region") .name("Region")
.required(true) .required(true)
@ -161,6 +180,12 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build()); problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
} }
final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
final boolean proxyHostPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
if ( ((!proxyHostSet) && proxyHostPortSet) || (proxyHostSet && (!proxyHostPortSet)) ) {
problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build());
}
return problems; return problems;
} }
@ -182,6 +207,13 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory); config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
} }
if (context.getProperty(PROXY_HOST).isSet()) {
String proxyHost = context.getProperty(PROXY_HOST).getValue();
config.setProxyHost(proxyHost);
Integer proxyPort = context.getProperty(PROXY_HOST_PORT).asInteger();
config.setProxyPort(proxyPort);
}
return config; return config;
} }

View File

@ -58,7 +58,7 @@ public class DeleteS3Object extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID, Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -73,7 +73,8 @@ public class FetchS3Object extends AbstractS3Processor {
.build(); .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -179,7 +179,7 @@ public class PutS3Object extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE)); ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, PROXY_HOST, PROXY_HOST_PORT));
final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key"; final static String S3_OBJECT_KEY = "s3.key";

View File

@ -76,7 +76,7 @@ public class PutSNS extends AbstractSNSProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
USE_JSON_STRUCTURE, CHARACTER_ENCODING)); USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT));
public static final int MAX_SIZE = 256 * 1024; public static final int MAX_SIZE = 256 * 1024;

View File

@ -54,7 +54,7 @@ public class DeleteSQS extends AbstractSQSProcessor {
.build(); .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT)); Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -115,7 +115,7 @@ public class GetSQS extends AbstractSQSProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME)); AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME, PROXY_HOST, PROXY_HOST_PORT));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -65,7 +65,7 @@ public class PutSQS extends AbstractSQSProcessor {
.build(); .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, DELAY, TIMEOUT)); Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, DELAY, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList(); private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.credentials.provider.service;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.s3.FetchS3Object;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AWSProcessorProxyTest {
private TestRunner runner;
@Before
public void testSetup() throws Throwable {
runner = TestRunners.newTestRunner(FetchS3Object.class);
runner.setProperty(FetchS3Object.BUCKET, "bucket");
runner.assertValid();
}
@After
public void testTearDown() throws Throwable {
runner = null;
}
@SuppressWarnings("deprecation")
@Test
public void testProxyHostOnlyInvalid() throws Throwable {
runner.setProperty(AbstractAWSProcessor.PROXY_HOST, "proxyHost");
runner.assertNotValid();
}
@SuppressWarnings("deprecation")
@Test
public void testProxyHostPortOnlyInvalid() throws Throwable {
runner.setProperty(AbstractAWSProcessor.PROXY_HOST_PORT, "1");
runner.assertNotValid();
}
@SuppressWarnings("deprecation")
@Test
public void testProxyHostPortNonNumberInvalid() throws Throwable {
runner.setProperty(AbstractAWSProcessor.PROXY_HOST_PORT, "a");
runner.assertNotValid();
}
@SuppressWarnings("deprecation")
@Test
public void testProxyHostAndPortValid() throws Throwable {
runner.setProperty(AbstractAWSProcessor.PROXY_HOST_PORT, "1");
runner.setProperty(AbstractAWSProcessor.PROXY_HOST, "proxyHost");
runner.assertValid();
}
}