NIFI-3809 - Added HTTP mode and HTTP proxy for S2S Reporting Tasks

This closes #1754.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-05-04 22:43:32 +02:00 committed by Koji Kawamura
parent b6eb0ac0fb
commit e05005584d
3 changed files with 69 additions and 12 deletions

View File

@ -27,7 +27,10 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
@ -44,6 +47,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL") .name("Destination URL")
.displayName("Destination URL")
.description("The URL of the destination NiFi instance to send data to, " + .description("The URL of the destination NiFi instance to send data to, " +
"should be in the format http(s)://host:port/nifi.") "should be in the format http(s)://host:port/nifi.")
.required(true) .required(true)
@ -52,6 +56,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.build(); .build();
static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder() static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder()
.name("Input Port Name") .name("Input Port Name")
.displayName("Input Port Name")
.description("The name of the Input Port to deliver data to.") .description("The name of the Input Port to deliver data to.")
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
@ -59,12 +64,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.build(); .build();
static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("SSL Context Service") .name("SSL Context Service")
.displayName("SSL Context Service")
.description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.") .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.")
.required(false) .required(false)
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder() static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder()
.name("Instance URL") .name("Instance URL")
.displayName("Instance URL")
.description("The URL of this instance to use in the Content URI of each event.") .description("The URL of this instance to use in the Content URI of each event.")
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
@ -73,6 +80,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.build(); .build();
static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder() static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder()
.name("Compress Events") .name("Compress Events")
.displayName("Compress Events")
.description("Indicates whether or not to compress the data being sent.") .description("Indicates whether or not to compress the data being sent.")
.required(true) .required(true)
.allowableValues("true", "false") .allowableValues("true", "false")
@ -80,6 +88,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.build(); .build();
static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout") .name("Communications Timeout")
.displayName("Communications Timeout")
.description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction") .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction")
.required(true) .required(true)
.defaultValue("30 secs") .defaultValue("30 secs")
@ -87,11 +96,49 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.build(); .build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size") .name("Batch Size")
.displayName("Batch Size")
.description("Specifies how many records to send in a single batch, at most.") .description("Specifies how many records to send in a single batch, at most.")
.required(true) .required(true)
.defaultValue("1000") .defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
.name("s2s-transport-protocol")
.displayName("Transport Protocol")
.description("Specifies which transport protocol to use for Site-to-Site communication.")
.required(true)
.allowableValues(SiteToSiteTransportProtocol.values())
.defaultValue(SiteToSiteTransportProtocol.RAW.name())
.build();
static final PropertyDescriptor HTTP_PROXY_HOSTNAME = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-hostname")
.displayName("HTTP Proxy hostname")
.description("Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor HTTP_PROXY_PORT = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-port")
.displayName("HTTP Proxy port")
.description("Specify the proxy server's port number, optional. If not specified, default port 80 will be used.")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-username")
.displayName("HTTP Proxy username")
.description("Specify an user name to connect to the proxy server, optional.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-password")
.displayName("HTTP Proxy password")
.description("Specify an user password to connect to the proxy server, optional.")
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
protected volatile SiteToSiteClient siteToSiteClient; protected volatile SiteToSiteClient siteToSiteClient;
@ -105,6 +152,11 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
properties.add(COMPRESS); properties.add(COMPRESS);
properties.add(TIMEOUT); properties.add(TIMEOUT);
properties.add(BATCH_SIZE); properties.add(BATCH_SIZE);
properties.add(TRANSPORT_PROTOCOL);
properties.add(HTTP_PROXY_HOSTNAME);
properties.add(HTTP_PROXY_PORT);
properties.add(HTTP_PROXY_USERNAME);
properties.add(HTTP_PROXY_PASSWORD);
return properties; return properties;
} }
@ -131,6 +183,11 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue(); final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
: new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
siteToSiteClient = new SiteToSiteClient.Builder() siteToSiteClient = new SiteToSiteClient.Builder()
.url(destinationUrl) .url(destinationUrl)
.portName(context.getProperty(PORT_NAME).getValue()) .portName(context.getProperty(PORT_NAME).getValue())
@ -138,6 +195,8 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.eventReporter(eventReporter) .eventReporter(eventReporter)
.sslContext(sslContext) .sslContext(sslContext)
.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(mode)
.httpProxy(httpProxy)
.build(); .build();
} }

View File

@ -74,20 +74,12 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
private volatile long lastSentBulletinId = -1L; private volatile long lastSentBulletinId = -1L;
static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
descriptors.add(DESTINATION_URL);
descriptors.add(PORT_NAME);
descriptors.add(SSL_CONTEXT);
descriptors.add(COMPRESS);
descriptors.add(TIMEOUT);
descriptors.add(PLATFORM);
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors; final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PLATFORM);
properties.remove(BATCH_SIZE);
return properties;
} }
@Override @Override

View File

@ -39,6 +39,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert; import org.junit.Assert;
@ -126,6 +127,11 @@ public class TestSiteToSiteBulletinReportingTask {
} }
properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000"); properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi"); properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
properties.put(SiteToSiteBulletinReportingTask.TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name());
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_HOSTNAME, "localhost");
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PORT, "80");
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_USERNAME, "username");
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PASSWORD, "password");
Mockito.doAnswer(new Answer<PropertyValue>() { Mockito.doAnswer(new Answer<PropertyValue>() {
@Override @Override