diff --git a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
index ae7322a30d..a48d65b67b 100644
--- a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
+++ b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
@@ -65,6 +65,15 @@
org.apache.nifi
nifi-record-sink-api
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
+
+ org.apache.nifi
+ nifi-migration-utils
+ 2.0.0-SNAPSHOT
+
org.apache.nifi
nifi-avro-record-utils
diff --git a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 34a2108256..135f8a92a1 100644
--- a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -31,6 +31,8 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@@ -115,15 +117,18 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
properties.add(SiteToSiteUtils.TIMEOUT);
properties.add(SiteToSiteUtils.BATCH_SIZE);
properties.add(SiteToSiteUtils.TRANSPORT_PROTOCOL);
- properties.add(SiteToSiteUtils.HTTP_PROXY_HOSTNAME);
- properties.add(SiteToSiteUtils.HTTP_PROXY_PORT);
- properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
- properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
+ properties.add(SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE);
properties.add(RECORD_WRITER);
properties.add(ALLOW_NULL_VALUES);
return properties;
}
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ ProxyServiceMigration.migrateProxyProperties(config, SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE,
+ SiteToSiteUtils.OBSOLETE_PROXY_HOST, SiteToSiteUtils.OBSOLETE_PROXY_PORT, SiteToSiteUtils.OBSOLETE_PROXY_USERNAME, SiteToSiteUtils.OBSOLETE_PROXY_PASSWORD);
+ }
+
public void setup(final PropertyContext reportContext) {
if (siteToSiteClient == null) {
siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger(), null);
diff --git a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
index a09021be9a..9c6f739a42 100644
--- a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
+++ b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.reporting.s2s;
+import java.net.Proxy;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.components.PropertyDescriptor;
@@ -28,6 +29,9 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
@@ -35,10 +39,15 @@ import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.StringUtils;
public class SiteToSiteUtils {
+ // Obsolete property names
+ public static final String OBSOLETE_PROXY_HOST = "s2s-http-proxy-hostname";
+ public static final String OBSOLETE_PROXY_PORT = "s2s-http-proxy-port";
+ public static final String OBSOLETE_PROXY_USERNAME = "s2s-http-proxy-username";
+ public static final String OBSOLETE_PROXY_PASSWORD = "s2s-http-proxy-password";
+
public static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL")
.displayName("Destination URL")
@@ -105,36 +114,10 @@ public class SiteToSiteUtils {
.allowableValues(SiteToSiteTransportProtocol.values())
.defaultValue(SiteToSiteTransportProtocol.RAW.name())
.build();
- public static final PropertyDescriptor HTTP_PROXY_HOSTNAME = new PropertyDescriptor.Builder()
- .name("s2s-http-proxy-hostname")
- .displayName("HTTP Proxy hostname")
- .description("Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance.")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP, ProxySpec.HTTP_AUTH))
+ .dependsOn(TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name())
.build();
- public static final PropertyDescriptor HTTP_PROXY_PORT = new PropertyDescriptor.Builder()
- .name("s2s-http-proxy-port")
- .displayName("HTTP Proxy port")
- .description("Specify the proxy server's port number, optional. If not specified, default port 80 will be used.")
- .required(false)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .build();
- public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
- .name("s2s-http-proxy-username")
- .displayName("HTTP Proxy username")
- .description("Specify an user name to connect to the proxy server, optional.")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
- public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
- .name("s2s-http-proxy-password")
- .displayName("HTTP Proxy password")
- .description("Specify an user password to connect to the proxy server, optional.")
- .required(false)
- .sensitive(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
-
public static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each event.")
@@ -162,9 +145,18 @@ public class SiteToSiteUtils {
final String destinationUrl = reportContext.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(reportContext.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
- final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null
- : new HttpProxy(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
- reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
+
+ HttpProxy httpProxy = null;
+ if (mode == SiteToSiteTransportProtocol.HTTP) {
+ final ProxyConfigurationService proxyConfigurationService = reportContext.getProperty(SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+ if (proxyConfigurationService != null) {
+ final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
+ if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
+ httpProxy = new HttpProxy(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(),
+ proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
+ }
+ }
+ }
// If no state manager was provided and this context supports retrieving it, do so
if (stateManager == null && reportContext instanceof ReportingContext) {
diff --git a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
index be118d0031..65bce7b083 100644
--- a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
+++ b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
@@ -33,6 +33,8 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@@ -67,10 +69,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
properties.add(SiteToSiteUtils.TIMEOUT);
properties.add(SiteToSiteUtils.BATCH_SIZE);
properties.add(SiteToSiteUtils.TRANSPORT_PROTOCOL);
- properties.add(SiteToSiteUtils.HTTP_PROXY_HOSTNAME);
- properties.add(SiteToSiteUtils.HTTP_PROXY_PORT);
- properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
- properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
+ properties.add(SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE);
this.properties = Collections.unmodifiableList(properties);
this.stateManager = context.getStateManager();
}
@@ -80,6 +79,12 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
return properties;
}
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ ProxyServiceMigration.migrateProxyProperties(config, SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE,
+ SiteToSiteUtils.OBSOLETE_PROXY_HOST, SiteToSiteUtils.OBSOLETE_PROXY_PORT, SiteToSiteUtils.OBSOLETE_PROXY_USERNAME, SiteToSiteUtils.OBSOLETE_PROXY_PASSWORD);
+ }
+
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 942d2f5ac8..76b71636b2 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -608,7 +608,11 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(controller.getExtensionManager(), controller.getFlowManager(),
controller.getControllerServiceProvider(), taskNode);
- taskNode.migrateConfiguration(reportingTask.getProperties(), serviceFactory);
+ Map rawPropertyValues = taskNode.getRawPropertyValues().entrySet().stream()
+ .collect(HashMap::new,
+ (m, e) -> m.put(e.getKey().getName(), e.getValue()),
+ HashMap::putAll);
+ taskNode.migrateConfiguration(rawPropertyValues, serviceFactory);
}
private void updateReportingTask(final ReportingTaskNode taskNode, final VersionedReportingTask reportingTask, final FlowController controller) {