NIFI-12782 Migrated GCS processors' Proxy properties to ProxyConfigurationService

Extracted proxy service migration code into a common util module because the same logic was already used in AWS module,
and it is also reusable in other components for proxy property migration.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8400.
This commit is contained in:
Peter Turcsanyi 2024-02-12 23:38:46 +01:00 committed by Pierre Villard
parent d9bcc8b496
commit d370b470b8
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
10 changed files with 229 additions and 146 deletions

View File

@ -142,6 +142,11 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId> <artifactId>nifi-proxy-configuration-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-migration-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>com.github.ben-manes.caffeine</groupId> <groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId> <artifactId>caffeine</artifactId>

View File

@ -37,6 +37,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
@ -49,9 +50,7 @@ import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.net.Proxy; import java.net.Proxy;
import java.net.Proxy.Type;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -69,7 +68,6 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor implements VerifiableProcessor { public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor implements VerifiableProcessor {
private static final String CREDENTIALS_SERVICE_CLASSNAME = "org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService"; private static final String CREDENTIALS_SERVICE_CLASSNAME = "org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService";
private static final String PROXY_SERVICE_CLASSNAME = "org.apache.nifi.proxy.StandardProxyConfigurationService";
// Obsolete property names // Obsolete property names
private static final String OBSOLETE_ACCESS_KEY = "Access Key"; private static final String OBSOLETE_ACCESS_KEY = "Access Key";
@ -85,11 +83,6 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
private static final String AUTH_SERVICE_SECRET_KEY = "Secret Key"; private static final String AUTH_SERVICE_SECRET_KEY = "Secret Key";
private static final String AUTH_SERVICE_CREDENTIALS_FILE = "Credentials File"; private static final String AUTH_SERVICE_CREDENTIALS_FILE = "Credentials File";
private static final String AUTH_SERVICE_ANONYMOUS_CREDENTIALS = "anonymous-credentials"; private static final String AUTH_SERVICE_ANONYMOUS_CREDENTIALS = "anonymous-credentials";
private static final String PROXY_SERVICE_HOST = "proxy-server-host";
private static final String PROXY_SERVICE_PORT = "proxy-server-port";
private static final String PROXY_SERVICE_USERNAME = "proxy-user-name";
private static final String PROXY_SERVICE_PASSWORD = "proxy-user-password";
private static final String PROXY_SERVICE_TYPE = "proxy-type";
// Property Descriptors // Property Descriptors
@ -173,7 +166,7 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
@Override @Override
public void migrateProperties(final PropertyConfiguration config) { public void migrateProperties(final PropertyConfiguration config) {
migrateAuthenticationProperties(config); migrateAuthenticationProperties(config);
migrateProxyProperties(config); ProxyServiceMigration.migrateProxyProperties(config, PROXY_CONFIGURATION_SERVICE, OBSOLETE_PROXY_HOST, OBSOLETE_PROXY_PORT, OBSOLETE_PROXY_USERNAME, OBSOLETE_PROXY_PASSWORD);
} }
private void migrateAuthenticationProperties(final PropertyConfiguration config) { private void migrateAuthenticationProperties(final PropertyConfiguration config) {
@ -201,27 +194,6 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
config.removeProperty(OBSOLETE_CREDENTIALS_FILE); config.removeProperty(OBSOLETE_CREDENTIALS_FILE);
} }
private void migrateProxyProperties(final PropertyConfiguration config) {
if (config.isPropertySet(OBSOLETE_PROXY_HOST)) {
final Map<String, String> proxyProperties = new HashMap<>();
proxyProperties.put(PROXY_SERVICE_TYPE, Type.HTTP.name());
proxyProperties.put(PROXY_SERVICE_HOST, config.getRawPropertyValue(OBSOLETE_PROXY_HOST).get());
// Map any optional proxy configs
config.getRawPropertyValue(OBSOLETE_PROXY_PORT).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_PORT, value));
config.getRawPropertyValue(OBSOLETE_PROXY_USERNAME).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_USERNAME, value));
config.getRawPropertyValue(OBSOLETE_PROXY_PASSWORD).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_PASSWORD, value));
final String serviceId = config.createControllerService(PROXY_SERVICE_CLASSNAME, proxyProperties);
config.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
config.removeProperty(OBSOLETE_PROXY_HOST);
config.removeProperty(OBSOLETE_PROXY_PORT);
config.removeProperty(OBSOLETE_PROXY_USERNAME);
config.removeProperty(OBSOLETE_PROXY_PASSWORD);
}
protected ClientConfiguration createConfiguration(final ProcessContext context) { protected ClientConfiguration createConfiguration(final ProcessContext context) {
return createConfiguration(context, context.getMaxConcurrentTasks()); return createConfiguration(context, context.getMaxConcurrentTasks());
} }

View File

@ -27,6 +27,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -51,12 +52,10 @@ import software.amazon.awssdk.regions.Region;
import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManager;
import java.net.Proxy; import java.net.Proxy;
import java.net.Proxy.Type;
import java.net.URI; import java.net.URI;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -71,7 +70,6 @@ import java.util.concurrent.TimeUnit;
*/ */
public abstract class AbstractAwsProcessor<T extends SdkClient> extends AbstractSessionFactoryProcessor implements VerifiableProcessor { public abstract class AbstractAwsProcessor<T extends SdkClient> extends AbstractSessionFactoryProcessor implements VerifiableProcessor {
private static final String CREDENTIALS_SERVICE_CLASSNAME = "org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService"; private static final String CREDENTIALS_SERVICE_CLASSNAME = "org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService";
private static final String PROXY_SERVICE_CLASSNAME = "org.apache.nifi.proxy.StandardProxyConfigurationService";
// Obsolete property names // Obsolete property names
private static final String OBSOLETE_ACCESS_KEY = "Access Key"; private static final String OBSOLETE_ACCESS_KEY = "Access Key";
@ -87,11 +85,6 @@ public abstract class AbstractAwsProcessor<T extends SdkClient> extends Abstract
private static final String AUTH_SERVICE_SECRET_KEY = "Secret Key"; private static final String AUTH_SERVICE_SECRET_KEY = "Secret Key";
private static final String AUTH_SERVICE_CREDENTIALS_FILE = "Credentials File"; private static final String AUTH_SERVICE_CREDENTIALS_FILE = "Credentials File";
private static final String AUTH_SERVICE_ANONYMOUS_CREDENTIALS = "anonymous-credentials"; private static final String AUTH_SERVICE_ANONYMOUS_CREDENTIALS = "anonymous-credentials";
private static final String PROXY_SERVICE_HOST = "proxy-server-host";
private static final String PROXY_SERVICE_PORT = "proxy-server-port";
private static final String PROXY_SERVICE_USERNAME = "proxy-user-name";
private static final String PROXY_SERVICE_PASSWORD = "proxy-user-password";
private static final String PROXY_SERVICE_TYPE = "proxy-type";
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -196,7 +189,7 @@ public abstract class AbstractAwsProcessor<T extends SdkClient> extends Abstract
@Override @Override
public void migrateProperties(final PropertyConfiguration config) { public void migrateProperties(final PropertyConfiguration config) {
migrateAuthenticationProperties(config); migrateAuthenticationProperties(config);
migrateProxyProperties(config); ProxyServiceMigration.migrateProxyProperties(config, PROXY_CONFIGURATION_SERVICE, OBSOLETE_PROXY_HOST, OBSOLETE_PROXY_PORT, OBSOLETE_PROXY_USERNAME, OBSOLETE_PROXY_PASSWORD);
} }
private void migrateAuthenticationProperties(final PropertyConfiguration config) { private void migrateAuthenticationProperties(final PropertyConfiguration config) {
@ -224,27 +217,6 @@ public abstract class AbstractAwsProcessor<T extends SdkClient> extends Abstract
config.removeProperty(OBSOLETE_CREDENTIALS_FILE); config.removeProperty(OBSOLETE_CREDENTIALS_FILE);
} }
private void migrateProxyProperties(final PropertyConfiguration config) {
if (config.isPropertySet(OBSOLETE_PROXY_HOST)) {
final Map<String, String> proxyProperties = new HashMap<>();
proxyProperties.put(PROXY_SERVICE_TYPE, Type.HTTP.name());
proxyProperties.put(PROXY_SERVICE_HOST, config.getRawPropertyValue(OBSOLETE_PROXY_HOST).get());
// Map any optional proxy configs
config.getRawPropertyValue(OBSOLETE_PROXY_PORT).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_PORT, value));
config.getRawPropertyValue(OBSOLETE_PROXY_USERNAME).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_USERNAME, value));
config.getRawPropertyValue(OBSOLETE_PROXY_PASSWORD).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_PASSWORD, value));
final String serviceId = config.createControllerService(PROXY_SERVICE_CLASSNAME, proxyProperties);
config.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
config.removeProperty(OBSOLETE_PROXY_HOST);
config.removeProperty(OBSOLETE_PROXY_PORT);
config.removeProperty(OBSOLETE_PROXY_USERNAME);
config.removeProperty(OBSOLETE_PROXY_PASSWORD);
}
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
getClient(context); getClient(context);

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-migration-utils</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.migration;
import org.apache.nifi.components.PropertyDescriptor;
import java.net.Proxy;
import java.util.HashMap;
import java.util.Map;
public final class ProxyServiceMigration {
static final String PROXY_SERVICE_CLASSNAME = "org.apache.nifi.proxy.StandardProxyConfigurationService";
static final String PROXY_SERVICE_TYPE = "proxy-type";
static final String PROXY_SERVICE_HOST = "proxy-server-host";
static final String PROXY_SERVICE_PORT = "proxy-server-port";
static final String PROXY_SERVICE_USERNAME = "proxy-user-name";
static final String PROXY_SERVICE_PASSWORD = "proxy-user-password";
private ProxyServiceMigration() {}
/**
* Migrates component level proxy properties to ProxyConfigurationService.
*
* @param config the component's property config to be migrated
* @param proxyServiceProperty the component's property descriptor referencing ProxyConfigurationService
* @param proxyHostProperty the name of the component level Proxy Host property
* @param proxyPortProperty the name of the component level Proxy Port property
* @param proxyUsernameProperty the name of the component level Proxy Username property
* @param proxyPasswordProperty the name of the component level Proxy Password property
*/
public static void migrateProxyProperties(final PropertyConfiguration config, final PropertyDescriptor proxyServiceProperty,
final String proxyHostProperty, final String proxyPortProperty,
final String proxyUsernameProperty, final String proxyPasswordProperty) {
if (config.isPropertySet(proxyHostProperty)) {
final Map<String, String> proxyProperties = new HashMap<>();
proxyProperties.put(PROXY_SERVICE_TYPE, Proxy.Type.HTTP.name());
proxyProperties.put(PROXY_SERVICE_HOST, config.getRawPropertyValue(proxyHostProperty).get());
// Map any optional proxy configs
config.getRawPropertyValue(proxyPortProperty).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_PORT, value));
config.getRawPropertyValue(proxyUsernameProperty).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_USERNAME, value));
config.getRawPropertyValue(proxyPasswordProperty).ifPresent(value -> proxyProperties.put(PROXY_SERVICE_PASSWORD, value));
final String serviceId = config.createControllerService(PROXY_SERVICE_CLASSNAME, proxyProperties);
config.setProperty(proxyServiceProperty, serviceId);
}
config.removeProperty(proxyHostProperty);
config.removeProperty(proxyPortProperty);
config.removeProperty(proxyUsernameProperty);
config.removeProperty(proxyPasswordProperty);
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.migration;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.MockPropertyConfiguration;
import org.apache.nifi.util.MockPropertyConfiguration.CreatedControllerService;
import org.apache.nifi.util.PropertyMigrationResult;
import org.junit.jupiter.api.Test;
import java.net.Proxy;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class ProxyServiceMigrationTest {
private static final PropertyDescriptor PROXY_SERVICE = new PropertyDescriptor.Builder()
.name("proxy-service")
.build();
private static final String OBSOLETE_PROXY_HOST = "proxy-host";
private static final String OBSOLETE_PROXY_PORT = "proxy-port";
private static final String OBSOLETE_PROXY_USERNAME = "proxy-username";
private static final String OBSOLETE_PROXY_PASSWORD = "proxy-password";
private static final String PROXY_HOST_VALUE = "localhost";
private static final String PROXY_PORT_VALUE = "8888";
private static final String PROXY_USERNAME_VALUE = "user";
private static final String PROXY_PASSWORD_VALUE = "pass";
@Test
void testMigrateProxyProperties() {
final Map<String, String> properties = Map.of(
OBSOLETE_PROXY_HOST, PROXY_HOST_VALUE,
OBSOLETE_PROXY_PORT, PROXY_PORT_VALUE,
OBSOLETE_PROXY_USERNAME, PROXY_USERNAME_VALUE,
OBSOLETE_PROXY_PASSWORD, PROXY_PASSWORD_VALUE
);
final MockPropertyConfiguration config = new MockPropertyConfiguration(properties);
ProxyServiceMigration.migrateProxyProperties(config, PROXY_SERVICE, OBSOLETE_PROXY_HOST, OBSOLETE_PROXY_PORT, OBSOLETE_PROXY_USERNAME, OBSOLETE_PROXY_PASSWORD);
assertFalse(config.hasProperty(OBSOLETE_PROXY_HOST));
assertFalse(config.hasProperty(OBSOLETE_PROXY_PORT));
assertFalse(config.hasProperty(OBSOLETE_PROXY_USERNAME));
assertFalse(config.hasProperty(OBSOLETE_PROXY_PASSWORD));
assertTrue(config.isPropertySet(PROXY_SERVICE));
PropertyMigrationResult result = config.toPropertyMigrationResult();
assertEquals(1, result.getCreatedControllerServices().size());
final CreatedControllerService createdService = result.getCreatedControllerServices().iterator().next();
assertEquals(config.getRawPropertyValue(PROXY_SERVICE).get(), createdService.id());
assertEquals(ProxyServiceMigration.PROXY_SERVICE_CLASSNAME, createdService.implementationClassName());
assertEquals(Map.of(
ProxyServiceMigration.PROXY_SERVICE_TYPE, Proxy.Type.HTTP.name(),
ProxyServiceMigration.PROXY_SERVICE_HOST, PROXY_HOST_VALUE,
ProxyServiceMigration.PROXY_SERVICE_PORT, PROXY_PORT_VALUE,
ProxyServiceMigration.PROXY_SERVICE_USERNAME, PROXY_USERNAME_VALUE,
ProxyServiceMigration.PROXY_SERVICE_PASSWORD, PROXY_PASSWORD_VALUE
),
createdService.serviceProperties());
}
}

View File

@ -28,6 +28,7 @@
<modules> <modules>
<module>nifi-bin-manager</module> <module>nifi-bin-manager</module>
<module>nifi-conflict-resolution</module>
<module>nifi-database-utils</module> <module>nifi-database-utils</module>
<module>nifi-database-test-utils</module> <module>nifi-database-test-utils</module>
<module>nifi-dbcp-base</module> <module>nifi-dbcp-base</module>
@ -38,14 +39,14 @@
<module>nifi-hadoop-utils</module> <module>nifi-hadoop-utils</module>
<module>nifi-kerberos-test-utils</module> <module>nifi-kerberos-test-utils</module>
<module>nifi-listed-entity</module> <module>nifi-listed-entity</module>
<module>nifi-migration-utils</module>
<module>nifi-prometheus-utils</module> <module>nifi-prometheus-utils</module>
<module>nifi-put-pattern</module> <module>nifi-put-pattern</module>
<module>nifi-record-path-property</module>
<module>nifi-record-utils</module> <module>nifi-record-utils</module>
<module>nifi-reporting-utils</module> <module>nifi-reporting-utils</module>
<module>nifi-resource-transfer</module> <module>nifi-resource-transfer</module>
<module>nifi-service-utils</module> <module>nifi-service-utils</module>
<module>nifi-syslog-utils</module> <module>nifi-syslog-utils</module>
<module>nifi-conflict-resolution</module>
<module>nifi-record-path-property</module>
</modules> </modules>
</project> </project>

View File

@ -87,6 +87,11 @@
<artifactId>nifi-conflict-resolution</artifactId> <artifactId>nifi-conflict-resolution</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-migration-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>

View File

@ -21,7 +21,6 @@ import com.google.cloud.Service;
import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions; import com.google.cloud.TransportOptions;
import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.http.HttpTransportOptions;
import java.net.Proxy;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -32,6 +31,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
@ -45,6 +46,12 @@ public abstract class AbstractGCPProcessor<
CloudService extends Service<CloudServiceOptions>, CloudService extends Service<CloudServiceOptions>,
CloudServiceOptions extends ServiceOptions<CloudService, CloudServiceOptions>> extends AbstractProcessor { CloudServiceOptions extends ServiceOptions<CloudService, CloudServiceOptions>> extends AbstractProcessor {
// Obsolete property names
private static final String OBSOLETE_PROXY_HOST = "gcp-proxy-host";
private static final String OBSOLETE_PROXY_PORT = "gcp-proxy-port";
private static final String OBSOLETE_PROXY_USERNAME = "gcp-proxy-user-name";
private static final String OBSOLETE_PROXY_PASSWORD = "gcp-proxy-user-password";
public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor
.Builder().name("gcp-project-id") .Builder().name("gcp-project-id")
.displayName("Project ID") .displayName("Project ID")
@ -63,47 +70,6 @@ public abstract class AbstractGCPProcessor<
.addValidator(StandardValidators.INTEGER_VALIDATOR) .addValidator(StandardValidators.INTEGER_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor
.Builder().name("gcp-proxy-host")
.displayName("Proxy host")
.description("""
IP or hostname of the proxy to be used.
You might need to set the following properties in bootstrap for https proxy usage:
-Djdk.http.auth.tunneling.disabledSchemes=
-Djdk.http.auth.proxying.disabledSchemes=""")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor
.Builder().name("gcp-proxy-port")
.displayName("Proxy port")
.description("Proxy port number")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor
.Builder().name("gcp-proxy-user-name")
.displayName("HTTP Proxy Username")
.description("HTTP Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor
.Builder().name("gcp-proxy-user-password")
.displayName("HTTP Proxy Password")
.description("HTTP Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.sensitive(true)
.build();
public static final PropertyDescriptor GCP_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor GCP_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("GCP Credentials Provider Service") .name("GCP Credentials Provider Service")
.description("The Controller Service used to obtain Google Cloud Platform credentials.") .description("The Controller Service used to obtain Google Cloud Platform credentials.")
@ -111,6 +77,8 @@ public abstract class AbstractGCPProcessor<
.identifiesControllerService(GCPCredentialsService.class) .identifiesControllerService(GCPCredentialsService.class)
.build(); .build();
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS);
protected volatile CloudService cloudService; protected volatile CloudService cloudService;
protected CloudService getCloudService() { protected CloudService getCloudService() {
@ -122,14 +90,16 @@ public abstract class AbstractGCPProcessor<
return List.of(PROJECT_ID, return List.of(PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE, GCP_CREDENTIALS_PROVIDER_SERVICE,
RETRY_COUNT, RETRY_COUNT,
PROXY_HOST, PROXY_CONFIGURATION_SERVICE
PROXY_PORT,
HTTP_PROXY_USERNAME,
HTTP_PROXY_PASSWORD,
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
); );
} }
@Override
public void migrateProperties(final PropertyConfiguration config) {
ProxyServiceMigration.migrateProxyProperties(config, PROXY_CONFIGURATION_SERVICE, OBSOLETE_PROXY_HOST, OBSOLETE_PROXY_PORT, OBSOLETE_PROXY_USERNAME, OBSOLETE_PROXY_PASSWORD);
}
/** /**
* Verifies the cloud service configuration. This is in a separate method rather than implementing VerifiableProcessor due to type erasure. * Verifies the cloud service configuration. This is in a separate method rather than implementing VerifiableProcessor due to type erasure.
* @param context The process context * @param context The process context
@ -206,25 +176,7 @@ public abstract class AbstractGCPProcessor<
* @return Transport options object with proxy configuration * @return Transport options object with proxy configuration
*/ */
protected TransportOptions getTransportOptions(ProcessContext context) { protected TransportOptions getTransportOptions(ProcessContext context) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> { final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
if (proxyHost != null && proxyPort != null && proxyPort > 0) {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
componentProxyConfig.setProxyServerHost(proxyHost);
componentProxyConfig.setProxyServerPort(proxyPort);
componentProxyConfig.setProxyUserName(proxyUser);
componentProxyConfig.setProxyUserPassword(proxyPassword);
return componentProxyConfig;
}
}
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration); final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
return HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build(); return HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build();

View File

@ -28,7 +28,6 @@ import java.util.List;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxyConfiguration;
public abstract class AbstractGCPubSubWithProxyProcessor extends AbstractGCPubSubProcessor { public abstract class AbstractGCPubSubWithProxyProcessor extends AbstractGCPubSubProcessor {
@ -36,28 +35,13 @@ public abstract class AbstractGCPubSubWithProxyProcessor extends AbstractGCPubSu
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return List.of( return List.of(
PROJECT_ID, PROJECT_ID,
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS), PROXY_CONFIGURATION_SERVICE,
GCP_CREDENTIALS_PROVIDER_SERVICE GCP_CREDENTIALS_PROVIDER_SERVICE
); );
} }
protected TransportChannelProvider getTransportChannelProvider(ProcessContext context) { protected TransportChannelProvider getTransportChannelProvider(ProcessContext context) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> { final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
if (proxyHost != null && proxyPort != null && proxyPort > 0) {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
componentProxyConfig.setProxyServerHost(proxyHost);
componentProxyConfig.setProxyServerPort(proxyPort);
componentProxyConfig.setProxyUserName(proxyUser);
componentProxyConfig.setProxyUserPassword(proxyPassword);
return componentProxyConfig;
}
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
return TopicAdminSettings.defaultGrpcTransportProviderBuilder() return TopicAdminSettings.defaultGrpcTransportProviderBuilder()
.setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector( .setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector(