diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index 37ed73768f..fa123a3e67 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -27,7 +27,10 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; import javax.net.ssl.SSLContext; import java.io.IOException; @@ -44,6 +47,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() .name("Destination URL") + .displayName("Destination URL") .description("The URL of the destination NiFi instance to send data to, " + "should be in the format http(s)://host:port/nifi.") .required(true) @@ -52,6 +56,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .build(); static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder() .name("Input Port Name") + .displayName("Input Port Name") .description("The name of the Input Port to deliver data to.") .required(true) .expressionLanguageSupported(true) @@ -59,12 +64,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .build(); static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() .name("SSL Context Service") + .displayName("SSL Context Service") .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.") .required(false) .identifiesControllerService(SSLContextService.class) .build(); static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder() .name("Instance URL") + .displayName("Instance URL") .description("The URL of this instance to use in the Content URI of each event.") .required(true) .expressionLanguageSupported(true) @@ -73,6 +80,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .build(); static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder() .name("Compress Events") + .displayName("Compress Events") .description("Indicates whether or not to compress the data being sent.") .required(true) .allowableValues("true", "false") @@ -80,6 +88,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .build(); static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") + .displayName("Communications Timeout") .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction") .required(true) .defaultValue("30 secs") @@ -87,11 +96,49 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .build(); static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") + .displayName("Batch Size") .description("Specifies how many records to send in a single batch, at most.") .required(true) .defaultValue("1000") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder() + .name("s2s-transport-protocol") + .displayName("Transport Protocol") + .description("Specifies which transport protocol to use for Site-to-Site communication.") + .required(true) + .allowableValues(SiteToSiteTransportProtocol.values()) + .defaultValue(SiteToSiteTransportProtocol.RAW.name()) + .build(); + static final PropertyDescriptor HTTP_PROXY_HOSTNAME = new PropertyDescriptor.Builder() + .name("s2s-http-proxy-hostname") + .displayName("HTTP Proxy hostname") + .description("Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + static final PropertyDescriptor HTTP_PROXY_PORT = new PropertyDescriptor.Builder() + .name("s2s-http-proxy-port") + .displayName("HTTP Proxy port") + .description("Specify the proxy server's port number, optional. If not specified, default port 80 will be used.") + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder() + .name("s2s-http-proxy-username") + .displayName("HTTP Proxy username") + .description("Specify an user name to connect to the proxy server, optional.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder() + .name("s2s-http-proxy-password") + .displayName("HTTP Proxy password") + .description("Specify an user password to connect to the proxy server, optional.") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); protected volatile SiteToSiteClient siteToSiteClient; @@ -105,6 +152,11 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT properties.add(COMPRESS); properties.add(TIMEOUT); properties.add(BATCH_SIZE); + properties.add(TRANSPORT_PROTOCOL); + properties.add(HTTP_PROXY_HOSTNAME); + properties.add(HTTP_PROXY_PORT); + properties.add(HTTP_PROXY_USERNAME); + properties.add(HTTP_PROXY_PASSWORD); return properties; } @@ -131,6 +183,11 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue(); + final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue()); + final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null + : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(), + context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); + siteToSiteClient = new SiteToSiteClient.Builder() .url(destinationUrl) .portName(context.getProperty(PORT_NAME).getValue()) @@ -138,6 +195,8 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .eventReporter(eventReporter) .sslContext(sslContext) .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + .transportProtocol(mode) + .httpProxy(httpProxy) .build(); } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 566b7804db..b829c1e20f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -74,20 +74,12 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting private volatile long lastSentBulletinId = -1L; - static List descriptors = new ArrayList<>(); - - static { - descriptors.add(DESTINATION_URL); - descriptors.add(PORT_NAME); - descriptors.add(SSL_CONTEXT); - descriptors.add(COMPRESS); - descriptors.add(TIMEOUT); - descriptors.add(PLATFORM); - } - @Override protected List getSupportedPropertyDescriptors() { - return descriptors; + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PLATFORM); + properties.remove(BATCH_SIZE); + return properties; } @Override diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java index d5bce1b851..d247a4e705 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -39,6 +39,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; import org.junit.Assert; @@ -126,6 +127,11 @@ public class TestSiteToSiteBulletinReportingTask { } properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000"); properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi"); + properties.put(SiteToSiteBulletinReportingTask.TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name()); + properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_HOSTNAME, "localhost"); + properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PORT, "80"); + properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_USERNAME, "username"); + properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PASSWORD, "password"); Mockito.doAnswer(new Answer() { @Override