NIFI-13851 Migrated SiteToSiteReportingTask Proxy properties to ProxyConfigurationService (#9359)

- Fixed Reporting Task property migration: in case of sensitive properties the encrypted form was migrated to the new property value

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Turcsanyi 2024-10-11 23:09:39 +02:00 committed by GitHub
parent fa2a01c823
commit 4c243bd877
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 57 additions and 42 deletions

View File

@ -65,6 +65,15 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-migration-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>

View File

@ -31,6 +31,8 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@ -115,15 +117,18 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
properties.add(SiteToSiteUtils.TIMEOUT);
properties.add(SiteToSiteUtils.BATCH_SIZE);
properties.add(SiteToSiteUtils.TRANSPORT_PROTOCOL);
properties.add(SiteToSiteUtils.HTTP_PROXY_HOSTNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PORT);
properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
properties.add(SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE);
properties.add(RECORD_WRITER);
properties.add(ALLOW_NULL_VALUES);
return properties;
}
@Override
public void migrateProperties(final PropertyConfiguration config) {
ProxyServiceMigration.migrateProxyProperties(config, SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE,
SiteToSiteUtils.OBSOLETE_PROXY_HOST, SiteToSiteUtils.OBSOLETE_PROXY_PORT, SiteToSiteUtils.OBSOLETE_PROXY_USERNAME, SiteToSiteUtils.OBSOLETE_PROXY_PASSWORD);
}
public void setup(final PropertyContext reportContext) {
if (siteToSiteClient == null) {
siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger(), null);

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.reporting.s2s;
import java.net.Proxy;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.components.PropertyDescriptor;
@ -28,6 +29,9 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
@ -35,10 +39,15 @@ import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
public class SiteToSiteUtils {
// Obsolete property names
public static final String OBSOLETE_PROXY_HOST = "s2s-http-proxy-hostname";
public static final String OBSOLETE_PROXY_PORT = "s2s-http-proxy-port";
public static final String OBSOLETE_PROXY_USERNAME = "s2s-http-proxy-username";
public static final String OBSOLETE_PROXY_PASSWORD = "s2s-http-proxy-password";
public static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL")
.displayName("Destination URL")
@ -105,36 +114,10 @@ public class SiteToSiteUtils {
.allowableValues(SiteToSiteTransportProtocol.values())
.defaultValue(SiteToSiteTransportProtocol.RAW.name())
.build();
public static final PropertyDescriptor HTTP_PROXY_HOSTNAME = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-hostname")
.displayName("HTTP Proxy hostname")
.description("Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP, ProxySpec.HTTP_AUTH))
.dependsOn(TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name())
.build();
public static final PropertyDescriptor HTTP_PROXY_PORT = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-port")
.displayName("HTTP Proxy port")
.description("Specify the proxy server's port number, optional. If not specified, default port 80 will be used.")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-username")
.displayName("HTTP Proxy username")
.description("Specify an user name to connect to the proxy server, optional.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-password")
.displayName("HTTP Proxy password")
.description("Specify an user password to connect to the proxy server, optional.")
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each event.")
@ -162,9 +145,18 @@ public class SiteToSiteUtils {
final String destinationUrl = reportContext.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(reportContext.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null
: new HttpProxy(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
HttpProxy httpProxy = null;
if (mode == SiteToSiteTransportProtocol.HTTP) {
final ProxyConfigurationService proxyConfigurationService = reportContext.getProperty(SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
httpProxy = new HttpProxy(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(),
proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
}
}
}
// If no state manager was provided and this context supports retrieving it, do so
if (stateManager == null && reportContext instanceof ReportingContext) {

View File

@ -33,6 +33,8 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@ -67,10 +69,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
properties.add(SiteToSiteUtils.TIMEOUT);
properties.add(SiteToSiteUtils.BATCH_SIZE);
properties.add(SiteToSiteUtils.TRANSPORT_PROTOCOL);
properties.add(SiteToSiteUtils.HTTP_PROXY_HOSTNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PORT);
properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
properties.add(SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE);
this.properties = Collections.unmodifiableList(properties);
this.stateManager = context.getStateManager();
}
@ -80,6 +79,12 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
return properties;
}
@Override
public void migrateProperties(final PropertyConfiguration config) {
ProxyServiceMigration.migrateProxyProperties(config, SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE,
SiteToSiteUtils.OBSOLETE_PROXY_HOST, SiteToSiteUtils.OBSOLETE_PROXY_PORT, SiteToSiteUtils.OBSOLETE_PROXY_USERNAME, SiteToSiteUtils.OBSOLETE_PROXY_PASSWORD);
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {

View File

@ -608,7 +608,11 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(controller.getExtensionManager(), controller.getFlowManager(),
controller.getControllerServiceProvider(), taskNode);
taskNode.migrateConfiguration(reportingTask.getProperties(), serviceFactory);
Map<String, String> rawPropertyValues = taskNode.getRawPropertyValues().entrySet().stream()
.collect(HashMap::new,
(m, e) -> m.put(e.getKey().getName(), e.getValue()),
HashMap::putAll);
taskNode.migrateConfiguration(rawPropertyValues, serviceFactory);
}
private void updateReportingTask(final ReportingTaskNode taskNode, final VersionedReportingTask reportingTask, final FlowController controller) {