mirror of https://github.com/apache/nifi.git
NIFI-881: Added support for Proxy Host and Proxy Port properties to InvokeHTTP
Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
4f6c1cfff3
commit
82d32230e6
|
@ -25,9 +25,13 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Proxy;
|
||||
import java.net.Proxy.Type;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -56,6 +60,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
|
@ -115,7 +121,19 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
attributesToSend = Pattern.compile(trimmedValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>(1);
|
||||
final boolean proxyHostSet = validationContext.getProperty(Config.PROP_PROXY_HOST).isSet();
|
||||
final boolean proxyPortSet = validationContext.getProperty(Config.PROP_PROXY_PORT).isSet();
|
||||
|
||||
if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
|
||||
results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -220,6 +238,20 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
|
||||
.name("Proxy Host")
|
||||
.description("The fully qualified hostname or IP address of the proxy server")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder()
|
||||
.name("Proxy Port")
|
||||
.description("The port of the proxy server")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.build();
|
||||
|
||||
// Per RFC 7235, 2617, and 2616.
|
||||
// basic-credentials = base64-user-pass
|
||||
// base64-user-pass = userid ":" password
|
||||
|
@ -249,16 +281,18 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||
PROP_METHOD,
|
||||
PROP_URL,
|
||||
PROP_SSL_CONTEXT_SERVICE,
|
||||
PROP_CONNECT_TIMEOUT,
|
||||
PROP_READ_TIMEOUT,
|
||||
PROP_DATE_HEADER,
|
||||
PROP_FOLLOW_REDIRECTS,
|
||||
PROP_ATTRIBUTES_TO_SEND,
|
||||
PROP_BASIC_AUTH_USERNAME,
|
||||
PROP_BASIC_AUTH_PASSWORD
|
||||
PROP_METHOD,
|
||||
PROP_URL,
|
||||
PROP_SSL_CONTEXT_SERVICE,
|
||||
PROP_CONNECT_TIMEOUT,
|
||||
PROP_READ_TIMEOUT,
|
||||
PROP_DATE_HEADER,
|
||||
PROP_FOLLOW_REDIRECTS,
|
||||
PROP_ATTRIBUTES_TO_SEND,
|
||||
PROP_BASIC_AUTH_USERNAME,
|
||||
PROP_BASIC_AUTH_PASSWORD,
|
||||
PROP_PROXY_HOST,
|
||||
PROP_PROXY_PORT
|
||||
));
|
||||
|
||||
// property to allow the hostname verifier to be overridden
|
||||
|
@ -386,10 +420,10 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
|
||||
private void openConnection() throws IOException {
|
||||
// read the url property from the context
|
||||
String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
|
||||
URL url = new URL(urlstr);
|
||||
String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
|
||||
String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
|
||||
final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
|
||||
final URL url = new URL(urlstr);
|
||||
final String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
|
||||
final String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
|
||||
|
||||
String authstrencoded = null;
|
||||
if (!authuser.isEmpty()) {
|
||||
|
@ -399,7 +433,15 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
}
|
||||
|
||||
// create the connection
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
|
||||
final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
|
||||
if (proxyHost == null || proxyPort == null) {
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
} else {
|
||||
final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
|
||||
conn = (HttpURLConnection) url.openConnection(proxy);
|
||||
}
|
||||
|
||||
if (authstrencoded != null) {
|
||||
conn.setRequestProperty("Authorization", "Basic " + authstrencoded);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue