NIFI-4199: Added ProxyConfigurationService

- Added ProxyConfigurationService to manage centralized proxy
configurations
- Adopt ProxyConfigurationService at FTP and HTTP processors

NIFI-4175 - Add HTTP proxy support to *SFTP processors

This closes #2018.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>

NIFI-4199: Add ProxyConfigurationService to SFTP processors

- Fixed check style issue
- Use the same proxy related PropertyDescriptors from FTPTransfer and
SFTPTransfer
- Dropped FlowFile EL evaluation support to make it align with other
processors spec, Now it supports VARIABLE_REGISTRY
- Added ProxyConfigurationService to SFTP processors
- Added SOCKS proxy support to SFTP processors

NIFI-4199: Added ProxyConfigurationService to ElasticsearchHttp processors

- ElasticsearchHttp processors now support SOCKS proxy, too
- Added proxy support to PutElasticsearchHttpRecord
- Moved more common property descriptors to
AbstractElasticsearchHttpProcessor and just return static unmodifiable
property descriptor list at each implementation processors

NIFI-4196 - Expose AWS proxy authentication settings

NIFI-4196 - Fix jUnit errors

This closes #2016.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>

NIFI-4199: Add ProxyConfigService to AWS processors

- Applied ProxyConfigService to S3 processors
- Added proxy support to following processors:
  - PutKinesisFirehose, PutKinesisStream
  - PutDynamoDB, DeleteDynamoDB, GetDynamoDB
  - PutKinesisStream
- All AWS processors support HTTP proxy now

NIFI-4199: Add ProxyConfigService to Azure processors

NIFI-4199: More explicit validation and docs for Proxy spec

- Each processor has different supporting Proxy specs
- Show supported spec to ProxyConfigurationService property doc
- Validate not only Proxy type, but also with Authentication

NIFI-4199: Incorporated review comments

- Fixed TestListS3 property descriptor check
- Separate name and displayName

This closes #2016
This closes #2018
This closes #2704

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Koji Kawamura 2018-05-15 14:52:39 +09:00 committed by Mike Thomsen
parent d79216d6b0
commit 2834fa4ce4
64 changed files with 1326 additions and 195 deletions

View File

@ -703,6 +703,12 @@ language governing permissions and limitations under the License. -->
<version>1.7.0-SNAPSHOT</version> <version>1.7.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-nar</artifactId>
<version>1.7.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
<profile> <profile>

View File

@ -76,6 +76,10 @@
<artifactId>nifi-utils</artifactId> <artifactId>nifi-utils</artifactId>
<version>1.7.0-SNAPSHOT</version> <version>1.7.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -28,6 +28,7 @@ import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions; import com.amazonaws.regions.Regions;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.Proxy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -53,6 +54,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors; import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.aws.regions.AWSRegions; import org.apache.nifi.processors.aws.regions.AWSRegions;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
/** /**
@ -93,6 +96,25 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("proxy-user-name")
.displayName("Proxy Username")
.description("Proxy username")
.expressionLanguageSupported(true)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("proxy-user-password")
.displayName("Proxy Password")
.description("Proxy password")
.expressionLanguageSupported(true)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region") .name("Region")
.required(true) .required(true)
@ -131,6 +153,9 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS; protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
protected static final String DEFAULT_USER_AGENT = "NiFi"; protected static final String DEFAULT_USER_AGENT = "NiFi";
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
private static AllowableValue createAllowableValue(final Regions region) { private static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), AWSRegions.getRegionDisplayName(region.getName())); return new AllowableValue(region.getName(), AWSRegions.getRegionDisplayName(region.getName()));
} }
@ -169,6 +194,8 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build()); problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build());
} }
ProxyConfiguration.validateProxySpec(validationContext, problems, PROXY_SPECS);
return problems; return problems;
} }
@ -193,11 +220,31 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
} }
} }
if (context.getProperty(PROXY_HOST).isSet()) { final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); if (context.getProperty(PROXY_HOST).isSet()) {
config.setProxyHost(proxyHost); final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger(); String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
config.setProxyPort(proxyPort); Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
componentProxyConfig.setProxyServerHost(proxyHost);
componentProxyConfig.setProxyServerPort(proxyPort);
componentProxyConfig.setProxyUserName(proxyUsername);
componentProxyConfig.setProxyUserPassword(proxyPassword);
return componentProxyConfig;
}
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
config.setProxyHost(proxyConfig.getProxyServerHost());
config.setProxyPort(proxyConfig.getProxyServerPort());
if (proxyConfig.hasCredential()) {
config.setProxyUsername(proxyConfig.getProxyUserName());
config.setProxyPassword(proxyConfig.getProxyUserPassword());
}
} }
return config; return config;

View File

@ -174,7 +174,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
Collections.unmodifiableList( Collections.unmodifiableList(
Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, MAXIMUM, MINIMUM, SAMPLECOUNT, SUM, TIMESTAMP, Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, MAXIMUM, MINIMUM, SAMPLECOUNT, SUM, TIMESTAMP,
UNIT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, UNIT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT) TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)
); );
private volatile Set<String> dynamicPropertyNames = new HashSet<>(); private volatile Set<String> dynamicPropertyNames = new HashSet<>();

View File

@ -74,7 +74,8 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE)); CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -80,7 +80,8 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE)); CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
.description("FlowFiles are routed to not found relationship if key not found in the table").build(); .description("FlowFiles are routed to not found relationship if key not found in the table").build();

View File

@ -84,7 +84,8 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE,
REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE)); REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
/** /**
* Dyamodb max item size limit 400 kb * Dyamodb max item size limit 400 kb

View File

@ -72,7 +72,7 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE)); PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
/** /**
* Max buffer size 1 MB * Max buffer size 1 MB

View File

@ -80,7 +80,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE)); AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
/** A random number generator for cases where partition key is not available */ /** A random number generator for cases where partition key is not available */
protected Random randomParitionKeyGenerator = new Random(); protected Random randomParitionKeyGenerator = new Random();

View File

@ -127,8 +127,8 @@ public class PutLambda extends AbstractAWSLambdaProcessor {
public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000; public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
)); PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -59,7 +59,7 @@ public class DeleteS3Object extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID, Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)); SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -76,7 +76,7 @@ public class FetchS3Object extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)); SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -137,7 +137,8 @@ public class ListS3 extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
public static final Set<Relationship> relationships = Collections.unmodifiableSet( public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS))); new HashSet<>(Collections.singletonList(REL_SUCCESS)));

View File

@ -209,7 +209,7 @@ public class PutS3Object extends AbstractS3Processor {
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE,
SERVER_SIDE_ENCRYPTION, PROXY_HOST, PROXY_HOST_PORT)); SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key"; final static String S3_OBJECT_KEY = "s3.key";

View File

@ -77,7 +77,7 @@ public class PutSNS extends AbstractSNSProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT)); USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
public static final int MAX_SIZE = 256 * 1024; public static final int MAX_SIZE = 256 * 1024;

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -141,7 +142,7 @@ public class TestDeleteS3Object {
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() throws Exception {
DeleteS3Object processor = new DeleteS3Object(); DeleteS3Object processor = new DeleteS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 20, pd.size()); assertEquals("size should be eq", 23, pd.size());
assertTrue(pd.contains(processor.ACCESS_KEY)); assertTrue(pd.contains(processor.ACCESS_KEY));
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(processor.BUCKET)); assertTrue(pd.contains(processor.BUCKET));
@ -160,5 +161,10 @@ public class TestDeleteS3Object {
assertTrue(pd.contains(processor.VERSION_ID)); assertTrue(pd.contains(processor.VERSION_ID));
assertTrue(pd.contains(processor.WRITE_ACL_LIST)); assertTrue(pd.contains(processor.WRITE_ACL_LIST));
assertTrue(pd.contains(processor.WRITE_USER_LIST)); assertTrue(pd.contains(processor.WRITE_USER_LIST));
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
assertTrue(pd.contains(processor.PROXY_HOST));
assertTrue(pd.contains(processor.PROXY_HOST_PORT));
assertTrue(pd.contains(processor.PROXY_USERNAME));
assertTrue(pd.contains(processor.PROXY_PASSWORD));
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -178,7 +179,7 @@ public class TestFetchS3Object {
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object(); FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 14, pd.size()); assertEquals("size should be eq", 17, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET)); assertTrue(pd.contains(FetchS3Object.BUCKET));
@ -191,5 +192,11 @@ public class TestFetchS3Object {
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(FetchS3Object.TIMEOUT)); assertTrue(pd.contains(FetchS3Object.TIMEOUT));
assertTrue(pd.contains(FetchS3Object.VERSION_ID)); assertTrue(pd.contains(FetchS3Object.VERSION_ID));
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
assertTrue(pd.contains(FetchS3Object.PROXY_HOST));
assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD));
} }
} }

View File

@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -294,7 +295,7 @@ public class TestListS3 {
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3(); ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 17, pd.size()); assertEquals("size should be eq", 20, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET)); assertTrue(pd.contains(ListS3.BUCKET));
@ -305,11 +306,15 @@ public class TestListS3 {
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE)); assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE)); assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(ListS3.TIMEOUT)); assertTrue(pd.contains(ListS3.TIMEOUT));
assertTrue(pd.contains(ListS3.PROXY_HOST));
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX)); assertTrue(pd.contains(ListS3.PREFIX));
assertTrue(pd.contains(ListS3.USE_VERSIONS)); assertTrue(pd.contains(ListS3.USE_VERSIONS));
assertTrue(pd.contains(ListS3.LIST_TYPE));
assertTrue(pd.contains(ListS3.MIN_AGE)); assertTrue(pd.contains(ListS3.MIN_AGE));
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
assertTrue(pd.contains(ListS3.PROXY_HOST));
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.PROXY_USERNAME));
assertTrue(pd.contains(ListS3.PROXY_PASSWORD));
} }
} }

View File

@ -25,6 +25,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -150,7 +151,7 @@ public class TestPutS3Object {
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() throws Exception {
PutS3Object processor = new PutS3Object(); PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 28, pd.size()); assertEquals("size should be eq", 31, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET)); assertTrue(pd.contains(PutS3Object.BUCKET));
@ -172,6 +173,11 @@ public class TestPutS3Object {
assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
assertTrue(pd.contains(PutS3Object.PROXY_HOST));
assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
assertTrue(pd.contains(PutS3Object.PROXY_USERNAME));
assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD));
} }
} }

View File

@ -40,6 +40,10 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId> <artifactId>nifi-record</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.microsoft.azure</groupId> <groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId> <artifactId>azure-eventhubs</artifactId>

View File

@ -59,7 +59,8 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.ACCOUNT_KEY,
BLOB)); BLOB,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList( new HashSet<>(Arrays.asList(
@ -73,7 +74,9 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
@Override @Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return AzureStorageUtils.validateCredentialProperties(validationContext); final Collection<ValidationResult> results = AzureStorageUtils.validateCredentialProperties(validationContext);
AzureStorageUtils.validateProxySpec(validationContext, results);
return results;
} }
@Override @Override

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.azure.storage; package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobClient;
@ -58,7 +59,10 @@ public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlobContainer container = blobClient.getContainerReference(containerName);
CloudBlob blob = container.getBlockBlobReference(blobPath); CloudBlob blob = container.getBlockBlobReference(blobPath);
blob.deleteIfExists();
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
blob.deleteIfExists(null, null, null, operationContext);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

View File

@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import com.microsoft.azure.storage.OperationContext;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -69,6 +70,9 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlobContainer container = blobClient.getContainerReference(containerName);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
final CloudBlob blob = container.getBlockBlobReference(blobPath); final CloudBlob blob = container.getBlockBlobReference(blobPath);
@ -76,7 +80,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
// distribution of download over threads, investigate // distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> { flowFile = session.write(flowFile, os -> {
try { try {
blob.download(os); blob.download(os, null, null, operationContext);
} catch (StorageException e) { } catch (StorageException e) {
storedException.set(e); storedException.set(e);
throw new IOException(e); throw new IOException(e);

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.azure.storage; package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageUri; import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.BlobProperties;
@ -93,7 +94,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.ACCOUNT_KEY,
PROP_PREFIX)); PROP_PREFIX,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -102,7 +104,9 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
@Override @Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return AzureStorageUtils.validateCredentialProperties(validationContext); final Collection<ValidationResult> results = AzureStorageUtils.validateCredentialProperties(validationContext);
AzureStorageUtils.validateProxySpec(validationContext, results);
return results;
} }
@Override @Override
@ -162,7 +166,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null); CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlobContainer container = blobClient.getContainerReference(containerName);
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
if (blob instanceof CloudBlob) { if (blob instanceof CloudBlob) {
CloudBlob cloudBlob = (CloudBlob) blob; CloudBlob cloudBlob = (CloudBlob) blob;
BlobProperties properties = cloudBlob.getProperties(); BlobProperties properties = cloudBlob.getProperties();

View File

@ -26,6 +26,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import com.microsoft.azure.storage.OperationContext;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -77,6 +78,9 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
CloudBlob blob = container.getBlockBlobReference(blobPath); CloudBlob blob = container.getBlockBlobReference(blobPath);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
long length = flowFile.getSize(); long length = flowFile.getSize();
session.read(flowFile, rawIn -> { session.read(flowFile, rawIn -> {
@ -87,7 +91,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
} }
try { try {
blob.upload(in, length); blob.upload(in, length, null, null, operationContext);
BlobProperties properties = blob.getProperties(); BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName); attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.azure.storage.queue; package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue; import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient; import com.microsoft.azure.storage.queue.CloudQueueClient;
@ -94,7 +95,7 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList( private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
BATCH_SIZE, VISIBILITY_TIMEOUT)); BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override @Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -122,7 +123,11 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
try { try {
cloudQueueClient = createCloudQueueClient(context, null); cloudQueueClient = createCloudQueueClient(context, null);
cloudQueue = cloudQueueClient.getQueueReference(queue); cloudQueue = cloudQueueClient.getQueueReference(queue);
retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, operationContext);
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e}); getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
context.yield(); context.yield();
@ -184,6 +189,8 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
.build()); .build());
} }
AzureStorageUtils.validateProxySpec(validationContext, problems);
return problems; return problems;
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.azure.storage.queue; package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue; import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient; import com.microsoft.azure.storage.queue.CloudQueueClient;
@ -70,7 +71,7 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList( private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL,
QUEUE, VISIBILITY_DELAY)); QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override @Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -101,7 +102,11 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
try { try {
cloudQueueClient = createCloudQueueClient(context, flowFile); cloudQueueClient = createCloudQueueClient(context, flowFile);
cloudQueue = cloudQueueClient.getQueueReference(queue); cloudQueue = cloudQueueClient.getQueueReference(queue);
cloudQueue.addMessage(message, ttl, delay, null, null);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
cloudQueue.addMessage(message, ttl, delay, null, operationContext);
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e}); getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
flowFile = session.penalize(flowFile); flowFile = session.penalize(flowFile);
@ -147,6 +152,8 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
} }
} }
AzureStorageUtils.validateProxySpec(validationContext, problems);
return problems; return problems;
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.azure.storage.utils; package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobClient;
@ -29,6 +30,8 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@ -162,4 +165,17 @@ public final class AzureStorageUtils {
return results; return results;
} }
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
public static void validateProxySpec(ValidationContext context, Collection<ValidationResult> results) {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
public static void setProxy(final OperationContext operationContext, final ProcessContext processContext) {
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext);
operationContext.setProxy(proxyConfig.createProxy());
}
} }

View File

@ -56,6 +56,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId> <artifactId>nifi-record</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>

View File

@ -33,13 +33,14 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy; import java.net.Proxy;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
@ -136,25 +137,26 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.build(); .build();
} }
private static final List<PropertyDescriptor> propertyDescriptors; private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
static final List<PropertyDescriptor> COMMON_PROPERTY_DESCRIPTORS;
static { static {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ES_URL); properties.add(ES_URL);
properties.add(PROP_SSL_CONTEXT_SERVICE);
properties.add(USERNAME);
properties.add(PASSWORD);
properties.add(CONNECT_TIMEOUT);
properties.add(RESPONSE_TIMEOUT);
properties.add(PROXY_CONFIGURATION_SERVICE);
properties.add(PROXY_HOST); properties.add(PROXY_HOST);
properties.add(PROXY_PORT); properties.add(PROXY_PORT);
properties.add(PROXY_USERNAME); properties.add(PROXY_USERNAME);
properties.add(PROXY_PASSWORD); properties.add(PROXY_PASSWORD);
properties.add(RESPONSE_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(properties); COMMON_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.addAll(propertyDescriptors);
return properties;
} }
@Override @Override
@ -164,27 +166,38 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder(); OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
// Add a proxy if set // Add a proxy if set
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger(); final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
if (proxyHost != null && proxyPort != null) { final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); if (proxyHost != null && proxyPort != null) {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
componentProxyConfig.setProxyServerHost(proxyHost);
componentProxyConfig.setProxyServerPort(proxyPort);
componentProxyConfig.setProxyUserName(context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue());
componentProxyConfig.setProxyUserPassword(context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
return componentProxyConfig;
}
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
if (!Proxy.Type.DIRECT.equals(proxyConfig.getProxyType())) {
final Proxy proxy = proxyConfig.createProxy();
okHttpClient.proxy(proxy); okHttpClient.proxy(proxy);
if (proxyConfig.hasCredential()){
okHttpClient.proxyAuthenticator(new Authenticator() {
@Override
public Request authenticate(Route route, Response response) throws IOException {
final String credential=Credentials.basic(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
return response.request().newBuilder()
.header("Proxy-Authorization", credential)
.build();
}
});
}
} }
final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
if (proxyUsername != null && proxyPassword != null){
okHttpClient.proxyAuthenticator(new Authenticator() {
@Override
public Request authenticate(Route route, Response response) throws IOException {
final String credential=Credentials.basic(proxyUsername, proxyPassword);
return response.request().newBuilder()
.header("Proxy-Authorization", credential)
.build();
}
});
}
// Set timeouts // Set timeouts
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
@ -208,8 +221,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
results.add(new ValidationResult.Builder() results.add(new ValidationResult.Builder()
.valid(false) .valid(false)
.explanation("Proxy Host and Proxy Port must be both set or empty") .explanation("Proxy Host and Proxy Port must be both set or empty")
.subject("Proxy server configuration")
.build()); .build());
} }
ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS);
return results; return results;
} }

View File

@ -147,13 +147,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
_rels.add(REL_NOT_FOUND); _rels.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(_rels); relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(); final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(DOC_ID); descriptors.add(DOC_ID);
descriptors.add(INDEX); descriptors.add(INDEX);
descriptors.add(TYPE); descriptors.add(TYPE);
@ -169,9 +163,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); return propertyDescriptors;
properties.addAll(propertyDescriptors);
return properties;
} }
@OnScheduled @OnScheduled

View File

@ -150,13 +150,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
_rels.add(REL_RETRY); _rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels); relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(); final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(ID_ATTRIBUTE); descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX); descriptors.add(INDEX);
descriptors.add(TYPE); descriptors.add(TYPE);
@ -174,9 +168,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); return propertyDescriptors;
properties.addAll(propertyDescriptors);
return properties;
} }
@Override @Override

View File

@ -189,13 +189,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
_rels.add(REL_RETRY); _rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels); relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(); final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(RECORD_READER); descriptors.add(RECORD_READER);
descriptors.add(ID_RECORD_PATH); descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX); descriptors.add(INDEX);

View File

@ -230,13 +230,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER; private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER;
static { static {
final List<PropertyDescriptor> descriptors = new ArrayList<>(); final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(QUERY); descriptors.add(QUERY);
descriptors.add(PAGE_SIZE); descriptors.add(PAGE_SIZE);
descriptors.add(INDEX); descriptors.add(INDEX);
@ -257,9 +251,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); return propertyDescriptors;
properties.addAll(propertyDescriptors);
return properties;
} }
@OnScheduled @OnScheduled

View File

@ -182,13 +182,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
_rels.add(REL_FAILURE); _rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_rels); relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(); final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(QUERY); descriptors.add(QUERY);
descriptors.add(SCROLL_DURATION); descriptors.add(SCROLL_DURATION);
descriptors.add(PAGE_SIZE); descriptors.add(PAGE_SIZE);
@ -207,9 +201,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); return propertyDescriptors;
properties.addAll(propertyDescriptors);
return properties;
} }
@OnScheduled @OnScheduled

View File

@ -64,6 +64,10 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId> <artifactId>nifi-record-serialization-service-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId> <artifactId>nifi-record</artifactId>

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -27,6 +28,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer;
@ -63,6 +66,7 @@ public class FetchFTP extends FetchFileTransfer {
properties.add(FTPTransfer.USE_COMPRESSION); properties.add(FTPTransfer.USE_COMPRESSION);
properties.add(FTPTransfer.CONNECTION_MODE); properties.add(FTPTransfer.CONNECTION_MODE);
properties.add(FTPTransfer.TRANSFER_MODE); properties.add(FTPTransfer.TRANSFER_MODE);
properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE); properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST); properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
@ -76,4 +80,11 @@ public class FetchFTP extends FetchFileTransfer {
protected FileTransfer createFileTransfer(final ProcessContext context) { protected FileTransfer createFileTransfer(final ProcessContext context) {
return new FTPTransfer(context, getLogger()); return new FTPTransfer(context, getLogger());
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
FTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
@ -28,7 +29,10 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
@ -76,6 +80,12 @@ public class FetchSFTP extends FetchFileTransfer {
properties.add(SFTPTransfer.HOST_KEY_FILE); properties.add(SFTPTransfer.HOST_KEY_FILE);
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.USE_COMPRESSION); properties.add(SFTPTransfer.USE_COMPRESSION);
properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
return properties; return properties;
} }
@ -83,4 +93,11 @@ public class FetchSFTP extends FetchFileTransfer {
protected FileTransfer createFileTransfer(final ProcessContext context) { protected FileTransfer createFileTransfer(final ProcessContext context) {
return new SFTPTransfer(context, getLogger()); return new SFTPTransfer(context, getLogger());
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
SFTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -28,6 +29,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer;
@ -76,6 +79,7 @@ public class GetFTP extends GetFileTransfer {
properties.add(FTPTransfer.MAX_SELECTS); properties.add(FTPTransfer.MAX_SELECTS);
properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE); properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
properties.add(FTPTransfer.USE_NATURAL_ORDERING); properties.add(FTPTransfer.USE_NATURAL_ORDERING);
properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE); properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST); properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
@ -95,4 +99,11 @@ public class GetFTP extends GetFileTransfer {
protected FileTransfer getFileTransfer(final ProcessContext context) { protected FileTransfer getFileTransfer(final ProcessContext context) {
return new FTPTransfer(context, getLogger()); return new FTPTransfer(context, getLogger());
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
FTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -42,7 +42,6 @@ import java.util.regex.Pattern;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header; import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.auth.UsernamePasswordCredentials;
@ -93,6 +92,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
@ -100,6 +100,9 @@ import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"}) @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Fetches data from an HTTP or HTTPS URL and writes the data to the content of a FlowFile. Once the content has been fetched, the ETag and Last Modified " @CapabilityDescription("Fetches data from an HTTP or HTTPS URL and writes the data to the content of a FlowFile. Once the content has been fetched, the ETag and Last Modified "
@ -195,18 +198,6 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
.required(false) .required(false)
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port")
.description("The port of the proxy server")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final String DEFAULT_COOKIE_POLICY_STR = "default"; public static final String DEFAULT_COOKIE_POLICY_STR = "default";
public static final String STANDARD_COOKIE_POLICY_STR = "standard"; public static final String STANDARD_COOKIE_POLICY_STR = "standard";
@ -268,6 +259,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
properties.add(ACCEPT_CONTENT_TYPE); properties.add(ACCEPT_CONTENT_TYPE);
properties.add(FOLLOW_REDIRECTS); properties.add(FOLLOW_REDIRECTS);
properties.add(REDIRECT_COOKIE_POLICY); properties.add(REDIRECT_COOKIE_POLICY);
properties.add(HTTPUtils.PROXY_CONFIGURATION_SERVICE);
properties.add(PROXY_HOST); properties.add(PROXY_HOST);
properties.add(PROXY_PORT); properties.add(PROXY_PORT);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
@ -315,13 +307,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
.build()); .build());
} }
if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) { HTTPUtils.validateProxyProperties(context, results);
results.add(new ValidationResult.Builder()
.explanation("Proxy Host was set but no Proxy Port was specified")
.valid(false)
.subject("Proxy server configuration")
.build());
}
return results; return results;
} }
@ -456,22 +442,18 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
final String password = context.getProperty(PASSWORD).getValue(); final String password = context.getProperty(PASSWORD).getValue();
// set the credentials if appropriate // set the credentials if appropriate
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
if (username != null) { if (username != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (password == null) { if (password == null) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
} else { } else {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
} }
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
} }
// Set the proxy if specified // Set the proxy if specified
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) { HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
final String host = context.getProperty(PROXY_HOST).getValue();
final int port = context.getProperty(PROXY_PORT).asInteger();
clientBuilder.setProxy(new HttpHost(host, port));
}
// create request // create request
final HttpGet get = new HttpGet(url); final HttpGet get = new HttpGet(url);

View File

@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
@ -81,6 +82,12 @@ public class GetSFTP extends GetFileTransfer {
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(SFTPTransfer.USE_COMPRESSION); properties.add(SFTPTransfer.USE_COMPRESSION);
properties.add(SFTPTransfer.USE_NATURAL_ORDERING); properties.add(SFTPTransfer.USE_NATURAL_ORDERING);
properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -102,6 +109,8 @@ public class GetSFTP extends GetFileTransfer {
.build()); .build());
} }
SFTPTransfer.validateProxySpec(context, results);
return results; return results;
} }

View File

@ -59,6 +59,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ProxyAuthenticator; import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth; import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
@ -79,7 +81,6 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy; import java.net.Proxy;
import java.net.Proxy.Type; import java.net.Proxy.Type;
import java.net.URL; import java.net.URL;
@ -412,6 +413,10 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build(); .build();
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD, PROP_METHOD,
PROP_URL, PROP_URL,
@ -423,6 +428,7 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_ATTRIBUTES_TO_SEND, PROP_ATTRIBUTES_TO_SEND,
PROP_BASIC_AUTH_USERNAME, PROP_BASIC_AUTH_USERNAME,
PROP_BASIC_AUTH_PASSWORD, PROP_BASIC_AUTH_PASSWORD,
PROXY_CONFIGURATION_SERVICE,
PROP_PROXY_HOST, PROP_PROXY_HOST,
PROP_PROXY_PORT, PROP_PROXY_PORT,
PROP_PROXY_TYPE, PROP_PROXY_TYPE,
@ -565,6 +571,8 @@ public final class InvokeHTTP extends AbstractProcessor {
results.add(new ValidationResult.Builder().subject("SSL Context Service").valid(false).explanation("If Proxy Type is HTTPS, SSL Context Service must be set").build()); results.add(new ValidationResult.Builder().subject("SSL Context Service").valid(false).explanation("If Proxy Type is HTTPS, SSL Context Service must be set").build());
} }
ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS);
return results; return results;
} }
@ -575,14 +583,30 @@ public final class InvokeHTTP extends AbstractProcessor {
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder(); OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
// Add a proxy if set // Add a proxy if set
final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue(); boolean isHttpsProxy = HTTPS.equals(context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue());
final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger(); final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
final String proxyType = context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue(); final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
boolean isHttpsProxy = false; final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
if (proxyHost != null && proxyPort != null) { final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger();
final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); if (proxyHost != null && proxyPort != null) {
componentProxyConfig.setProxyType(Type.HTTP);
componentProxyConfig.setProxyServerHost(proxyHost);
componentProxyConfig.setProxyServerPort(proxyPort);
final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue());
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
componentProxyConfig.setProxyUserName(proxyUsername);
componentProxyConfig.setProxyUserPassword(proxyPassword);
}
return componentProxyConfig;
});
final Proxy proxy = proxyConfig.createProxy();
if (!Type.DIRECT.equals(proxy.type())) {
okHttpClientBuilder.proxy(proxy); okHttpClientBuilder.proxy(proxy);
isHttpsProxy = HTTPS.equals(proxyType); if (proxyConfig.hasCredential()) {
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
}
} }
// configure ETag cache if enabled // configure ETag cache if enabled
@ -691,7 +715,6 @@ public final class InvokeHTTP extends AbstractProcessor {
private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessContext context) { private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessContext context) {
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue());
// If the username/password properties are set then check if digest auth is being used // If the username/password properties are set then check if digest auth is being used
if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
@ -706,23 +729,8 @@ public final class InvokeHTTP extends AbstractProcessor {
com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass); com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials); final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
if(!proxyUsername.isEmpty()) {
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
}
okHttpClientBuilder.interceptors().add(new AuthenticationCacheInterceptor(authCache)); okHttpClientBuilder.interceptors().add(new AuthenticationCacheInterceptor(authCache));
okHttpClientBuilder.authenticator(new CachingAuthenticatorDecorator(digestAuthenticator, authCache)); okHttpClientBuilder.authenticator(new CachingAuthenticatorDecorator(digestAuthenticator, authCache));
} else {
// Add proxy authentication only
if(!proxyUsername.isEmpty()) {
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
}
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
@ -29,6 +30,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
@ -79,6 +82,7 @@ public class ListFTP extends ListFileTransfer {
properties.add(FTPTransfer.DATA_TIMEOUT); properties.add(FTPTransfer.DATA_TIMEOUT);
properties.add(FTPTransfer.CONNECTION_MODE); properties.add(FTPTransfer.CONNECTION_MODE);
properties.add(FTPTransfer.TRANSFER_MODE); properties.add(FTPTransfer.TRANSFER_MODE);
properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE); properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST); properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
@ -105,4 +109,11 @@ public class ListFTP extends ListFileTransfer {
// pick up where it left off, even if the Primary Node changes. // pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER; return Scope.CLUSTER;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
FTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
@ -30,8 +31,11 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
@ -83,6 +87,12 @@ public class ListSFTP extends ListFileTransfer {
properties.add(SFTPTransfer.DATA_TIMEOUT); properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
return properties; return properties;
} }
@ -102,4 +112,11 @@ public class ListSFTP extends ListFileTransfer {
// pick up where it left off, even if the Primary Node changes. // pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER; return Scope.CLUSTER;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
SFTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.Header; import org.apache.http.Header;
import org.apache.http.HttpException; import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor; import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
@ -75,6 +74,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
@ -127,6 +127,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"http", "https", "remote", "copy", "archive"}) @Tags({"http", "https", "remote", "copy", "archive"})
@ -243,18 +246,6 @@ public class PostHTTP extends AbstractProcessor {
.required(false) .required(false)
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port")
.description("The port of the proxy server")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type") .name("Content-Type")
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. " .description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
@ -302,6 +293,7 @@ public class PostHTTP extends AbstractProcessor {
properties.add(DATA_TIMEOUT); properties.add(DATA_TIMEOUT);
properties.add(ATTRIBUTES_AS_HEADERS_REGEX); properties.add(ATTRIBUTES_AS_HEADERS_REGEX);
properties.add(USER_AGENT); properties.add(USER_AGENT);
properties.add(HTTPUtils.PROXY_CONFIGURATION_SERVICE);
properties.add(PROXY_HOST); properties.add(PROXY_HOST);
properties.add(PROXY_PORT); properties.add(PROXY_PORT);
properties.add(CONTENT_TYPE); properties.add(CONTENT_TYPE);
@ -328,14 +320,6 @@ public class PostHTTP extends AbstractProcessor {
.valid(false).subject("SSL Context").build()); .valid(false).subject("SSL Context").build());
} }
if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
results.add(new ValidationResult.Builder()
.explanation("Proxy Host was set but no Proxy Port was specified")
.valid(false)
.subject("Proxy server configuration")
.build());
}
boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean(); boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet(); boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet();
@ -345,6 +329,8 @@ public class PostHTTP extends AbstractProcessor {
.explanation("if compression level is 0 and not sending as a FlowFile, then the \'" + CHUNKED_ENCODING.getName() + "\' property must be set").build()); .explanation("if compression level is 0 and not sending as a FlowFile, then the \'" + CHUNKED_ENCODING.getName() + "\' property must be set").build());
} }
HTTPUtils.validateProxyProperties(context, results);
return results; return results;
} }
@ -535,22 +521,18 @@ public class PostHTTP extends AbstractProcessor {
final String username = context.getProperty(USERNAME).getValue(); final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue(); final String password = context.getProperty(PASSWORD).getValue();
// set the credentials if appropriate // set the credentials if appropriate
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
if (username != null) { if (username != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (password == null) { if (password == null) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
} else { } else {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
} }
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
} }
// Set the proxy if specified // Set the proxy if specified
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) { HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
final String host = context.getProperty(PROXY_HOST).getValue();
final int port = context.getProperty(PROXY_PORT).asInteger();
clientBuilder.setProxy(new HttpHost(host, port));
}
client = clientBuilder.build(); client = clientBuilder.build();

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -36,6 +37,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
@ -89,6 +92,7 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> {
properties.add(FTPTransfer.LAST_MODIFIED_TIME); properties.add(FTPTransfer.LAST_MODIFIED_TIME);
properties.add(FTPTransfer.PERMISSIONS); properties.add(FTPTransfer.PERMISSIONS);
properties.add(FTPTransfer.USE_COMPRESSION); properties.add(FTPTransfer.USE_COMPRESSION);
properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE); properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST); properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
@ -163,4 +167,11 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> {
return cmds; return cmds;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
FTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -28,8 +29,11 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SupportsBatching @SupportsBatching
@ -70,6 +74,12 @@ public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(SFTPTransfer.USE_COMPRESSION); properties.add(SFTPTransfer.USE_COMPRESSION);
properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -83,4 +93,10 @@ public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
return new SFTPTransfer(context, getLogger()); return new SFTPTransfer(context, getLogger());
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
SFTPTransfer.validateProxySpec(validationContext, results);
return results;
}
} }

View File

@ -28,10 +28,12 @@ import java.nio.file.Paths;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPClient;
@ -39,6 +41,9 @@ import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPHTTPClient; import org.apache.commons.net.ftp.FTPHTTPClient;
import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPReply;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
@ -46,6 +51,8 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
public class FTPTransfer implements FileTransfer { public class FTPTransfer implements FileTransfer {
@ -88,22 +95,26 @@ public class FTPTransfer implements FileTransfer {
.name("Proxy Host") .name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server") .description("The fully qualified hostname or IP address of the proxy server")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port") .name("Proxy Port")
.description("The port of the proxy server") .description("The port of the proxy server")
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("Http Proxy Username") .name("Http Proxy Username")
.description("Http Proxy Username") .description("Http Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false) .required(false)
.build(); .build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder() public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("Http Proxy Password") .name("Http Proxy Password")
.description("Http Proxy Password") .description("Http Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false) .required(false)
.sensitive(true) .sensitive(true)
.build(); .build();
@ -123,6 +134,10 @@ public class FTPTransfer implements FileTransfer {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build(); .build();
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
private final ComponentLog logger; private final ComponentLog logger;
private final ProcessContext ctx; private final ProcessContext ctx;
@ -136,6 +151,10 @@ public class FTPTransfer implements FileTransfer {
this.logger = logger; this.logger = logger;
} }
public static void validateProxySpec(ValidationContext context, Collection<ValidationResult> results) {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
@Override @Override
public String getProtocolName() { public String getProtocolName() {
return "ftp"; return "ftp";
@ -522,12 +541,15 @@ public class FTPTransfer implements FileTransfer {
} }
} }
final Proxy.Type proxyType = Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue()); final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
final String proxyHost = ctx.getProperty(PROXY_HOST).getValue();
final Integer proxyPort = ctx.getProperty(PROXY_PORT).asInteger(); final Proxy.Type proxyType = proxyConfig.getProxyType();
final String proxyHost = proxyConfig.getProxyServerHost();
final Integer proxyPort = proxyConfig.getProxyServerPort();
FTPClient client; FTPClient client;
if (proxyType == Proxy.Type.HTTP) { if (proxyType == Proxy.Type.HTTP) {
client = new FTPHTTPClient(proxyHost, proxyPort, ctx.getProperty(HTTP_PROXY_USERNAME).getValue(), ctx.getProperty(HTTP_PROXY_PASSWORD).getValue()); client = new FTPHTTPClient(proxyHost, proxyPort, proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
} else { } else {
client = new FTPClient(); client = new FTPClient();
if (proxyType == Proxy.Type.SOCKS) { if (proxyType == Proxy.Type.SOCKS) {
@ -627,4 +649,17 @@ public class FTPTransfer implements FileTransfer {
} }
return number; return number;
} }
public static Supplier<ProxyConfiguration> createComponentProxyConfigSupplier(final PropertyContext ctx) {
return () -> {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
componentProxyConfig.setProxyType(Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue()));
componentProxyConfig.setProxyServerHost(ctx.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue());
componentProxyConfig.setProxyServerPort(ctx.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger());
componentProxyConfig.setProxyUserName(ctx.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue());
componentProxyConfig.setProxyUserPassword(ctx.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
return componentProxyConfig;
};
}
} }

View File

@ -16,6 +16,21 @@
*/ */
package org.apache.nifi.processors.standard.util; package org.apache.nifi.processors.standard.util;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import java.net.Proxy;
import java.util.Collection;
import java.util.Map; import java.util.Map;
public class HTTPUtils { public class HTTPUtils {
@ -39,4 +54,63 @@ public class HTTPUtils {
} }
} }
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port")
.description("The port of the proxy server")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
public static void setProxy(final ProcessContext context, final HttpClientBuilder clientBuilder, final CredentialsProvider credentialsProvider) {
// Set the proxy if specified
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
final String host = context.getProperty(PROXY_HOST).getValue();
final int port = context.getProperty(PROXY_PORT).asInteger();
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
componentProxyConfig.setProxyServerHost(host);
componentProxyConfig.setProxyServerPort(port);
return componentProxyConfig;
}
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
final String host = proxyConfig.getProxyServerHost();
final int port = proxyConfig.getProxyServerPort();
clientBuilder.setProxy(new HttpHost(host, port));
if (proxyConfig.hasCredential()) {
final AuthScope proxyAuthScope = new AuthScope(host, port);
final UsernamePasswordCredentials proxyCredential
= new UsernamePasswordCredentials(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
credentialsProvider.setCredentials(proxyAuthScope, proxyCredential);
}
}
}
public static void validateProxyProperties(ValidationContext context, Collection<ValidationResult> results) {
if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
results.add(new ValidationResult.Builder()
.explanation("Proxy Host was set but no Proxy Port was specified")
.valid(false)
.subject("Proxy server configuration")
.build());
}
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
} }

View File

@ -25,6 +25,7 @@ import java.nio.file.Paths;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -33,13 +34,18 @@ import java.util.Vector;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import com.jcraft.jsch.ProxySOCKS5;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.ChannelSftp;
@ -47,9 +53,12 @@ import com.jcraft.jsch.ChannelSftp.LsEntry;
import com.jcraft.jsch.ChannelSftp.LsEntrySelector; import com.jcraft.jsch.ChannelSftp.LsEntrySelector;
import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException; import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.ProxyHTTP;
import com.jcraft.jsch.Session; import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException; import com.jcraft.jsch.SftpException;
import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
public class SFTPTransfer implements FileTransfer { public class SFTPTransfer implements FileTransfer {
public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
@ -96,6 +105,7 @@ public class SFTPTransfer implements FileTransfer {
.required(true) .required(true)
.build(); .build();
/** /**
* Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling * Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
* {@link ChannelSftp#mkdir(String)}. In most cases, the code should call ls before mkdir, but some weird permission setups (chmod 100) on a directory would cause the 'ls' to throw a permission * {@link ChannelSftp#mkdir(String)}. In most cases, the code should call ls before mkdir, but some weird permission setups (chmod 100) on a directory would cause the 'ls' to throw a permission
@ -116,6 +126,10 @@ public class SFTPTransfer implements FileTransfer {
.defaultValue("false") .defaultValue("false")
.build(); .build();
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS_AUTH};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
private final ComponentLog logger; private final ComponentLog logger;
private final ProcessContext ctx; private final ProcessContext ctx;
@ -134,6 +148,10 @@ public class SFTPTransfer implements FileTransfer {
disableDirectoryListing = disableListing == null ? false : Boolean.TRUE.equals(disableListing.asBoolean()); disableDirectoryListing = disableListing == null ? false : Boolean.TRUE.equals(disableListing.asBoolean());
} }
public static void validateProxySpec(ValidationContext context, Collection<ValidationResult> results) {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
@Override @Override
public String getProtocolName() { public String getProtocolName() {
return "sftp"; return "sftp";
@ -418,6 +436,26 @@ public class SFTPTransfer implements FileTransfer {
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(), ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue()); ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
switch (proxyConfig.getProxyType()) {
case HTTP:
final ProxyHTTP proxyHTTP = new ProxyHTTP(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
// Check if Username is set and populate the proxy accordingly
if (proxyConfig.hasCredential()) {
proxyHTTP.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
}
session.setProxy(proxyHTTP);
break;
case SOCKS:
final ProxySOCKS5 proxySOCKS5 = new ProxySOCKS5(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
if (proxyConfig.hasCredential()) {
proxySOCKS5.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
}
session.setProxy(proxySOCKS5);
break;
}
final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue(); final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
if (hostKeyVal != null) { if (hostKeyVal != null) {
jsch.setKnownHosts(hostKeyVal); jsch.setKnownHosts(hostKeyVal);

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-standard-services</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-proxy-configuration-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.7.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.proxy;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.apache.nifi.proxy.ProxySpec.HTTP;
import static org.apache.nifi.proxy.ProxySpec.HTTP_AUTH;
import static org.apache.nifi.proxy.ProxySpec.SOCKS;
import static org.apache.nifi.proxy.ProxySpec.SOCKS_AUTH;
public class ProxyConfiguration {
public static final ProxyConfiguration DIRECT_CONFIGURATION = new ProxyConfiguration();
public static PropertyDescriptor createProxyConfigPropertyDescriptor(final boolean hasComponentProxyConfigs, final ProxySpec ... _specs) {
final Set<ProxySpec> specs = getUniqueProxySpecs(_specs);
final StringBuilder description = new StringBuilder("Specifies the Proxy Configuration Controller Service to proxy network requests.");
if (hasComponentProxyConfigs) {
description.append(" If set, it supersedes proxy settings configured per component.");
}
description.append(" Supported proxies: ");
description.append(specs.stream().map(ProxySpec::getDisplayName).collect(Collectors.joining(", ")));
return new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
.description(description.toString())
.build();
}
/**
* Remove redundancy. If X_AUTH is supported, then X should be supported, too.
* @param _specs original specs
* @return sorted unique specs
*/
private static Set<ProxySpec> getUniqueProxySpecs(ProxySpec ... _specs) {
final Set<ProxySpec> specs = Arrays.stream(_specs).sorted().collect(Collectors.toSet());
if (specs.contains(HTTP_AUTH)) {
specs.remove(HTTP);
}
if (specs.contains(SOCKS_AUTH)) {
specs.remove(SOCKS);
}
return specs;
}
/**
* This method can be used from customValidate method of components using this Controller Service
* to validate the service is configured with the supported proxy types.
* @param context the validation context
* @param results if validation fails, an invalid validation result will be added to this collection
* @param _specs specify supported proxy specs
*/
public static void validateProxySpec(ValidationContext context, Collection<ValidationResult> results, final ProxySpec ... _specs) {
final Set<ProxySpec> specs = getUniqueProxySpecs(_specs);
final Set<Proxy.Type> supportedProxyTypes = specs.stream().map(ProxySpec::getProxyType).collect(Collectors.toSet());
if (!context.getProperty(PROXY_CONFIGURATION_SERVICE).isSet()) {
return;
}
final ProxyConfigurationService proxyService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
final ProxyConfiguration proxyConfiguration = proxyService.getConfiguration();
final Proxy.Type proxyType = proxyConfiguration.getProxyType();
if (proxyType.equals(Proxy.Type.DIRECT)) {
return;
}
if (!supportedProxyTypes.contains(proxyType)) {
results.add(new ValidationResult.Builder()
.explanation(String.format("Proxy type %s is not supported.", proxyType))
.valid(false)
.subject(PROXY_CONFIGURATION_SERVICE.getDisplayName())
.build());
// If the proxy type is not supported, no need to do further validation.
return;
}
if (proxyConfiguration.hasCredential()) {
// If credential is set, check whether the component is capable to use it.
if (!specs.contains(Proxy.Type.HTTP.equals(proxyType) ? HTTP_AUTH : SOCKS_AUTH)) {
results.add(new ValidationResult.Builder()
.explanation(String.format("Proxy type %s with Authentication is not supported.", proxyType))
.valid(false)
.subject(PROXY_CONFIGURATION_SERVICE.getDisplayName())
.build());
}
}
}
/**
* A convenient method to get ProxyConfiguration instance from a PropertyContext.
* @param context the process context
* @return The proxy configurations at Controller Service if set, or DIRECT_CONFIGURATION
*/
public static ProxyConfiguration getConfiguration(PropertyContext context) {
return getConfiguration(context, () -> DIRECT_CONFIGURATION);
}
/**
* This method can be used by Components those originally have per component proxy configurations
* to implement ProxyConfiguration Controller Service with backward compatibility.
* @param context the process context
* @param perComponentSetting the function to supply ProxyConfiguration based on per component settings,
* only called when Proxy Configuration Service is not set
* @return The proxy configurations at Controller Service if set, or per component settings otherwise
*/
public static ProxyConfiguration getConfiguration(PropertyContext context, Supplier<ProxyConfiguration> perComponentSetting) {
if (context.getProperty(PROXY_CONFIGURATION_SERVICE).isSet()) {
final ProxyConfigurationService proxyService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
return proxyService.getConfiguration();
} else {
return perComponentSetting.get();
}
}
private Proxy.Type proxyType = Proxy.Type.DIRECT;
private String proxyServerHost;
private Integer proxyServerPort;
private String proxyUserName;
private String proxyUserPassword;
public Proxy.Type getProxyType() {
return proxyType;
}
public void setProxyType(Proxy.Type proxyType) {
this.proxyType = proxyType;
}
public String getProxyServerHost() {
return proxyServerHost;
}
public void setProxyServerHost(String proxyServerHost) {
this.proxyServerHost = proxyServerHost;
}
public Integer getProxyServerPort() {
return proxyServerPort;
}
public void setProxyServerPort(Integer proxyServerPort) {
this.proxyServerPort = proxyServerPort;
}
public boolean hasCredential() {
return proxyUserName != null && !proxyUserName.isEmpty();
}
public String getProxyUserName() {
return proxyUserName;
}
public void setProxyUserName(String proxyUserName) {
this.proxyUserName = proxyUserName;
}
public String getProxyUserPassword() {
return proxyUserPassword;
}
public void setProxyUserPassword(String proxyUserPassword) {
this.proxyUserPassword = proxyUserPassword;
}
/**
* Create a Proxy instance based on proxy type, proxy server host and port.
*/
public Proxy createProxy() {
return Proxy.Type.DIRECT.equals(proxyType) ? Proxy.NO_PROXY : new Proxy(proxyType, new InetSocketAddress(proxyServerHost, proxyServerPort));
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.proxy;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
/**
* Provides configurations to access a Proxy server.
*/
public interface ProxyConfigurationService extends ControllerService {
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new PropertyDescriptor.Builder()
.name("proxy-configuration-service")
.displayName("Proxy Configuration Service")
.description("Specifies the Proxy Configuration Controller Service to proxy network requests." +
" If set, it supersedes proxy settings configured per component.")
.identifiesControllerService(ProxyConfigurationService.class)
.required(false)
.build();
/**
* Returns proxy configurations.
* Implementations should return a non-null ProxyConfiguration instance which returns DIRECT proxy type instead of returning null,
* when underlying configuration or initialization is not done yet.
* @return A ProxyConfiguration instance.
*/
ProxyConfiguration getConfiguration();
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.proxy;
import java.net.Proxy;
public enum ProxySpec {
HTTP(Proxy.Type.HTTP, "HTTP"),
HTTP_AUTH(Proxy.Type.HTTP, "HTTP + AuthN"),
SOCKS(Proxy.Type.SOCKS, "SOCKS"),
SOCKS_AUTH(Proxy.Type.SOCKS, "SOCKS + AuthN");
private Proxy.Type proxyType;
private String displayName;
ProxySpec(Proxy.Type type, String displayName) {
this.proxyType = type;
this.displayName = displayName;
}
public Proxy.Type getProxyType() {
return proxyType;
}
public String getDisplayName() {
return displayName;
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.proxy;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.net.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.apache.nifi.proxy.ProxyConfiguration.DIRECT_CONFIGURATION;
import static org.apache.nifi.proxy.ProxyConfiguration.createProxyConfigPropertyDescriptor;
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.apache.nifi.proxy.ProxySpec.HTTP;
import static org.apache.nifi.proxy.ProxySpec.HTTP_AUTH;
import static org.apache.nifi.proxy.ProxySpec.SOCKS;
import static org.apache.nifi.proxy.ProxySpec.SOCKS_AUTH;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestProxyConfiguration {
private static class ComponentUsingProxy extends AbstractProcessor {
private ProxySpec[] proxySpecs;
private void setProxySpecs(ProxySpec ... proxySpecs) {
this.proxySpecs = proxySpecs;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(createProxyConfigPropertyDescriptor(true, proxySpecs));
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
ProxyConfiguration.validateProxySpec(validationContext, results, proxySpecs);
return results;
}
}
private static final ProxyConfiguration HTTP_CONFIG = new ProxyConfiguration();
private static final ProxyConfiguration SOCKS_CONFIG = new ProxyConfiguration();
private static final ProxyConfiguration HTTP_AUTH_CONFIG = new ProxyConfiguration();
private static final ProxyConfiguration SOCKS_AUTH_CONFIG = new ProxyConfiguration();
static {
HTTP_CONFIG.setProxyType(Proxy.Type.HTTP);
HTTP_AUTH_CONFIG.setProxyType(Proxy.Type.HTTP);
HTTP_AUTH_CONFIG.setProxyUserName("proxy-user");
HTTP_AUTH_CONFIG.setProxyUserPassword("proxy-password");
SOCKS_CONFIG.setProxyType(Proxy.Type.SOCKS);
SOCKS_AUTH_CONFIG.setProxyType(Proxy.Type.SOCKS);
SOCKS_AUTH_CONFIG.setProxyUserName("proxy-user");
SOCKS_AUTH_CONFIG.setProxyUserPassword("proxy-password");
}
private void testValidateProxySpec(final boolean[] expectations, ProxySpec ... specs) throws InitializationException {
final String serviceId = "proxyConfigurationService";
final ProxyConfigurationService service = mock(ProxyConfigurationService.class);
when(service.getIdentifier()).thenReturn(serviceId);
when(service.getConfiguration()).thenReturn(DIRECT_CONFIGURATION, HTTP_CONFIG, HTTP_AUTH_CONFIG, SOCKS_CONFIG, SOCKS_AUTH_CONFIG);
final ComponentUsingProxy processor = new ComponentUsingProxy();
processor.setProxySpecs(specs);
final TestRunner testRunner = TestRunners.newTestRunner(processor);
testRunner.addControllerService(serviceId, service);
testRunner.enableControllerService(service);
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
for (boolean expectation : expectations) {
if (expectation) {
testRunner.assertValid();
} else {
testRunner.assertNotValid();
}
}
}
@Test
public void testHTTP() throws Exception {
// DEFAULT, HTTP
testValidateProxySpec(new boolean[] {true, true, false, false, false}, HTTP);
}
@Test
public void testHTTPAuth() throws Exception {
// DEFAULT, HTTP, HTTP_AUTH
testValidateProxySpec(new boolean[] {true, true, true, false, false}, HTTP_AUTH);
}
@Test
public void testHTTP_HTTPAuth() throws Exception {
// DEFAULT, HTTP, HTTP_AUTH
testValidateProxySpec(new boolean[] {true, true, true, false, false}, HTTP, HTTP_AUTH);
}
@Test
public void testSOCKS() throws Exception {
// DEFAULT, SOCKS
testValidateProxySpec(new boolean[] {true, false, false, true, false}, SOCKS);
}
@Test
public void testSOCKSAuth() throws Exception {
// DEFAULT, SOCKS, SOCKS_AUTH
testValidateProxySpec(new boolean[] {true, false, false, true, true}, SOCKS_AUTH);
}
@Test
public void testSOCKS_SOCKSAuth() throws Exception {
// DEFAULT, SOCKS, SOCKS_AUTH
testValidateProxySpec(new boolean[] {true, false, false, true, true}, SOCKS, SOCKS_AUTH);
}
@Test
public void testHTTPAuth_SOCKS() throws Exception {
// DEFAULT, HTTP, HTTP_AUTH, SOCKS
testValidateProxySpec(new boolean[] {true, true, true, true, false}, HTTP_AUTH, SOCKS);
}
@Test
public void testHTTPAuth_SOCKSAuth() throws Exception {
// DEFAULT, HTTP, HTTP_AUTH, SOCKS, SOCKS_AUTH
testValidateProxySpec(new boolean[] {true, true, true, true, true}, HTTP_AUTH, SOCKS_AUTH);
}
}

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-proxy-configuration-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-proxy-configuration-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.7.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-proxy-configuration-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-proxy-configuration</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-utils</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.proxy;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import java.net.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@CapabilityDescription("Provides a set of configurations for different NiFi components to use a proxy server.")
@Tags({"Proxy"})
public class StandardProxyConfigurationService extends AbstractControllerService implements ProxyConfigurationService {
static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
.name("proxy-type")
.displayName("Proxy Type")
.description("Proxy type.")
.allowableValues(Proxy.Type.values())
.defaultValue(Proxy.Type.DIRECT.name())
.required(true)
.build();
static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder()
.name("proxy-server-host")
.displayName("Proxy Server Host")
.description("Proxy server hostname or ip-address.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder()
.name("proxy-server-port")
.displayName("Proxy Server Port")
.description("Proxy server port number.")
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder()
.name("proxy-user-name")
.displayName("Proxy User Name")
.description("The name of the proxy client for user authentication.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder()
.name("proxy-user-password")
.displayName("Proxy User Password")
.description("The password of the proxy client for user authentication.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(true)
.build();
private volatile ProxyConfiguration configuration = ProxyConfiguration.DIRECT_CONFIGURATION;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PROXY_TYPE);
properties.add(PROXY_SERVER_HOST);
properties.add(PROXY_SERVER_PORT);
properties.add(PROXY_USER_NAME);
properties.add(PROXY_USER_PASSWORD);
return properties;
}
@OnEnabled
public void setConfiguredValues(final ConfigurationContext context) {
configuration = new ProxyConfiguration();
configuration.setProxyType(Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue()));
configuration.setProxyServerHost(context.getProperty(PROXY_SERVER_HOST).evaluateAttributeExpressions().getValue());
configuration.setProxyServerPort(context.getProperty(PROXY_SERVER_PORT).evaluateAttributeExpressions().asInteger());
configuration.setProxyUserName(context.getProperty(PROXY_USER_NAME).evaluateAttributeExpressions().getValue());
configuration.setProxyUserPassword(context.getProperty(PROXY_USER_PASSWORD).evaluateAttributeExpressions().getValue());
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Proxy.Type proxyType = Proxy.Type.valueOf(validationContext.getProperty(PROXY_TYPE).getValue());
if (Proxy.Type.DIRECT.equals(proxyType)) {
return Collections.emptyList();
}
final List<ValidationResult> results = new ArrayList<>();
if (!validationContext.getProperty(PROXY_SERVER_HOST).isSet()) {
results.add(new ValidationResult.Builder().subject(PROXY_SERVER_HOST.getDisplayName())
.explanation("required").valid(false).build());
}
if (!validationContext.getProperty(PROXY_SERVER_PORT).isSet()) {
results.add(new ValidationResult.Builder().subject(PROXY_SERVER_PORT.getDisplayName())
.explanation("required").valid(false).build());
}
return results;
}
@Override
public ProxyConfiguration getConfiguration() {
return configuration;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.proxy.StandardProxyConfigurationService

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-standard-services</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-proxy-configuration-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-proxy-configuration</module>
<module>nifi-proxy-configuration-nar</module>
</modules>
</project>

View File

@ -83,5 +83,10 @@
<artifactId>nifi-kerberos-credentials-service-api</artifactId> <artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -43,5 +43,7 @@
<module>nifi-hwx-schema-registry-bundle</module> <module>nifi-hwx-schema-registry-bundle</module>
<module>nifi-kerberos-credentials-service-api</module> <module>nifi-kerberos-credentials-service-api</module>
<module>nifi-kerberos-credentials-service-bundle</module> <module>nifi-kerberos-credentials-service-bundle</module>
<module>nifi-proxy-configuration-api</module>
<module>nifi-proxy-configuration-bundle</module>
</modules> </modules>
</project> </project>

View File

@ -239,6 +239,11 @@
<artifactId>nifi-http-context-map</artifactId> <artifactId>nifi-http-context-map</artifactId>
<version>1.7.0-SNAPSHOT</version> <version>1.7.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-volatile-provenance-repository</artifactId> <artifactId>nifi-volatile-provenance-repository</artifactId>